Source file: mdx/script/manitou-mdx
#!/usr/bin/perl
# manitou-mdx v-1.3.1
# Copyright (C) 2004-2014 Daniel Verite
# This file is part of Manitou-Mail (see http://www.manitou-mail.org)
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
#####################################################################
# manitou-mdx: mail-database exchanger
# This program basically does the following:
#
# 1) polls directories where incoming mail files are dropped
# (by an external delivery agent): parse them, run plugins
# and apply filter rules, and import them into the database.
# 2) polls the database for new outgoing mail to pass
# on to a local delivery agent like sendmail.
#
#####################################################################
use strict;
use Encode;
use DBI;
use DBD::Pg;
use MIME::Head;
use MIME::Entity;
use MIME::Words qw(:all);
use MIME::Parser;
use MIME::Body;
use Mail::Address;
use Mail::Internet;
use POSIX qw(strftime mktime tmpnam getuid);
use File::stat;
use IO::Handle;
use IPC::Open3;
use URI::Escape;
use Getopt::Long;
use File::Temp qw(tempfile tempdir);
use File::Basename qw(basename dirname);
use Fcntl qw(:seek);
use Time::HiRes qw(gettimeofday tv_interval);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError) ;
use Manitou::Filters;
use Manitou::Jobs;
use Manitou::MailFormat;
use Manitou::Words qw(load_stopwords index_words flush_word_vectors
clear_word_vectors last_flush_time);
use Manitou::Tags qw(action_tag);
use Manitou::Attachments qw(flatten_and_insert_attach
detach_text_attachments
attach_parts create_html_part has_attachments);
use Manitou::Encoding qw(encode_dbtxt decode_dbtxt header_decode);
use Manitou::Config qw(getconf getconf_bool add_mbox readconf
set_common_conf mailboxes);
use Manitou::Database qw(db_connect);
use Manitou::Log qw(notice_log error_log init_log debug_log warning_log);
use Data::Dumper;
# ---- File-scoped state shared by the subs below ----
# Debug switch, read from the MANITOU_MDX_DEBUG environment variable
my $DEBUG=$ENV{'MANITOU_MDX_DEBUG'};
# Database handle, assigned from db_connect() in main_multi()
my $dbh;
# Set to 1 by the sigterm() handler; tested by the daemon loop and by
# import_mailfiles() to stop as soon as possible
my $global_end;
my $mail_id; # currently processed
# Set by the --verbosity command line option
my $verbosity;
# hash of mail_id that we failed to send and do not want to retry
my %hsend_blocked;
my %preprocess_plugins; # mbox => array of plugins in the order of execution
my %postprocess_plugins;
my %mimeprocess_plugins;
my %outgoing_plugins;
# Plugins scheduled by check_maintenance_schedule()/run_maint_plugin()
my @maintenance_plugins;
my %loaded_plugins;
# Command line options; 'status' is filled by --status in main_multi()
my %options;
# header -> mail_addresses.addr_type
my %hAdrTypes=( "From" => 1,
"To" => 2,
"Cc" => 3,
"ReplyTo" => 4,
"Bcc" => 5
);
# Unbuffered output so that messages show up immediately
STDOUT->autoflush(1);
STDERR->autoflush(1);
# Entry point: everything above must be declared before this call runs
main_multi();
# Store the current time under the key $rt_key of the runtime_info table
# (that's how we tell that we're currently running).
# The row is updated if it exists, created otherwise.
# Returns the timestamp that has been written.
sub update_runtime_timestamp {
    my ($rt_key) = @_;
    # each statement is committed on its own, whatever the current mode
    local $dbh->{AutoCommit} = 1;
    my $now = time;
    my $update = $dbh->prepare("UPDATE runtime_info SET rt_value=? WHERE rt_key=?");
    $update->execute($now, $rt_key);
    unless ($update->rows) {
        # no row for that key yet
        my $insert = $dbh->prepare("INSERT INTO runtime_info(rt_key,rt_value) VALUES (?,?)");
        $insert->execute($rt_key, $now);
        $insert->finish;
    }
    $update->finish;
    return $now;
}
# Increment the 'nb_errors' counter of the runtime_info table,
# creating the row with a value of 1 when it does not exist yet.
sub update_runtime_errcount {
    local $dbh->{AutoCommit}=1;
    my $updated = $dbh->do("UPDATE runtime_info SET rt_value=to_char(to_number(rt_value,'9999999999')+1, '9999999999') WHERE rt_key='nb_errors'");
    # DBI's do() returns the true value "0E0" when the statement succeeded
    # but touched no row, so a plain boolean test would never detect a
    # missing counter row: compare the row count numerically instead.
    if (!$updated || $updated == 0) {
        $dbh->do("INSERT INTO runtime_info(rt_key,rt_value) VALUES ('nb_errors','1')");
    }
}
# Write the queued word vectors to the database inside one transaction,
# then empty the in-memory queue. Progress is printed when $verbosity is set.
sub do_flush_word_vectors {
    $dbh->begin_work;
    if ($verbosity) {
        print "Flushing vectors...";
    }
    Manitou::Words::flush_word_vectors($dbh);
    Manitou::Words::clear_word_vectors();
    $dbh->commit;
    if ($verbosity) {
        print "done\n";
    }
}
# Look for incoming mail files in the sub-directories of a spool directory.
# Arguments: the path of the spool directory, and a directory handle
# already opened on it.
# Returns a hash: "subdir/filename" => tag, where the tag is the name of the
# sub-directory once URI-unescaped and decoded from UTF-8, with each '/'
# replaced by '->'.
sub list_pretagged_files {
    my ($spooldir, $spool_dh) = @_;
    my %tag_for;
    for my $entry (readdir($spool_dh)) {
        next if ($entry eq "." || $entry eq "..");
        my $tag = uri_unescape($entry);
        $tag = Encode::decode("utf-8", $tag, Encode::FB_PERLQQ);
        $tag =~ s/\//->/g;
        # entries that cannot be opened as directories are silently skipped
        opendir(my $sub_dh, $spooldir."/".$entry) or next;
        for my $file (readdir($sub_dh)) {
            next unless ($file =~ /^mail-(\d+\-\d+\-\d+)\.received$/);
            $tag_for{$entry."/".$file} = $tag;
        }
        closedir $sub_dh;
    }
    return %tag_for;
}
# Import the mail files waiting in one spool location.
# mbox: identity from the configuration file
# origin: {type=>tempfile|dot-received|maildir , location=>'/some/path' }
# At most 20 files are imported per call, and the loop stops as soon as
# $global_end is set.
# Returns the number of files that are still candidates to being imported
# (in most cases that will be 0).
sub import_mailfiles {
    my ($mbox, $origin)=@_;
    my ($done, $ret);
    my @files;
    my $dir;
    my %pretagged_files;

    # 1. Build the list of candidate files, relative to $dir
    if ($origin->{type} eq "dot-received") {
        $dir=$origin->{location};
        my $dh;
        if (!opendir($dh, $dir)) {
            error_log("Unable to access mailfiles_directory $dir: $!");
            return 0;
        }
        if (getconf("mailfiles_directory_flattened_folders", $mbox)) {
            %pretagged_files = list_pretagged_files($dir, $dh);
            @files = keys %pretagged_files;
        }
        else {
            @files=grep (/^mail-(\d+\-\d+\-\d+)\.received$/, readdir($dh));
        }
        closedir $dh;
    }
    elsif ($origin->{type} eq "maildir") {
        $dir=$origin->{location}."/new";
        my $dh;
        if (!opendir($dh, $dir)) {
            error_log("Unable to access directory $dir: $!");
            return 0;
        }
        @files=grep { -f "$dir/$_" } readdir($dh);
        closedir $dh;
    }
    elsif ($origin->{type} eq "tempfile") {
        @files = (basename($origin->{location}));
        $dir = dirname($origin->{location});
    }
    else {
        error_log("Unknown type in import for identity '$mbox'");
        return 0; # nothing to import
    }

    # 2. Import each file
    my $import_count=0;
    foreach my $rel_fname (sort @files) {
        # we don't want to import too many files in one go to avoid
        # having maintenance plugins and the sending of outgoing mail
        # being delayed
        last if ($global_end || $import_count>=20);
        my $fname = "$dir/$rel_fname";
        my $st=stat($fname);
        my $stat_error=$!;
        my $proc_filename = $fname;
        my $basename = $fname;
        my $do_process=1;
        if ($origin->{type} eq "dot-received") {
            # take ownership of the file by renaming it with our pid
            $proc_filename =~ s/(.*)\.received$/$1.$$.processing/;
            $basename=$1;
            if (!$st || !rename($fname, $proc_filename)) {
                $do_process=0;
                error_log("Cannot rename $fname");
            }
        }
        elsif (!$st) {
            # the file vanished or is not accessible: skip it rather than
            # dying later on when calling a method on an undefined $st
            $do_process=0;
            error_log("Cannot stat $fname: $stat_error");
        }
        if (getconf_bool("auto_db_reconnect") && !$dbh->ping()) {
            db_reconnect();
        }
        if ($do_process) {
            my $t0 = [gettimeofday];
            $import_count++;
            my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime ($st->mtime));

            # context passed to the preprocess and postprocess plugins
            my %plugins_ctxt;
            $plugins_ctxt{filename}=$proc_filename;
            $plugins_ctxt{filesize} = $st->size;
            $plugins_ctxt{dbh}=$dbh;
            $plugins_ctxt{stage}="preprocess";
            $plugins_ctxt{status}=0; # can be updated by plugins
            $plugins_ctxt{notice_log} = \&notice_log;
            $plugins_ctxt{error_log} = \&error_log;
            my $incoming_tags=getconf("tags_incoming", $mbox);
            @{$plugins_ctxt{tags}}=@{$incoming_tags} if ($incoming_tags);
            # the tag derived from the folder is added to the configured
            # tags (it must not be overwritten by them afterwards)
            if (exists $pretagged_files{$rel_fname}) {
                push @{$plugins_ctxt{tags}}, $pretagged_files{$rel_fname};
            }
            for my $preproc_plugin (@{$preprocess_plugins{$mbox}}) {
                $preproc_plugin->process(\%plugins_ctxt);
            }

            my %ctxt;
            my $fh;
            # 3-arg open: file names come from the spool and must never
            # be interpreted as an open mode or a command
            if (open($fh, '<', $proc_filename)) {
                $ctxt{'filename'} = $fname;
                $ctxt{'filesize'} = $st->size;
                $ctxt{'proc_filename'} = $proc_filename;
                $ctxt{'str_date'} = $str_file_date;
                if (defined($plugins_ctxt{'tags'})) {
                    @{$ctxt{'tags'}}=@{$plugins_ctxt{'tags'}};
                }
                if ($plugins_ctxt{'status'}) {
                    $ctxt{'status'}=$plugins_ctxt{'status'};
                }
                $ret = import_message($mbox, $fh, \%ctxt);
                close($fh);
            }
            else {
                $ret=0;
            }

            # $ret: >0 imported, 0 error, -1 discarded
            $done = $proc_filename;
            if ($ret>0) {
                $done = "$basename.processed";
            } elsif ($ret==0) {
                $done = "$basename.error";
            } elsif ($ret==-1) {
                $done = "$basename.discarded";
            }
            my $done_ok=1;
            if ($origin->{type} eq "dot-received") {
                if (!rename($proc_filename, $done)) {
                    error_log("Rename failed: $proc_filename => $done");
                    $done_ok=0;
                }
            }
            if ($done_ok) {
                if ($ret>0) {
                    $plugins_ctxt{filename}=$done;
                    $plugins_ctxt{stage}="postprocess";
                    if (defined($ctxt{'tags'})) {
                        @{$plugins_ctxt{'tags'}}=@{$ctxt{'tags'}};
                    }
                    for my $plugin (@{$postprocess_plugins{$mbox}}) {
                        $plugin->process(\%plugins_ctxt);
                    }
                    my $msg=sprintf("Imported: %s in %0.2fs", $fname, tv_interval($t0));
                    notice_log($msg);
                    update_runtime_timestamp("last_import");
                }
                elsif ($ret==0) {
                    error_log("Import failed: $fname");
                    update_runtime_timestamp("last_error");
                    update_runtime_errcount();
                }
                elsif ($ret==-1) {
                    error_log("Discarded: $fname");
                }
                my $cmd=getconf("postprocess_mailfile_cmd", $mbox);
                if (defined $cmd) {
                    my $result="imported";
                    if ($ret==0) { $result="error"; }
                    elsif ($ret==-1) { $result="discarded"; }
                    # result, file name and identity are passed as $0, $1, $2
                    # of the shell command
                    system("/bin/sh", "-c", $cmd, $result, $done, $mbox);
                    if (($?>>8)!=0) {
                        warning_log("Execution of postprocess_mailfile_cmd failed (\`$cmd\`, exit code=".($?>>8).")");
                    }
                }
                if ($origin->{type} eq "maildir") {
                    if (!unlink($proc_filename)) {
                        error_log("Failed to delete file $proc_filename: $!");
                    }
                }
                if ($ret!=0 && $origin->{type} eq "dot-received" &&
                    getconf_bool("delete_processed_mail_files", $mbox)) {
                    if (!unlink($done)) {
                        error_log("Failed to delete file $done: $!");
                    }
                }
            }
        }
        # Word indexing: flush the word vectors to the db if necessary
        my $widx_size=Manitou::Words::queue_size();
        if ($widx_size > 0) {
            if ($widx_size >= getconf("flush_word_index_max_queued") ||
                time-Manitou::Words::last_flush_time() >= getconf("flush_word_index_interval")) {
                do_flush_word_vectors();
            }
        }
    }
    # returns the number of files that are still candidates to being imported
    # in most cases that will be 0
    return @files-$import_count;
}
# Return the numerically smallest of the arguments
# (undef when called without any argument).
sub min {
    my ($lowest, @others) = @_;
    for my $candidate (@others) {
        $lowest = $candidate if ($candidate < $lowest);
    }
    return $lowest;
}
# Tell whether $c is listed in the 'exclude_contents' configuration
# parameter, a comma-separated list of values.
# Returns 1 if it is, 0 otherwise (or when the parameter is not set).
sub is_excluded {
    my $c=shift;
    my $r=getconf("exclude_contents");
    return 0 if (!defined $r);
    # ignore whitespace around the whole list
    $r =~ s/^\s+//;
    $r =~ s/\s+$//;
    # whitespace around the commas is optional: "a,b" and "a , b" both
    # yield the two elements a and b
    foreach my $item (split /\s*,\s*/, $r) {
        return 1 if ($item eq $c);
    }
    return 0;
}
# Open a new database connection and make it the current one.
# The handle returned by db_connect() must be stored into $dbh: otherwise
# every caller would keep on using the dead connection after a "reconnect".
sub db_reconnect {
    my $new_dbh = db_connect();
    if ($new_dbh) {
        $dbh = $new_dbh;
        notice_log("Successful database reconnect");
    }
    else {
        # NOTE(review): db_connect() may well die by itself on failure;
        # this branch only matters if it returns a false value instead.
        error_log("Database reconnect failed");
    }
}
# Copy the next message of a mailbox into a newly created temporary file.
# Argument: a reference to the file handle opened on the mailbox, which
# may already be at end of file.
# A "From " line is skipped when it is the very first line read; it may be
# missing if it has been consumed as the end marker of the previous message.
# A "From " line coming right after an empty line ends the message.
# Returns the name of the temporary file, or undef if there was nothing
# left to read or if the temporary file could not be created.
sub extract_mbox_tmpfile {
    my ($pfh)=@_; # may be at end of file
    my $in=$$pfh;
    my $line_no=0;
    my $prev_blank=0; # was the previous line empty?
    my ($out,$tmp_name);
    while (defined(my $text=<$in>)) {
        $line_no++;
        if ($text =~ /^From /) {
            next if ($line_no==1);
            # empty line followed by ^From_ means end of mail
            last if ($prev_blank);
        }
        # the temporary file is created lazily, with the first line to keep
        unless (defined $out) {
            ($out,$tmp_name)= tempfile();
            unless ($out) {
                error_log("Could not create temporary file when parsing mailbox:$!");
                return undef;
            }
        }
        print $out $text;
        $prev_blank=($text eq "\n")?1:0;
    }
    close $out if (defined $out);
    return $tmp_name;
}
# Program entry point: parse the command line, read the configuration,
# connect to the database, then run in one of these modes:
#  - --mboxfile: import every message of an mbox file
#  - file names as arguments: import each of these mail files
#  - --import-list: import the mail files listed in a file, one per line
#  - otherwise: daemon mode (optionally forked with --fork), polling the
#    spools for incoming mail, sending outgoing mail, running maintenance
#    plugins and jobs until a termination signal is caught.
sub main_multi {
    my $mailbox_file;
    my $conf_file;
    my $global_tag;
    my $global_mbox;
    my $option_fork;
    my $option_pidfile;
    my ($option_import_list, $option_import_basedir);
    my $mbox_skip=0;
    my $rc = GetOptions("mboxfile:s" => \$mailbox_file,
                        "status:i" => \$options{'status'},
                        "conf:s" => \$conf_file,
                        "tag:s" => \$global_tag,
                        "fork" => \$option_fork,
                        "import-list:s" => \$option_import_list,
                        "import-basedir:s" => \$option_import_basedir,
                        "pidfile:s" => \$option_pidfile,
                        "mailbox:s" => \$global_mbox,
                        "skip:i" => \$mbox_skip,
                        "verbosity" => \$verbosity);
    if (!$rc) {
        print STDERR "Usage: $0 [--conf=config_file] [[--mboxfile=path [--skip=# of msgs]] [--status=import_status] [--tag=tagname] ]\n";
        print STDERR "Usage: $0 [--conf=config_file] --import-list=path [--import-basedir=directory] [--status=import_status] [--tag=tagname] [--mailbox=mailbox_name]\n";
        print STDERR "Usage: $0 [--conf=config_file] --fork [--pidfile=/path/to/pidfile]\n";
        exit 1;
    }
    if (!defined $conf_file && -e "/etc/manitou-mdx.conf") {
        $conf_file="/etc/manitou-mdx.conf"; # default config file
    }
    if (defined $conf_file) {
        my %err;
        if (!readconf($conf_file, \%err)) {
            print STDERR "Error in config file: ", $err{msg}, "\n";
            exit 1;
        }
    }
    init_log();
    init_temp_dir();
    if (defined $global_mbox && $global_mbox ne "") {
        # let init_mailboxes create the mailbox if necessary
        add_mbox($global_mbox);
    }
    # If $option_fork is set, we connect to the database only to test if the
    # connection succeeds, so that we can exit with a useful error message
    # instead of forking.
    $dbh = db_connect();
    init_identities();
    init_plugins();
    load_stopwords($dbh);

    # Build the list of directories where mailfiles are to be looked for
    # A directory points to a mailbox (possibly an anonymous mailbox)
    my %mailfiles_dirs;
    my %maildirs;
    my %spool_mboxes;
    for my $m (mailboxes()) {
        my $dir=getconf("mailfiles_directory", $m);
        if (defined $dir) {
            $mailfiles_dirs{$m}= {type=>"dot-received", location=>$dir};
        }
        $dir=getconf("spool_maildir", $m);
        if (defined $dir) {
            $maildirs{$m} = {type=>"maildir", location=>$dir};
        }
        $dir=getconf("spool_mailbox", $m);
        if (defined $dir) {
            $spool_mboxes{$m} = {type=>"mailbox", location=>$dir, last_size=>0, last_mtime=>0};
        }
    }

    if ($option_fork) {
        $dbh->disconnect;
        # Do as many error checks as we can before closing stderr
        if (!%mailfiles_dirs && !%maildirs && !%spool_mboxes) {
            print STDERR "No mailspool defined. In daemon mode, the configuration should define spool_maildir or spool_mailbox or mailfiles_directory for at least one identity.\n";
            exit 1;
        }
        my $f=fork();
        if (!defined $f) {
            # fork() returns undef on failure: without this test the parent
            # would carry on as if it were the child, with its standard
            # streams redirected to /dev/null
            die "Cannot fork: $!\n";
        }
        if ($f>0) {
            if (defined $option_pidfile) {
                open(my $pid_fh, '>', $option_pidfile) or die "Cannot open $option_pidfile: $!\n";
                print $pid_fh "$f\n";
                close($pid_fh);
            }
            exit(0); # exit of parent process
        }
        else {
            # reopen
            open(STDIN, "</dev/null");
            open(STDOUT, ">/dev/null");
            open(STDERR, ">/dev/null");
            # reconnect
            $dbh = db_connect();
        }
    }

    if (defined($mailbox_file)) {
        # mbox import: each message is copied into a temporary file
        # when its end is reached, then imported from that file
        my $mail_cnt=0;
        my $filename;
        # NOTE: 2-arg open is kept on purpose for this user-supplied
        # argument, so that special names such as "-" behave as before
        open (F, $mailbox_file) or die "$mailbox_file: $!\n";
        my $end=0;
        umask(077);
        my $fh_tmp;
        while (!$end) {
            $_=<F>;
            if (!defined($_)) { $end=1; }
            if ($end || /^From /) {
                if ($mail_cnt>$mbox_skip) {
                    # import mail
                    if (defined $fh_tmp) {
                        close $fh_tmp;
                        $fh_tmp=undef;
                    }
                    my $st=stat($filename) or die "$filename: $!\n";
                    open (MAIL_FILE,"$filename") or die "$filename: $!\n";
                    print "\rImporting $mail_cnt";
                    my %ctxt;
                    $ctxt{'filename'}=$filename;
                    $ctxt{'str_date'}=undef;
                    $ctxt{'proc_filename'}=$filename;
                    $ctxt{'filesize'}=$st->size;
                    if (defined $global_tag && $global_tag ne "") {
                        push @{$ctxt{'tags'}}, $global_tag;
                    }
                    import_message($global_mbox, \*MAIL_FILE, \%ctxt);
                    close(MAIL_FILE);
                    unlink($filename);
                    if (Manitou::Words::queue_size() > 0) {
                        if (time-Manitou::Words::last_flush_time() >=
                            getconf("flush_word_index_interval")) {
                            do_flush_word_vectors();
                        }
                    }
                }
                if (!$end) {
                    # start of a new message: the first $mbox_skip ones
                    # are not copied
                    if (++$mail_cnt > $mbox_skip) {
                        ($fh_tmp,$filename) = tempfile();
                        die $! if (!defined $fh_tmp);
                    }
                }
            }
            else {
                if (defined $fh_tmp) {
                    print $fh_tmp $_ or die "$filename: $!\n";
                }
            }
        }
        close (F);
        print "\n" if ($mail_cnt>$mbox_skip);
    }
    elsif (@ARGV) {
        # import of the mail files named on the command line
        foreach my $fname (@ARGV) {
            my $st=stat($fname) or die "$fname: $!";
            my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime($st->mtime));
            my $fh;
            open($fh, $fname) or die "$fname: $!\n";
            my %ctxt;
            $ctxt{'filename'}=$fname;
            $ctxt{'filesize'} = $st->size;
            $ctxt{'proc_filename'}=$fname;
            $ctxt{'str_date'}=$str_file_date;
            if (defined $global_tag && $global_tag ne "") {
                push @{$ctxt{'tags'}}, $global_tag;
            }
            my $l=getconf("tags_incoming", $global_mbox);
            if ($l) {
                push @{$ctxt{tags}}, @{$l};
            }
            import_message($global_mbox, $fh, \%ctxt);
            close($fh);
        }
    }
    elsif (defined $option_import_list) {
        # import of the mail files listed in a file, one name per line
        open(my $fhl, '<', $option_import_list) or die "$option_import_list: $!";
        while (my $fname=<$fhl>) {
            chomp $fname;
            my $orig_fname=$fname;
            if (defined $option_import_basedir) {
                $fname = "$option_import_basedir/$fname";
            }
            my $st=stat($fname) or die "$fname: $!";
            my $str_file_date=strftime("%Y-%m-%d %H:%M:%S", localtime($st->mtime));
            open(my $fh, "<", $fname) or die "$fname: $!\n";
            my %ctxt;
            $ctxt{'filename'}=$fname;
            $ctxt{'filesize'} = $st->size;
            $ctxt{'proc_filename'}=$fname;
            $ctxt{'str_date'}=$str_file_date;
            if (defined $global_tag && $global_tag ne "") {
                push @{$ctxt{'tags'}}, $global_tag;
            }
            my $l=getconf("tags_incoming", $global_mbox);
            if ($l) {
                push @{$ctxt{tags}}, @{$l};
            }
            import_message($global_mbox, $fh, \%ctxt);
            print "$orig_fname\n" if ($verbosity);
            close($fh);
            if (Manitou::Words::queue_size() >= getconf("flush_word_index_max_queued")) {
                do_flush_word_vectors();
            }
        }
        close $fhl;
    }
    else {
        # daemon mode
        $SIG{'TERM'} = 'sigterm';
        $SIG{'INT'} = 'sigterm';
        LogSuccess("starting in daemon mode");
        $dbh->begin_work;
        Manitou::Words::flush_jobs_queue($dbh);
        $dbh->commit;
        # force a check of the incoming mail at the first iteration
        my $last_checked_incoming=time-getconf("incoming_check_interval")-1;
        $global_end=0;
        $mail_id=0;
        my $last_alive;
        my $alive_interv=getconf('alive_interval');
        update_runtime_timestamp("last_alive") if ($alive_interv);
        my $in_interv=getconf("incoming_check_interval");
        my $out_interv=getconf("outgoing_check_interval");
        my $no_send=(getconf_bool("no_send"));
        my $still_to_go;
        # Start by sending any pending outgoing mail
        my $last_checked_outgoing=time;
        send_mails() unless $no_send;
        # Main loop
        while (!$global_end) {
            if ($still_to_go>0 || time >= $last_checked_incoming+$in_interv) {
                # import
                $last_checked_incoming=time;
                $still_to_go=0;
                for my $mbox (keys %mailfiles_dirs) {
                    $still_to_go += import_mailfiles($mbox, $mailfiles_dirs{$mbox});
                }
                for my $mbox (keys %maildirs) {
                    $still_to_go += import_mailfiles($mbox, $maildirs{$mbox});
                }
                for my $mbox (keys %spool_mboxes) {
                    my $o=$spool_mboxes{$mbox};
                    my $stmb=stat($o->{location});
                    if ($stmb) {
                        # the spool is looked at only when its size or
                        # modification time has changed
                        if ($stmb->mtime!=$o->{last_mtime} || $stmb->size!=$o->{last_size}) {
                            $o->{last_mtime}=$stmb->mtime;
                            $o->{last_size}=$stmb->size;
                            my $cmd=getconf("movemail_command", $mbox);
                            $cmd="movemail" if (!defined $cmd);
                            my $tmpdir=getconf("tmpdir");
                            my $dest_inbox=$tmpdir."/inbox-".$mbox;
                            if ($cmd) {
                                $cmd="$cmd \"" . getconf("spool_mailbox", $mbox) . "\" \"$dest_inbox\"";
                                my $output=`$cmd`;
                                if (($?>>8)!=0) {
                                    error_log("movemail error: (\`$cmd\`, exit code=".($?>>8)."): $output");
                                }
                                else {
                                    my $fh;
                                    if (open($fh, $dest_inbox)) {
                                        # split the mailbox into one temporary
                                        # file per message and import them
                                        while(1) {
                                            my ($tmpname) = extract_mbox_tmpfile(\$fh);
                                            last if (!$tmpname);
                                            my $htmp = {type=>"tempfile", location=>$tmpname};
                                            import_mailfiles($mbox, $htmp);
                                            unlink($tmpname);
                                        }
                                        close($fh);
                                        unlink($dest_inbox);
                                    }
                                }
                            }
                        }
                    }
                }
            }
            if (!$no_send) {
                # send
                if (time >= $last_checked_outgoing+$out_interv) {
                    $last_checked_outgoing=time;
                    if (getconf_bool("auto_db_reconnect") && !$dbh->ping()) {
                        db_reconnect();
                    }
                    send_mails();
                }
            }
            else {
                $last_checked_outgoing=time;
            }
            if ($alive_interv && time >= $last_alive+$alive_interv) {
                # confirm that we're running
                $last_alive = update_runtime_timestamp("last_alive");
            }
            # compute the minimum lapse of time before we'll need to
            # do something, and then sleep that amount of time
            my $t=time;
            my ($mplugin,$maint_when) = check_maintenance_schedule($t);
            while ($maint_when && $maint_when<=$t) {
                run_maint_plugin($mplugin);
                ($mplugin,$maint_when) = check_maintenance_schedule($t);
            }
            my $nx_act=min($last_checked_incoming+$in_interv,
                           $last_checked_outgoing+$out_interv);
            $nx_act = min($nx_act, $last_alive+$alive_interv) if ($alive_interv);
            $nx_act = min($nx_act, $maint_when) if ($maint_when);
            my $widx_size=Manitou::Words::queue_size();
            if ($widx_size >0) {
                my $when_widx = Manitou::Words::last_flush_time() + getconf("flush_word_index_interval");
                if ($when_widx <= $t || $widx_size >= getconf("flush_word_index_max_queued")) {
                    do_flush_word_vectors();
                }
                else {
                    $nx_act=min($nx_act, $when_widx);
                }
            }
            Manitou::Jobs::check_end_jobs($dbh);
            Manitou::Jobs::process_jobs_queue($dbh);
            check_new_jobs();
            # sleep unless there's something better to do right now
            if (!($nx_act<=$t || $still_to_go>0)) {
                # wait on the database socket so that an incoming
                # notification interrupts the sleep
                my $bits;
                vec($bits, $dbh->{pg_socket}, 1)=1;
                my $found = select($bits, undef, undef, $nx_act-$t);
                if ($found) {
                    while (my $notif = $dbh->pg_notifies) {
                        my ($notif_name, $pid, $payload) = @$notif;
                        process_notification($notif_name, $payload);
                    }
                }
            }
        }
    }
    if (Manitou::Words::queue_size() > 0) {
        do_flush_word_vectors();
    }
    $dbh->disconnect;
}
# Fetch one pending job (status null or 0) from jobs_queue and run it.
# Only the 'import_mailbox' job type is handled here: the job is flagged
# with status=1, run, then removed from the queue if
# job_import_mailbox() returned a true value.
sub check_new_jobs {
    my $pending = $dbh->prepare("SELECT job_id,job_type,job_args,status FROM jobs_queue WHERE coalesce(status,0)=0");
    $pending->execute;
    my $job = $pending->fetchrow_hashref;
    return unless ($job);
    return unless ($job->{job_type} eq 'import_mailbox');
    $dbh->do("UPDATE jobs_queue SET status=1 WHERE job_id=?", {}, $job->{job_id});
    if (job_import_mailbox($job)) {
        $dbh->do("DELETE FROM jobs_queue WHERE job_id=?",{}, $job->{job_id});
    }
}
# Called for each notification received from the database:
# print its name and payload, then look for new jobs to run.
sub process_notification {
    my $name = shift;
    my $payload = shift;
    print "notification: $name, payload: $payload\n";
    check_new_jobs();
}
# Run a mailbox import job.
# Input: h{job_id,job_type,job_args}, where job_args must be the numeric
# import_id of the mailbox to import.
# Returns 1 if the import completed, 0 otherwise (including when job_args
# is not a number). The value is returned explicitly because the caller
# uses it to decide whether to delete the job; previously it was whatever
# the last logging call happened to return.
sub job_import_mailbox {
    my $row=shift;
    if (!defined $row->{job_args} || $row->{job_args} !~ /^\d+$/) {
        return 0;
    }
    notice_log("Starting mailbox import job #$row->{job_id}");
    if (import_database_mailbox($row->{job_args})) {
        notice_log("Mailbox import job #$row->{job_id} (import_id=$row->{job_args}) complete");
        return 1;
    }
    error_log("Mailbox import job #$row->{job_id} (import_id=$row->{job_args}) did not complete successfully");
    return 0;
}
# Import into the mail database the messages that have been uploaded into
# the import_message table for the import $import_id.
# Each message is written to a temporary file and passed to
# import_message(); its row gets status=1 and the new mail_id on success,
# status=2 otherwise. The completion ratio is kept up to date in
# import_mbox and signalled with NOTIFY mbox_import_progress.
# Returns 1 when all the messages have been processed, 0 if the import has
# been aborted (import_mbox row missing or with status=2) or if a
# temporary file could not be used.
sub import_database_mailbox {
    my $import_id=shift;
    my $result=1;
    my $previous_completion;
    my $s1=$dbh->prepare("SELECT tag_id,mail_status,apply_filters,auto_purge FROM import_mbox WHERE import_id=?");
    $s1->execute($import_id);
    my $import = $s1->fetchrow_hashref;
    $s1->finish;
    my $s2=$dbh->prepare("SELECT count(*),sum(case when status=1 then 1 else 0 end) FROM import_message WHERE import_id=?");
    $s2->execute($import_id);
    my ($cnt_total, $cnt_done)=$s2->fetchrow_array;
    $s2->finish;
    # We retrieve only one message per query from import_message. This was meant to start
    # the import while the next messages were still being uploaded. Actually ATM this is not
    # the case, the UI won't start the import until the entire mbox has been uploaded
    # and we need them (at least the count) to send a proper progress report anyway.
    my $sth=$dbh->prepare("SELECT mail_number, encoded_mail FROM import_message WHERE import_id=? AND coalesce(status,0)=0 ORDER BY mail_number LIMIT 1");
    my $sth2=$dbh->prepare("SELECT status FROM import_mbox where import_id=?");
    my $su = $dbh->prepare("UPDATE import_message SET status=?,mail_id=? WHERE mail_number=? AND import_id=?");
    $dbh->do("UPDATE import_mbox SET status=1 WHERE import_id=?", {} , $import_id);
    while ($result) {
        $sth2->execute($import_id);
        my ($import_status) = $sth2->fetchrow_array;
        if ($sth2->rows==0 || $import_status==2) {
            # The import has been aborted
            $result=0;
            last;
        }
        $sth->execute($import_id);
        my @r=$sth->fetchrow_array;
        $sth->finish;
        last if (!@r); # no more message to import

        my ($fh,$filename) = tempfile();
        if (!defined $fh) {
            error_log("Unable to create temp file: $!");
            $result=0;
            last;
        }
        print $fh $r[1];
        close($fh);
        my $st=stat($filename);
        if (!$st || !open($fh, '<', $filename)) {
            error_log("Unable to open $filename: $!");
            unlink($filename); # don't leave the temporary file behind
            $result=0;
            last;
        }
        my %ctxt;
        $ctxt{'filename'}=$filename;
        $ctxt{'str_date'}=undef;
        $ctxt{'proc_filename'}=$filename;
        $ctxt{'filesize'}=$st->size;
        $ctxt{'skip_filters'}=1 if ($import->{apply_filters} eq "N");
        $ctxt{tag_id}=$import->{tag_id} if ($import->{tag_id}>0);
        $ctxt{status}=$import->{mail_status} if ($import->{mail_status});
        my $new_mail_id=import_message(undef, $fh, \%ctxt); # mbox is undef
        if ($new_mail_id>0) {
            $su->execute(1, $new_mail_id, $r[0], $import_id);
            if ($cnt_total>0) {
                my $c=sprintf("%0.2f", (++$cnt_done)/$cnt_total);
                # report the progress only when the rounded ratio changes
                if ($c ne $previous_completion) {
                    $previous_completion=$c;
                    $dbh->do("UPDATE import_mbox SET completion=? WHERE import_id=?", {}, $c, $import_id);
                    $dbh->do("NOTIFY mbox_import_progress"); # TODO: payload
                }
            }
        }
        else {
            # The statement has four placeholders: mail_id must be passed
            # too (as NULL), otherwise execute() fails on the bind count
            # and the message would never be flagged as failed.
            $su->execute(2, undef, $r[0], $import_id);
        }
        close($fh);
        unlink($filename);
    }
    if ($result==1) {
        $dbh->begin_work;
        $dbh->do("DELETE FROM import_message WHERE import_id=?", {}, $import_id);
        if ($import->{auto_purge} eq "Y") {
            $dbh->do("DELETE FROM import_mbox WHERE import_id=?", {}, $import_id);
        }
        else {
            $dbh->do("UPDATE import_mbox SET status=3 WHERE import_id=? AND status=1", {} , $import_id);
        }
        $dbh->do("NOTIFY mbox_import_progress");
        $dbh->commit;
    }
    return $result;
}
# Returns the timestamp at which the maintenance plugin $p should be run
# after the timestamp $now. The result must be >$now
# $p->{frequency_type} is either:
#  - "interval": $p->{frequency} is a number of minutes between two runs
#  - "pit" (point in time): $p->{frequency} is "HH:MM" for a daily run, or
#    "*:MM" for a run every hour at the minute MM (local time).
# Dies on any other frequency type.
sub next_maint_run {
    my ($p,$now)=@_;
    if ($p->{frequency_type} eq "interval") {
        return $now+$p->{frequency}*60;
    }
    elsif ($p->{frequency_type} eq "pit") {
        my ($h,$mn)=split /:/, $p->{frequency};
        my ($min,$hour) = (localtime($now))[1,2];
        if ($h eq "*") {
            # When we're already in the target minute, the next run is in
            # one hour: returning $now would make the plugin run again
            # and again until the end of the minute.
            return $now+(($mn>$min)?($mn-$min):(60-$min+$mn))*60;
        }
        else {
            my ($dh,$dm); # delay in hours and minutes
            if ($h==$hour) {
                if ($mn==$min) { $dh=24; $dm=0; }
                elsif ($mn>$min) { $dh=0; $dm=$mn-$min; }
                else { $dh=23; $dm=60-$min+$mn; }
            }
            elsif ($h>$hour) {
                $dh=$h-$hour;
                if ($mn>=$min) {
                    $dm=$mn-$min
                }
                else {
                    $dm=60-$min+$mn;
                    $dh--;
                }
            }
            else { # $h<$hour
                $dh=24-$hour+$h;
                if ($mn>=$min) {
                    $dm=$mn-$min;
                }
                else {
                    $dm=60-$min+$mn;
                    $dh--;
                }
            }
            return $now+($dh*60+$dm)*60;
        }
    }
    else {
        die $p->{frequency_type} . ": unsupported frequency type\n";
    }
}
# Find the maintenance plugin that is due first.
# Plugins that have no next_run yet get one computed from $now.
# Returns ($plugin,$when) where $plugin is a reference to the plugin with
# the lowest next_run and $when is that next_run timestamp rounded down to
# a whole minute. Returns 0 when there is no maintenance plugin.
sub check_maintenance_schedule {
    my ($now)=@_;
    my $earliest=0;
    my $earliest_plugin;
    foreach my $plugin (@maintenance_plugins) {
        # the plugin has never been run nor scheduled
        $plugin->{next_run} = next_maint_run($plugin,$now) unless ($plugin->{next_run});
        next unless ($plugin->{next_run}<$earliest || $earliest==0);
        $earliest=$plugin->{next_run};
        $earliest_plugin=$plugin;
    }
    return 0 unless ($earliest_plugin);
    # round off the timestamp to an integral number of minutes
    return ($earliest_plugin, $earliest-($earliest%60));
}
# Run the maintenance plugin $p, then schedule its next run.
# The plugin receives a context with the database handle, the stage name
# and references to the logging functions.
sub run_maint_plugin {
    my ($p)=@_;
    my %ctxt;
    $ctxt{'dbh'}=$dbh;
    $ctxt{'stage'}="maintenance";
    # code reference to notice_log (the '&' sigil is required here)
    $ctxt{'notice_log'} = \&notice_log;
    $ctxt{'error_log'} = \&error_log;
    notice_log("Running maintenance plugin: $p->{name}");
    $p->process(\%ctxt);
    $p->{next_run}=next_maint_run($p, time);
}
# Set up the directory for temporary files.
# When the 'tmpdir' configuration parameter is set, the directory is
# created if necessary (mode 0700), then checked: it must not be
# accessible to "others" and must belong to the user we're running as.
# A failed check is fatal unless 'security_checks' is set to "no".
# When 'tmpdir' is not set, a temporary directory removed at exit is
# created and registered as 'tmpdir'.
sub init_temp_dir {
    my $dir = getconf('tmpdir');
    unless (defined $dir) {
        $dir=File::Temp::tempdir(CLEANUP=>1);
        die "Could not create a temporary directory" if (!$dir);
        set_common_conf('tmpdir', $dir);
        return;
    }
    unless (-d $dir) {
        mkdir($dir, 0700) or die "Cannot create $dir: $!\nPlease set the 'tmpdir' configuration parameter to a usable directory for temporary files";
    }
    my $st=stat($dir);
    if (($st->mode & 7) != 0 && getconf('security_checks') ne "no") {
        die "Security check failed: $dir has unsecure permissions: use 0700 permissions.\n";
    }
    if ($st->uid != getuid() && getconf('security_checks') ne "no") {
        my @pw_me=getpwuid(getuid());
        my @pw_dir=getpwuid($st->uid);
        die sprintf("Security problem: $dir belongs to user '%s' while we are running as user '%s'\n", $pw_dir[0], $pw_me[0]);
    }
    return;
}
# Wrapper around error_log(): passes all its arguments through.
sub LogError {
    return error_log(@_);
}
# Wrapper around notice_log(): passes all its arguments through.
sub LogSuccess {
    return notice_log(@_);
}
# Remove the non-paired occurrences of symbols from a phrase
# and returns that phrase.
# $symboles holds the pair: its last character is the closing symbol, what
# comes before it is the opening symbol, e.g. "()" or "[]".
# The phrase is scanned once, comparing characters literally, so symbols
# that are special in regular expressions (such as brackets) are safe.
sub enforce_pairs {
    my ($phrase,$symboles) = @_;
    return $phrase if (!defined $phrase || $phrase eq "");
    # last character: ')' or similar closing element
    my $f = chop($symboles);
    # what remains: '(' or similar opening element
    my $o = $symboles;
    my @chars = split //, $phrase;
    my @parenth; # stack of the positions of the open elements
    for my $i (0..$#chars) {
        if ($chars[$i] eq $o) {
            # opening
            push (@parenth, $i);
        }
        elsif ($chars[$i] eq $f) {
            if (@parenth != 0) {
                # closing, pop the last opened element
                pop (@parenth);
            }
            else {
                # if no '(' can be popped, remove the ')' from the phrase
                $chars[$i] = "";
            }
        }
    }
    # Remove the '(' that have no matching ')'
    for my $i (@parenth) {
        $chars[$i] = "";
    }
    return join("", @chars);
}
# Insert the text and HTML bodies of a message into the body table.
# $btext and $bhtml are references to the strings.
sub insert_body {
    my ($mail_id, $btext, $bhtml) = @_;
    my $sth = $dbh->prepare("INSERT INTO body(mail_id,bodytext,bodyhtml) VALUES (?,?,?)");
    my @values = ($mail_id, encode_dbtxt($$btext), encode_dbtxt($$bhtml));
    # the DBD character escaping does not allow for '\0' characters
    # we just remove them
    for my $i (1, 2) {
        $values[$i] =~ s/\000//g;
    }
    for my $i (0 .. $#values) {
        $sth->bind_param($i+1, $values[$i]);
    }
    my $rc = $sth->execute;
    $sth->finish;
}
# Insert the header of a message into the header table.
# $btext is a reference to the string holding the header lines.
sub insert_header {
    my ($mail_id, $btext) = @_;
    my $sth = $dbh->prepare("INSERT INTO header(mail_id,lines) VALUES (?,?)");
    my @values = ($mail_id, encode_dbtxt($$btext));
    for my $i (0 .. $#values) {
        $sth->bind_param($i+1, $values[$i]);
    }
    $sth->execute;
    $sth->finish;
}
# Return the argument unchanged, or an empty string if it's undefined.
sub convnull {
    my ($str) = @_;
    return defined($str) ? $str : "";
}
# Signal handler installed for TERM and INT in daemon mode:
# log the event and ask the main loop to stop.
sub sigterm {
    error_log("SIGTERM caught. Exiting...");
    $global_end=1;
}
# Import the message and deal with database errors.
# Returns -1 without doing anything if the context says that the message
# is to be discarded (status==-1). Otherwise returns what
# import_message_2() returns, or 0 if it died: in that case the error is
# logged, the transaction rolled back and the word index state of the
# current mail cleared.
sub import_message {
    my ($mbox_name, $mail_handle, $mail_ctxt) = @_;
    # short-circuit the processing if the message is to be discarded
    if ($mail_ctxt->{status}==-1) {
        return -1;
    }
    my $id;
    eval {
        $id = import_message_2($mbox_name, $mail_handle, $mail_ctxt);
    };
    unless ($@) {
        return $id;
    }
    error_log(($dbh->err > 0) ? "Database error: $@" : "Import error: $@");
    $dbh->rollback();
    if ($mail_id>0) {
        Manitou::Words::clear_last_indexed_mail($dbh, $mail_id);
    }
    return 0;
}
# Return values
# >0: OK
# 0: error
# -1: mail discarded
sub import_message_2 {
my ($mbox_name, $mail_handle, $mail_ctxt) = @_;
my ($mail_filename, $mail_str_date)=($mail_ctxt->{'filename'}, $mail_ctxt->{'str_date'});
my ($head, $body_text, $body_html, $top, $attachments);
my $parser = new MIME::Parser;
my $gzfh; # handle for Gunzip
my $failed=0;
$mail_id=0;
# Mime Parser configuration
$parser->output_dir(getconf("tmpdir"));
# $parser->decode_headers(1);
$parser->output_to_core(20000);
$parser->parse_nested_messages(0);
# check if it'gzip'ed
my $header2;
my $rd2=read($mail_handle, $header2, 2);
seek $mail_handle, 0, SEEK_SET;
if ($rd2==2 && $header2 eq "\x1f\x8b") {
my $gzfh=new IO::Uncompress::Gunzip $mail_handle
or die "IO::Uncompress::Gunzip failed: $GunzipError";
$top = $parser->read($gzfh);
}
else {
$top = $parser->read($mail_handle);
}
# Discard the mail if it can't be parsed
if (!$top) {
error_log("Malformed mail message");
close $gzfh if (defined $gzfh);
return 0;
}
# $top->head->unfold;
$dbh->begin_work;
# Get a new mail id from the sequence
$mail_id = get_sequence_nextval("seq_mail_id");
my %pl_ctxt;
if (defined($mimeprocess_plugins{$mbox_name})) {
$pl_ctxt{'filename'}=$mail_filename;
$pl_ctxt{'stage'}="mimeprocess";
$pl_ctxt{'mimeobj'}=$top;
$pl_ctxt{'mail_id'}=$mail_id;
$pl_ctxt{'dbh'}=$dbh;
if ($mail_ctxt->{'tags'}) {
@{$pl_ctxt{'tags'}} = @{$mail_ctxt->{'tags'}};
}
for my $plugin (@{$mimeprocess_plugins{$mbox_name}}) {
$plugin->process(\%pl_ctxt);
}
if (defined $pl_ctxt{'tags'}) {
@{$mail_ctxt->{'tags'}} = @{$pl_ctxt{'tags'}};
}
}
$attachments = 0;
if ($top->effective_type && $top->effective_type ne "text/plain") {
$body_text = "";
}
else {
$body_text=$top->bodyhandle->as_string unless (!$top->bodyhandle);
my $charset = $top->head->mime_attr("content-type.charset") || 'iso-8859-1';
$charset='iso-8859-1' if (!Encode::resolve_alias($charset));
$body_text = Encode::decode($charset, $body_text, Encode::FB_PERLQQ);
}
my $thread_id = get_thread_id ($top);
$mail_ctxt->{mail_id} = $mail_id;
$mail_ctxt->{thread_id} = $thread_id;
$mail_ctxt->{mailbox_address} = $mbox_name;
my $action;
$action = apply_filters($mail_ctxt, $top, \$body_text, 'I') unless ($mail_ctxt->{skip_filters});
if ($action ne "discard") {
my $status=$options{'status'}+0;
if ($mail_ctxt->{status}) {
$status |= $mail_ctxt->{status};
}
#if ($action eq "trash") { $status |= 16+32; }
insert_mail($mbox_name,
$mail_id,
$attachments,
$mail_str_date,
$top,
$thread_id,
$status,
$mail_ctxt);
if (getconf("store_filenames", $mbox_name)) {
store_filename($mail_id, $mail_filename);
}
if (getconf_bool("store_raw_mail", $mbox_name)) {
store_raw_mail($mail_id, $mail_ctxt->{proc_filename});
}
# Insert attachments before the body since detach_text_attachments
# does affect $body_text
if ($top->effective_type && $top->effective_type ne "text/plain") {
if ($top->effective_type eq "text/html") {
my $charset = $top->head->mime_attr("content-type.charset") || 'iso-8859-1';
$charset ='iso-8859-1' if (!Encode::resolve_alias($charset));
$body_html = Encode::decode($charset, $top->bodyhandle->as_string);
}
else {
if (getconf_bool("detach_text_plain", $mbox_name)) {
$attachments += detach_text_attachments($dbh, $top, $mail_id, \$body_text, \$body_html);
}
else {
$attachments += flatten_and_insert_attach($dbh, $top, $mail_id);
}
}
}
if ($attachments>0) {
my $sthua=$dbh->prepare("UPDATE mail SET flags=flags|1 WHERE mail_id=?");
$sthua->execute($mail_id);
}
insert_body($mail_id, \$body_text, \$body_html);
my $safe_header;
foreach (decode_mimewords($top->head->as_string)) {
my @t=@{$_};
if (!defined($t[1])) { # us-ascii substring
$t[0] =~ s/\r?\n(\s)/$1/sog if defined ($t[0]);
$safe_header .= $t[0];
}
else { # other charset: convert it to perl internal format
my $tu;
eval {
$tu=Encode::decode($t[1], $t[0]);
};
if ($@) {
# if the decode fails (typically if the charset is unknown)
# we fall down to using the string as is
$tu=$t[0];
}
$tu =~ s/\r?\n(\s)/$1/sog if (defined($tu));
$safe_header .= $tu;
}
}
insert_header($mail_id, \$safe_header);
if (getconf_bool('index_words', $mbox_name)) {
my $idx_header = Manitou::Words::header_contents_to_ftidx($safe_header);
my $ref_body=\$body_text;
my $truncated_body;
if ($top->head->get("From") =~ /^MAILER-DAEMON@/) {
# Some SMTP servers (e.g. qmail) may send bounces that regurgitate the
# entire original message in one big destructured piece,
# possibly with encoded attachments that can't be recognized
# as such for lack of a proper MIME structure.
# As a workaround to avoid the vast pollution of the word index
# that it may incur, we limit the indexing at 500 lines of text.
my $nbl=500;
my $pos=0;
while (($nbl--)>0 && $pos>=0) {
$pos=index($body_text, "\n", $pos);
$pos++ unless ($pos==-1);
}
if ($nbl<0 && $pos>0) {
$truncated_body=substr($body_text, 0, $pos);
$ref_body=\$truncated_body;
}
}
my $other_words;
if (defined $body_html && getconf_bool('index_words_html_parts', $mbox_name)) {
$other_words = Manitou::Words::html_to_text(\$body_html);
}
my %extractors = Manitou::Attachments::text_extractors($mbox_name);
if (scalar %extractors || getconf_bool('index_words_html_parts', $mbox_name)) {
# If there are no extractors defined, we may still extract words
# with our internal html-to-plaintext converter
Manitou::Attachments::launch_text_extractors($dbh, $mail_id,
\%extractors,
\$other_words);
}
index_words($dbh, $mail_id, $ref_body, \$idx_header, \$other_words);
my $sthj=$dbh->prepare("INSERT INTO jobs_queue(mail_id,job_type) VALUES(?,?)") or die $dbh->errstr;
$sthj->execute($mail_id, "widx");
# end of code block to mutualize
}
if ($mail_id>0 && defined $mail_ctxt->{'tags'}) {
for my $t (@{$mail_ctxt->{'tags'}}) {
Manitou::Tags::insert_tag($dbh, $mail_id, $t);
}
}
if ($mail_id>0 && defined $mail_ctxt->{tag_id}) {
Manitou::Tags::action_tag($dbh, $mail_id, $mail_ctxt->{tag_id});
}
}
else {
# discarded
$mail_id=-1;
}
$dbh->commit;
$top->purge;
close $gzfh if (defined $gzfh);
return $mail_id;
}
# Find the thread of the first already-stored message that is referred to
# by a list of message-ids.
# Input: reference to a list of strings of the form <msgid>
# Output: the thread_id shared with that message, or undef when none of
# the message-ids is found in the mail table.
sub get_references {
    my ($thread_msg_ids) = @_;
    my ($known_mail_id, $thread_id);

    my $lookup = $dbh->prepare("SELECT mail_id,thread_id FROM mail WHERE message_id=?");
  CANDIDATE:
    foreach my $candidate (@$thread_msg_ids) {
        # entries that don't look like <...> are silently ignored
        next CANDIDATE unless $candidate =~ /\<(.*)\>/;
        $lookup->execute($1);
        my @hit = $lookup->fetchrow_array;
        next CANDIDATE unless @hit;
        # the first message found ends the search, whether or not
        # it already belongs to a thread
        ($known_mail_id, $thread_id) = @hit[0, 1];
        last CANDIDATE;
    }

    if ($known_mail_id) {
        # The referred message and the incoming one must share a thread:
        # allocate a new thread id if the stored message had none
        $thread_id ||= get_sequence_nextval("seq_thread_id");
        $dbh->prepare("UPDATE mail set thread_id=? WHERE mail_id=?")
            ->execute($thread_id, $known_mail_id);
    }
    $lookup->finish;
    return $thread_id;
}
# Fetch the next value of a database sequence.
# Input: $seq, the name of the sequence
# Output: the value returned by nextval(), or 1 when the query yields
# no row or a false value.
sub get_sequence_nextval {
    my ($seq) = @_;
    # The name is passed as a quoted SQL literal instead of being
    # concatenated raw into the statement text.
    my $sth = $dbh->prepare("SELECT nextval(" . $dbh->quote($seq) . ")");
    $sth->execute;
    my ($value) = $sth->fetchrow_array;
    $sth->finish;
    return $value ? $value : 1;
}
# Record in the files table the file name associated with message $mail_id.
sub store_filename {
    my ($mail_id, $filename) = @_;
    my $insert = $dbh->prepare ("INSERT INTO files(mail_id,filename) VALUES (?,?)");
    $insert->execute($mail_id, $filename);
    return $insert->finish;
}
# Import the file $filename through the driver's 'lo_import' function and
# store the identifier it returns in raw_mail.mail_text for message $mail_id.
sub store_raw_mail {
    my ($mail_id, $filename) = @_;
    my $insert = $dbh->prepare ("INSERT INTO raw_mail(mail_id,mail_text) VALUES (?,?)");
    my $large_object_id = $dbh->func($filename, 'lo_import');
    $insert->execute($mail_id, $large_object_id);
    return $insert->finish;
}
# Register in the addresses table every email address found in the headers
# of $mail whose names are the keys of %hAdrTypes.
# Input:
#   $mail    - parsed message object (its head is queried with get())
#   $mail_id - accepted for the callers' convenience; not used here
# Output: reference to a list of hashes with the keys
#   email, addr_id, pos, prio (addresses.recv_pri) and type (from %hAdrTypes).
# Side effects: inserts unknown addresses, and updates last_recv_from and
# nb_recv_from for addresses found in the From header.
# Dies when the lookup or the insert into addresses fails.
sub insert_addresses {
    my ($mail, $mail_id) = @_;
    my @haddr;
    my $sth_lookup = $dbh->prepare("SELECT addr_id,recv_pri FROM addresses WHERE email_addr=?") or die $dbh->errstr;
    my $sth_insert_ad = $dbh->prepare("INSERT INTO addresses(addr_id,email_addr,name) VALUES (?,?,?)") or die $dbh->errstr;
    # Prepared on first use only, then reused for each From address
    my $sth_upd;

    for my $adrtype (keys %hAdrTypes) {
        my @addrs;
        # A header that can't be parsed is treated as having no address
        eval {
            no warnings 'all';
            @addrs = Mail::Address->parse(header_decode($mail->head->get($adrtype)));
        };
        for my $addr (@addrs) {
            next unless $addr->address;
            # NOTE(review): $pos is never incremented, so every address is
            # recorded with position 0 -- confirm that this is intended.
            my $pos = 0;
            my $la = lc($addr->address);
            my $ln = lc($addr->name);
            $sth_lookup->execute($la) or die $sth_lookup->errstr;
            my ($id, $addr_pri) = $sth_lookup->fetchrow_array;
            if (!$id) {
                # Unknown address: create it (columns are limited to 300 characters here)
                $id = get_sequence_nextval("seq_addr_id");
                $sth_insert_ad->execute($id,
                                        substr($la, 0, 300),
                                        substr($ln, 0, 300))
                    or die $sth_insert_ad->errstr;
            }
            # update addresses.last_recv_from
            if ($adrtype eq "From") {
                $sth_upd ||= $dbh->prepare("UPDATE addresses SET last_recv_from=now(),nb_recv_from=1+coalesce(nb_recv_from,0) WHERE addr_id=?");
                $sth_upd->execute($id);
            }
            push @haddr, { "email"   => $addr->address,
                           "addr_id" => $id,
                           "pos"     => $pos,
                           "prio"    => $addr_pri,
                           "type"    => $hAdrTypes{$adrtype} };
        }
    }
    return \@haddr;
}
# Gather the message-ids that a mail refers to in its 'In-Reply-To' and
# 'References' header fields and look them up with get_references().
# Returns the thread id found (or created) by get_references(), or undef.
sub get_thread_id {
    my ($mailobj) = @_;
    my @referred;

    # In-Reply-To contributes at most one message-id
    my $reply_field = $mailobj->head->get('In-Reply-To');
    push @referred, "<$1>" if $reply_field =~ /.*\<(.*)\>/;

    # References is a whitespace-separated list of message-ids
    my $references = $mailobj->head->get('References');
    chomp $references;
    push @referred, split(/\s+/, $references);

    return scalar get_references(\@referred);
}
# Insert one message into the mail table, together with its addresses.
#
# Arguments:
#   $mbox_name   - mailbox name, used for per-mailbox configuration lookups
#   $mail_id     - id of the new row
#   $attachments - attachment count; only "greater than zero" matters here
#                  (it sets the flags column to 1)
#   $file_date   - value for msg_date; replaced by the sender's date when it is
#                  undefined or when 'preferred_datetime' is set to "sender"
#   $mailobj     - parsed message; its head is read for From, To, Subject,
#                  Date, Message-Id and In-Reply-To
#   $thread_id   - stored as is in mail.thread_id
#   $status      - stored as is in mail.status
#   $ctxt        - hash ref; its priority, identity_id and filesize entries are read
# Returns $thread_id unchanged.
# Dies when the INSERT into mail or into mail_addresses fails.
sub insert_mail {
my ($mbox_name, $mail_id, $attachments,$file_date, $mailobj, $thread_id,$status, $ctxt) = @_;
# NOTE(review): $rc is declared but not used in this function
my ($rc, $sth);
my $priority;
my $from = convnull(header_decode($mailobj->head->get('From')));
my $to = convnull(header_decode($mailobj->head->get('To')));
my $subject = convnull(header_decode($mailobj->head->get('Subject')));
my $sender_date = Manitou::MailFormat::reformat_sender_date($mailobj->head->get('Date'));
# The placeholders are bound positionally further down: each bind_param
# call must stay in the same order as the columns listed here.
my $query=q{
INSERT INTO mail (mail_id,
sender,
sender_fullname,
recipients,
subject,
msg_date,
sender_date,
status,
flags,
message_id,
thread_id,
in_reply_to,
priority,
identity_id,
raw_size)
VALUES (?, ?, ?, ?, ?, ?::timestamptz, ?::timestamptz,?,?,?,?,?,?,?,?)};
$sth = $dbh->prepare($query);
chomp ($from);
chomp ($to);
chomp ($subject);
# Run enforce_pairs on the address fields for "()" and "<>" before parsing them
$from = enforce_pairs ($from, "()");
$to = enforce_pairs ($to, "()");
$from = enforce_pairs ($from, "<>");
$to = enforce_pairs ($to, "<>");
my @adr_from;
my @adr_to;
# An exception raised by the address parser is swallowed; the list(s) not
# yet assigned at that point stay empty.
eval {
@adr_from=Mail::Address->parse($from);
@adr_to=Mail::Address->parse($to);
};
my $bp = 0; # parameters counter
$sth->bind_param (++$bp, $mail_id);
# sender and sender_fullname: taken from the first From address, each
# cut at 200 characters; NULL when no address could be parsed
if (@adr_from) {
my $sender = encode_dbtxt(substr($adr_from[0]->address,0,200));
$sth->bind_param(++$bp, $sender);
my $fullname = encode_dbtxt(substr($adr_from[0]->name,0,200));
$sth->bind_param(++$bp, $fullname);
}
else {
$sth->bind_param(++$bp, undef);
$sth->bind_param(++$bp, undef);
}
# To: the whole decoded header value is stored, but only when at least
# one address could be parsed from it
if (@adr_to) {
$sth->bind_param(++$bp, $to);
}
else {
$sth->bind_param(++$bp, undef);
}
# Subject
# clean it before: (see the comment before the call to insert_header)
# control characters are removed, except for newlines that become spaces;
# the result is cut at 1000 characters
$subject =~ tr/\x00-\x08//d;
$subject =~ tr/\x0b-\x1F//d;
$subject =~ tr/\x0a/ /;
$subject = substr($subject,0,1000);
$sth->bind_param(++$bp, encode_dbtxt($subject));
# Receive Date
if ((getconf('preferred_datetime', $mbox_name) eq "sender") ||
!defined($file_date)) {
$file_date = $sender_date; # the same as the sender's date
}
$sth->bind_param (++$bp, $file_date);
# Sender_Date
$sth->bind_param (++$bp, $sender_date);
# status
$sth->bind_param (++$bp, $status);
# flags
$sth->bind_param (++$bp, $attachments>0?1:0);
# Register the addresses now: their priorities are needed for the
# priority column bound below
my $haddr=insert_addresses($mailobj,$mail_id);
$priority = $ctxt->{priority};
# Adds the (optional) priorities of From addresses
foreach (@$haddr) {
$priority += $_->{prio} if (defined $_->{prio} && $_->{type} eq $hAdrTypes{"From"});
}
# Message-Id: keep what is between angle brackets when there are some,
# otherwise the header value as is
my $msg_id=$mailobj->head->get('Message-Id');
if ($msg_id =~ /\<(.*)\>/) {
$msg_id=$1;
}
my $in_reply_to;
if ($mailobj->head->get('In-Reply-To') =~ /.*\<(.*)\>/) {
$in_reply_to=$1;
}
# message_id is cut at 100 characters
$sth->bind_param(++$bp, substr(encode_dbtxt($msg_id),0,100));
$sth->bind_param (++$bp, $thread_id);
my $in_reply_to_id;
if (defined($in_reply_to)) {
# Search for a message to which this message would reply
my $sthm=$dbh->prepare("SELECT max(mail_id) FROM mail where message_id=?");
$sthm->execute($in_reply_to);
my @row=$sthm->fetchrow_array;
# NOTE(review): an aggregate query presumably returns one row even when no
# message matches, in which case $row[0] is undef -- confirm.
if (@row) {
$in_reply_to_id=$row[0];
}
$sthm->finish;
if (@row) {
my $stht = $dbh->prepare("SELECT value FROM config WHERE conf_key='auto_tag_thread'");
$stht->execute;
my @rt=$stht->fetchrow_array;
if (@rt && $rt[0] eq "1") {
# Apply to the new mail the same tags as the message it replies to
my $sthtag=$dbh->prepare("SELECT tag FROM mail_tags WHERE mail_id=?");
$sthtag->execute($row[0]);
my @rttag;
# NOTE(review): action_tag is called unqualified here and before the mail
# row itself is inserted; confirm that the function is visible in this
# package and that this ordering is intended.
while (@rttag=$sthtag->fetchrow_array) {
action_tag($dbh, $mail_id, $rttag[0]);
}
}
}
}
$sth->bind_param(++$bp, $in_reply_to_id);
$sth->bind_param(++$bp, $priority);
# identity: the one set in the context wins over the mailbox configuration
my $identity_id = $ctxt->{identity_id};
if (!defined $identity_id) {
$identity_id=getconf("identity_id", $mbox_name);
}
$sth->bind_param(++$bp, $identity_id);
$sth->bind_param(++$bp, $ctxt->{filesize});
$sth->execute or die("Can't execute statement: $DBI::errstr");
$sth->finish;
# Link the message to each of the addresses collected by insert_addresses
my $sth_ma_insert = $dbh->prepare("INSERT INTO mail_addresses(mail_id,addr_type,addr_pos,addr_id) VALUES (?,?,?,?)") or die $dbh->errstr;
foreach (@$haddr) {
$sth_ma_insert->execute($mail_id, $_->{type}, $_->{pos}, $_->{addr_id}) or die $sth_ma_insert->errstr;
}
# A notification is emitted only for messages stored with a status of 0
if ($status==0) {
$dbh->do("NOTIFY new_message");
}
return $thread_id;
}
# Evaluate the filter expressions stored in the database against a message
# and apply the actions of those that match.
#
# Arguments:
#   $ctxt      - hash ref describing the message; the actions update its
#                tags, status, priority and identity_id entries, and read
#                mailbox_address and proc_filename
#   $mime_obj  - the message object, passed to the filter evaluator; the
#                "set header" and "remove header" actions call methods on it
#   $pbody     - not used in this function
#   $direction - direction='I' for incoming, 'O' for outgoing
# Returns "trash", "discard", or undef when no action decided on either.
sub apply_filters {
my ($ctxt, $mime_obj, $pbody, $direction) = @_;
my %exprs;
my %actions;
my %all_exprs;
my $final_action;
my @sorted_keys; # list of expr_id sorted by apply_order
# Load every filter expression: %exprs is keyed by expr_id, %all_exprs
# by name (the latter is handed to the evaluator)
my $sth = $dbh->prepare("SELECT expr_id,name,expression,direction FROM filter_expr ORDER BY apply_order");
$sth->execute;
while (my @r=$sth->fetchrow_array) {
my %h;
$h{expr_id}=$r[0];
$h{name}=decode_dbtxt($r[1]);
$h{expr}=decode_dbtxt($r[2]);
$h{direction}=$r[3];
$exprs{$r[0]}=\%h;
push @sorted_keys, $r[0];
$all_exprs{$h{name}}=\%h;
}
$sth->finish;
# Load the actions: for each expr_id, a list of [type, argument] pairs
# in action_order
$sth=$dbh->prepare("SELECT expr_id,action_type,action_arg FROM filter_action ORDER BY expr_id,action_order");
$sth->execute;
while (my @r=$sth->fetchrow_array) {
push @{$actions{$r[0]}}, [ $r[1],decode_dbtxt($r[2]) ];
}
$sth->finish;
my $stop_filters = 0;
for my $n (@sorted_keys) {
last if ($stop_filters);
my $e=$exprs{$n};
# Only expressions that have actions and whose direction is "B" or the
# requested one are evaluated
if (defined $actions{$n} && ($e->{direction} eq "B" || $e->{direction} eq $direction)) {
my $res;
my $r=Manitou::Filters::process_filter_mimeobj(
$e->{expr}, $mime_obj, \$res, $ctxt, $dbh, \%all_exprs, $n);
# A false return value is reported as an evaluation error; otherwise
# $res tells whether the expression matched
if (!$r) {
print STDERR "filter ERROR: filter_expr=", $e->{expr}, " result=$res\n";
}
elsif ($res) {
if (getconf_bool("log_filter_hits", $ctxt->{mailbox_address})) {
Manitou::Filters::log_filter_hit($dbh, $ctxt, $n);
}
# apply the actions
foreach (@{$actions{$e->{expr_id}}}) {
my ($action_type,$action_arg)=($_->[0],$_->[1]);
if ($DEBUG > 3) {
debug_log("applying action '$action_type' with arg '$action_arg'");
}
if ($action_type eq "tag") {
push @{$ctxt->{tags}}, $action_arg;
}
elsif ($action_type eq "status") {
# the argument is a list of status letters separated by '+'
my @st = split(/\+/, $action_arg);
# note: it replaces the previous retcode; if it was 1 (discard)
# then the message won't be discarded after all
# NOTE(review): the note above does not match the code below, where 'T'
# overwrites $final_action and 'D' only sets it when still undefined.
for my $sttus (@st) {
if ($sttus eq 'T') { # obsolete since 1.2.0, superseded by the discard action
$ctxt->{status} |= 0x10+0x20; # trashed+processed
$final_action = "trash";
}
elsif ($sttus eq 'R') { # read
$ctxt->{status} |= 0x1;
}
elsif ($sttus eq 'P' || $sttus eq 'A') { # processed/archived
$ctxt->{status} |= 0x20;
}
elsif ($sttus eq 'D') { # deleted - obsolete since 1.2.0, superseded by the discard action
if (!defined($final_action)) {
$final_action = "discard";
}
}
}
}
elsif ($action_type eq "discard") {
# "trash" sets the same status bits as the 'T' status letter above;
# "delete" only sets the returned action
if ($action_arg eq "trash") {
$ctxt->{status} |= 0x10+0x20;
$final_action = "trash";
}
elsif ($action_arg eq "delete") {
$final_action = "discard";
}
}
elsif ($action_type eq "priority") {
# "+=N" adds N to the priority, "=N" sets it; anything else is ignored
if (substr($action_arg,0,2) eq "+=") {
$ctxt->{priority} += int(substr($action_arg,2));
}
elsif (substr($action_arg,0,1) eq "=") {
$ctxt->{priority} = int(substr($action_arg,1));
}
}
elsif ($action_type eq "redirect" && $action_arg ne "") {
redirect({"mailfile"=>$ctxt->{proc_filename}, # we redirect the original mailfile
"From"=>$ctxt->{mailbox_address}, # the sender
"To"=>$action_arg}); # the recipient
}
elsif ($action_type eq "set header") {
# the argument has the form name:contents, split at the first colon
my ($htag, $hcontents) = split(/:/, $action_arg, 2);
$mime_obj->set($htag, $hcontents) if (length($hcontents)>0);
}
elsif ($action_type eq "remove header") {
$mime_obj->delete($action_arg) if (length($action_arg)>0);
}
elsif ($action_type eq "set identity") {
$ctxt->{identity_id} = Manitou::Config::get_identity_id($dbh, $action_arg);
if (!defined $ctxt->{identity_id}) {
warning_log(sprintf("Identity '%s' not found for set identity action", $action_arg));
}
}
elsif ($action_type eq "stop") {
# skip the remaining actions of this filter and all the following filters
$stop_filters=1;
last;
}
}
}
}
}
return $final_action;
}
# Redirects a message (filter action)
# Args (a hash reference):
# From => our sender address
# To => redirection address
# mailfile => optional path to a file
# mimeobj => pre-constructed MIME::Entity object if no mailfile given
# Returns 1 once the message has been written to the local delivery agent,
# 0 on failure (a diagnostic is printed to STDERR).
sub redirect {
    my $args = shift;
    my $fh;
    my $cmd = getconf("local_delivery_agent", $args->{From});
    if (!defined($cmd)) {
        print STDERR "redirect: unable to pass the mail to a local delivery agent.\nCheck your configuration file for the 'local_delivery_agent' entry\n";
        return 0;
    }
    my $top;
    if (defined $args->{mailfile}) {
        # Explicit read mode: the path can never be interpreted as an
        # output redirection or as a command to run
        if (!open($fh, '<', $args->{mailfile})) {
            print STDERR "Unable to open ".$args->{mailfile}.": $!\n";
            return 0;
        }
        $top = Mail::Internet->new($fh); # MIME::Entity->new($fh) doesn't work here!
    }
    elsif (defined $args->{mimeobj}) {
        # work on a copy so that the caller's object is left untouched
        $top = $args->{mimeobj}->dup();
    }
    else {
        print STDERR "Filter: redirect action cancelled since no mimeobj and no mailfile\n";
        return 0;
    }
    # Replace the sender and the recipient by ours
    $top->head->combine("From");
    $top->head->replace("From", $args->{From});
    $top->head->combine("To");
    $top->head->replace("To", $args->{To});
    my $subject = $top->head->get("Subject");
    if (defined $subject) {
        $top->head->replace("Subject", $subject . " (by way of <".$args->{From}.">)");
    }
    # The configured command may refer to the sender as $FROM$
    $cmd =~ s/\$FROM\$/$args->{From}/g;
    my $mfh;
    # '|-' opens a pipe to the command, as the former "|$cmd" form did
    if (!open($mfh, '|-', $cmd)) {
        print STDERR "Error while passing redirected mail to the local delivery agent (\`$cmd\`): $!\n";
        close($fh) if (defined $fh);
        return 0;
    }
    $top->print($mfh);
    close($mfh);
    close($fh) if (defined $fh);
    return 1;
}
# Build an outgoing message from the database and pass it to the local
# delivery agent.
#
# Arguments:
#   $from    - sender address; also the key for the per-sender configuration
#   $subject - subject of the message
#   $mail_id - id of the message in the database
# Returns 1 when the delivery agent accepted the message (the message is
# then flagged in the database), 0 otherwise.
# Apart from the early return on a missing 'local_delivery_agent' entry,
# the work is done between begin_work and commit, and the commit is issued
# whether the delivery succeeded or not.
sub send_one_mail {
my ($from, $subject, $mail_id) = @_;
# preferred_charset may hold several charsets separated by blanks
my $decl_charset = getconf("preferred_charset", $from) || 'iso-8859-1';
my @charsets = split(/\s+/, $decl_charset);
my $cmd = getconf("local_delivery_agent", $from);
if (!defined($cmd)) {
print STDERR "Unable to pass the mail to a local delivery agent.\nCheck your configuration file for the 'local_delivery_agent' entry\n";
return 0;
}
# The configured command may refer to the sender as $FROM$
$cmd =~ s/\$FROM\$/$from/g;
$dbh->begin_work;
my $sthb = $dbh->prepare("SELECT bodytext,bodyhtml FROM body WHERE mail_id=?") || die "Can't prepare statement: $DBI::errstr";
$sthb->execute($mail_id) || die "Can't execute statement: $DBI::errstr";
my ($db_body, $html_body) = $sthb->fetchrow_array;
$sthb->finish;
$db_body = decode_dbtxt($db_body);
$html_body = decode_dbtxt($html_body);
my ($body,$body_charset) = Manitou::MailFormat::encode_text_body($db_body, @charsets);
my $top;
my $text_part;
my $html_part;
my %mime_args = (From => $from, Encoding => '-SUGGEST', Subject => $subject,
'X-Mailer' => undef);
# Without an HTML body, the top-level entity carries the text itself
if (!defined $html_body) {
$mime_args{Charset} = $body_charset;
$mime_args{Data} = $body;
$top = MIME::Entity->build(%mime_args);
$text_part = \$top;
}
else {
my $dual_part;
# multipart/alternative with text and HTML. TODO: also handle html-only
# When the message has attachments, the alternative part is nested
# into a multipart/mixed top-level entity
if (has_attachments($dbh, $mail_id)) {
$mime_args{'Type'} = 'multipart/mixed';
$top = MIME::Entity->build(%mime_args);
$dual_part = MIME::Entity->build('Type'=>'multipart/alternative');
$top->add_part($dual_part);
}
else {
$mime_args{'Type'} = 'multipart/alternative';
$top = MIME::Entity->build(%mime_args);
$dual_part = $top;
}
my $p = MIME::Entity->build('Charset' => $body_charset,
'Encoding' => '-SUGGEST',
'Data' => $body,
'X-Mailer' => undef);
$text_part = $p;
$dual_part->add_part($p);
$html_part = create_html_part($dbh, $mail_id, \$html_body);
$dual_part->add_part($html_part);
}
# format=flowed is unsupported at the moment
# if (defined $format_flowed) {
# my $ct = $text_part->head->get("Content-Type");
# if ($ct =~ /^text\/plain/) {
# chomp $ct;
# $ct.="; format=flowed";
# $text_part->head->replace("Content-Type", $ct);
# }
# }
# Fetch the header lines stored for the message and have them encoded
# into the entity
my $header_lines;
my $sthd = $dbh->prepare ("SELECT lines FROM header WHERE mail_id=?") || die "Can't prepare statement: $DBI::errstr";
$sthd->execute($mail_id) || die "Can't execute statement: $DBI::errstr";
($header_lines) = $sthd->fetchrow_array;
$header_lines = decode_dbtxt($header_lines);
Manitou::MailFormat::encode_header($top, $header_lines, @charsets);
# That shouldn't happen, but if the sender is not set, we try to get
# it from the "From:" header
if ($from eq '' || !defined $from) {
my $v = $top->head->get("From");
if (defined $v) {
my @addr_from=Mail::Address->parse($v);
$from = $addr_from[0]->address if (@addr_from);
}
}
# Add the configured 'outgoing_bcc' address to a Bcc field that may
# already be there
my $bcc=getconf("outgoing_bcc", $from);
if (defined($bcc)) {
my $oldbcc=$top->head->get("Bcc");
if (defined($oldbcc)) {
$bcc = "$oldbcc, $bcc";
}
$top->head->replace("Bcc", $bcc);
}
attach_parts($dbh, $mail_id, $top, getconf("tmpdir"));
# Run the outgoing plugins: those of the sender first, then those
# registered under "common"
my @opl;
@opl=@{$outgoing_plugins{$from}} if (defined $outgoing_plugins{$from});
push @opl, @{$outgoing_plugins{"common"}} if (defined $outgoing_plugins{"common"});
if (@opl) {
my %pl_ctxt = (stage => "outgoing",
mimeobj => $top,
mail_id => $mail_id,
dbh => $dbh);
for my $plugin (@opl) {
$plugin->process(\%pl_ctxt);
}
}
if (getconf_bool('index_words', $from)) {
my $other_words;
# Extract words from the "html body"
if (defined $html_body && getconf_bool('index_words_html_parts', $from)) {
$other_words = Manitou::Words::html_to_text(\$html_body);
}
# Extract words from other attachments
my %extractors = Manitou::Attachments::text_extractors($from);
if (scalar %extractors || getconf_bool('index_words_html_parts', $from)) {
# If there are no extractors defined, we may still extract words
# with our internal html-to-plaintext converter
Manitou::Attachments::launch_text_extractors($dbh, $mail_id,
\%extractors,
\$other_words);
}
my $headerx = Manitou::Words::header_contents_to_ftidx($header_lines);
index_words($dbh, $mail_id, \$db_body, \$headerx, \$other_words);
# queue a "widx" job for this message
my $sthj=$dbh->prepare("INSERT INTO jobs_queue(mail_id,job_type) VALUES(?,?)") or die $dbh->errstr;
$sthj->execute($mail_id, "widx");
}
my %fctxt; # context for filters
$fctxt{mail_id}=$mail_id;
$fctxt{tags}=();
$fctxt{priority}=undef;
# The action returned by the filters is not used at the moment (see the
# TODO below); only the tags are applied
my $faction = apply_filters(\%fctxt, $top, \$db_body, "O");
# Apply filter actions to the database
for my $tag (@{$fctxt{'tags'}}) {
Manitou::Tags::insert_tag($dbh, $mail_id, $tag);
}
# TODO: other filter actions (status, priority)
# Pipe the message to the delivery agent
my $ret=0;
my $in=IO::Handle->new();
my $out=IO::Handle->new();
my $err=IO::Handle->new();
eval {
# SIGPIPE is ignored while writing to the agent, and set back to
# DEFAULT at the end of the function
$SIG{'PIPE'} = 'IGNORE';
my $pid = open3($in, $out, $err, $cmd);
die $! if ($pid==0);
$top->print($in) or die $!;
close($in);
$top->purge;
waitpid($pid, 0);
};
if ($@) {
error_log("Error while passing outgoing mail to the local delivery agent (\`$cmd\`): $@");
}
else {
# Only the first line of the agent's error output is read.
# Any error output or a non-zero exit code counts as a failure.
my $e=<$err>;
close($err);
close($out);
if ($e ne "" || ($?>>8)!=0) {
error_log("Local delivery agent error: (\`$cmd\`, exit code=".($?>>8)."): $e");
}
else {
# Set the SENT and ARCHIVED status bits
my $sths = $dbh->prepare("UPDATE mail SET status=status|(256+32) WHERE mail_id=?");
$sths->execute($mail_id);
$sths->finish;
$ret=1; # OK
}
}
$SIG{'PIPE'}='DEFAULT';
$dbh->commit;
return $ret;
}
# Send every message that mail_status lists with a status of 129, except
# those recorded in %hsend_blocked after a previous failure.
# A failed message gets blocked; a successful one unblocks all the others.
sub send_mails {
    my $pending = $dbh->prepare("SELECT mail_id FROM mail_status WHERE status=129");
    $pending->execute;
    while (my @queued = $pending->fetchrow_array) {
        my $mail_id = $queued[0];
        next if (exists $hsend_blocked{$mail_id});

        # Read the message itself, checking again that it is still to be sent
        my $details = $dbh->prepare ("SELECT sender,subject FROM mail WHERE mail_id=? AND status=129");
        $details->execute($mail_id);
        while (my ($sender, $subject) = $details->fetchrow_array) {
            if (!send_one_mail($sender, $subject, $mail_id)) {
                error_log("Outgoing message #$mail_id NOT sent");
                $hsend_blocked{$mail_id} = time;
                next;
            }
            update_runtime_timestamp("last_sent");
            notice_log("Sent outgoing message #$mail_id");
            my $nb_blocked = scalar (keys %hsend_blocked);
            next unless ($nb_blocked > 0);
            # If messages were blocked due to previous errors
            # and one has got through, we retry them all
            notice_log("Starting to send $nb_blocked outgoing frozen message(s) due to recent success");
            %hsend_blocked = ();
        }
        $details->finish;
    }
    $pending->finish;
}
# Load and instantiate the plugins declared in the configuration.
#
# For each mailbox returned by mailboxes() and each kind of plugin, the
# declarations are parsed, the module Manitou/Plugins/<name>.pm is loaded
# once, its init function is called, and the resulting object is stored in
# %preprocess_plugins, %mimeprocess_plugins, %postprocess_plugins or
# %outgoing_plugins (per mailbox), or in @maintenance_plugins.
# A configuration error ends the program, either through die or exit 1.
sub init_plugins {
# The plugins directory, when configured, gets searched first by require
my $d = getconf("plugins_directory");
unshift(@INC, $d) if defined $d;
for my $mbox (mailboxes()) {
for my $pl_type ("incoming_preprocess_plugins",
"incoming_mimeprocess_plugins",
"incoming_postprocess_plugins",
"outgoing_plugins",
"maintenance_plugins")
{
# the value is used below as a reference to a list of declarations
# (presumably what the third argument asks for -- TODO confirm)
my $plist = getconf($pl_type, $mbox, 1);
for my $p (@{$plist}) {
$p =~ s/^\s+//; # trim leading blanks
$p =~ s/\s+$//; # trim trailing blanks
my $args;
my $plugin;
my ($freq,$freq_type);
if ($pl_type eq "maintenance_plugins") {
if ($mbox ne "common") {
die "Configuration error: maintenance_plugins are only allowed in the [common] section ($mbox)\n";
}
# maintenance_plugins have a frequency at the start of their
# declaration, expressed as
# HH:MN => every day at given time
# *:MN => every hour at given minute
# X mn or Xmn => every X minutes
# X h or Xh => every X hours
# In both cases the frequency is removed from $p, leaving the plugin
# declaration itself. Intervals are kept in minutes.
if ($p =~ /^([0-9]+)\s*(mn|h)\s+(.*)$/) {
$freq=$1;
$freq=$freq*60 if ($2 eq "h");
$freq_type="interval";
$p=$3;
}
elsif ($p =~ /^([0-9]{1,2}|\*)\:([0-9]{1,2})\s+(.*)$/) {
$freq_type="pit"; # point in time
$freq="$1:$2";
$p=$3;
# $1 and $2 still hold the captures of the match above, since no other
# pattern match has been done in between
if ($1 ne "*") {
if ($1>=24) {
die "maintenance_plugins: incorrect hour $1.\nHour must be between 0 and 23.\n";
}
}
if ($2>=60) {
die "maintenance_plugins: incorrect minutes $2.\nMinutes must be between 0 and 59.\n";
}
}
else {
die "maintenance_plugins: unrecognized frequency at start of declaration.\nAccepted syntax is Xmn or Xh where X is a number, or HH:MM, or *:MM.\nExamples:\nmaintenance_plugins = 2h plugin1 \\\n 10mn plugin2 \\\n 07:00 plugin3\n";
}
}
elsif ($mbox eq "common" && $pl_type ne "outgoing_plugins") {
die "Configuration error: $pl_type are only allowed in mailboxes sections, not in the [common] section\n";
}
# A declaration is either name(args) or just name, where name is made
# of letters, digits and underscores
if ($p =~ /^([a-zA-Z_0-9]+)\s*\((.*)\)$/) { # has args
$plugin=$1;
$args=$2;
}
elsif (!($p =~ /^[a-zA-Z_0-9]+$/ )) {
print STDERR "init_plugins: unrecognized plugin declaration: $p\n";
exit 1;
}
else { # no args
$plugin=$p;
}
# Each plugin module is loaded only once, even when it is declared
# for several mailboxes or stages
if (!$loaded_plugins{$plugin}) {
require "Manitou/Plugins/$plugin.pm";
$loaded_plugins{$plugin}=1;
}
# NOTE(review): the arguments are inserted verbatim into a piece of Perl
# code that is evaluated below, so the configuration file can run
# arbitrary code; it has to be as trusted as the program itself.
my $evplugin;
if (defined($args)) {
$evplugin = "Manitou::Plugins::$plugin" . '::init($dbh,' .$args.')';
}
else {
$evplugin = "Manitou::Plugins::$plugin" . '::init($dbh)';
}
my $pl=eval $evplugin;
if ($@ || !defined($pl)) {
print STDERR "Error in initializing plugin $plugin for mailbox $mbox: $@\n";
exit 1;
}
$pl->{name}=$plugin;
$pl->{type}=$pl_type;
# Register the plugin object in the container matching its type
if ($pl_type eq "incoming_preprocess_plugins") {
push @{$preprocess_plugins{$mbox}}, $pl;
}
elsif ($pl_type eq "incoming_mimeprocess_plugins") {
push @{$mimeprocess_plugins{$mbox}}, $pl;
}
elsif ($pl_type eq "incoming_postprocess_plugins") {
push @{$postprocess_plugins{$mbox}}, $pl;
}
elsif ($pl_type eq "outgoing_plugins") {
push @{$outgoing_plugins{$mbox}}, $pl;
}
elsif ($pl_type eq "maintenance_plugins") {
$pl->{frequency}=$freq;
$pl->{frequency_type}=$freq_type;
push @maintenance_plugins, $pl;
}
}
}
}
}
# Assign its identity_id to each configured mailbox, through add_mbox().
# If entries are missing in the identities table, insert them.
# The 'common' configuration section is not a mailbox and is skipped.
sub init_identities {
    my $sth = $dbh->prepare("SELECT identity_id FROM identities WHERE lower(email_addr)=?");
    # Prepared on first use only, then reused for each missing identity
    my $sthcr;
    for my $mbox (mailboxes()) {
        next if ($mbox eq 'common');
        # The query compares against lower(email_addr), so the mailbox name
        # has to be lower-cased too: otherwise a name containing uppercase
        # letters never matches and a new identity gets inserted on each run.
        my $lookup_key = lc($mbox);
        $sth->execute($lookup_key);
        my @r = $sth->fetchrow_array;
        if (!@r) {
            # Create the identity if it doesn't exist
            $sthcr ||= $dbh->prepare("INSERT INTO identities(email_addr) VALUES(?)");
            $sthcr->execute($mbox);
            # and read back the identity_id it has been given
            $sth->execute($lookup_key);
            @r = $sth->fetchrow_array;
        }
        add_mbox($mbox, $r[0]);
    }
}
__END__
=head1 NAME
manitou-mdx - Manitou-Mail Mail-Database eXchanger daemon
=head1 SYNOPSIS
manitou-mdx [--conf=/path/to/config] [--fork] [--pidfile=/path/to/pidfile]
manitou-mdx [--conf=/path/to/config] --mboxfile=/path/to/mbox [--status=status] [--tag=tag] [--mailbox=mailbox] [--skip=number] [-v]
manitou-mdx [--conf=/path/to/config] --import-list=path [--import-basedir=directory] [--status=import_status] [--tag=tagname] [--mailbox=mailbox_name]
=head1 DESCRIPTION
When invoked without argument or with --fork, run continuously to import into the database the messages that appear in the spool directories as defined in the configuration file.
=over
=item B<fork>:
Create a new process to run in background and quit if no error occurred.
=item B<pidfile>:
Write into I<pidfile> the process ID of the background process.
=item B<mboxfile>:
Import a mailbox file (mbox format) and quit. When this option is used, manitou-mdx won't try to import any files from the spool directories defined in the configuration file or to send any outgoing message.
=item B<status>:
When importing a mailbox, set the specified status (a numeric value) to each imported message.
=item B<tag>:
When importing a mailbox, assign the specified tag (a name) to each imported message.
=item B<skip>:
When importing a mailbox, skip the specified number of messages before starting to import. This is useful to import in several steps.
=item B<mailbox>:
When importing a mailbox, assign the imported messages to the specified mailbox. The mailbox name is generally a destination email address.
=item B<import-list>:
Import mail files that are listed in the file, one line per file name. When specified, --import-basedir is a directory name that is prepended to all file names read from this file.
=back
=head1 CONFIGURATION
=over
The default configuration file is /etc/manitou-mdx.conf
=back
=cut
HTML source code generated by GNU Source-Highlight plus some custom post-processing
List of all available source files