Manitou-Mail logo title

Source file: mdx/lib/Manitou/Words.pm

# Copyright (C) 2004-2012 Daniel Verite

# This file is part of Manitou-Mail (see http://www.manitou-mail.org)

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

package Manitou::Words;

use strict;
use vars qw(@ISA @EXPORT_OK);
use Carp;

require Exporter;
@ISA = qw(Exporter);
@EXPORT_OK = qw(load_stopwords index_words flush_word_vectors clear_word_vectors
		queue_size last_flush_time search load_partsize);

use DBD::Pg qw(:pg_types);
use Time::HiRes qw(gettimeofday tv_interval);
use Bit::Vector;
use Manitou::Log qw(error_log notice_log);
use Manitou::Config qw(getconf getconf_bool);
use Manitou::Encoding qw(encode_dbtxt decode_dbtxt);
use Manitou::Attachments;
use Manitou::Database qw(bytea_output);
use Unicode::Normalize;
use HTML::TreeBuilder;
use integer;
use Data::Dumper;

# cache for indexed words
my %hwords;

# Cache for bit vectors. The hash is two-level deep
# $vecs{$widx}->{$part_no} => vector for the word of index $widx and
# for the messages whose mail_id fit in the $part_no partition
my %vecs;
our $vecs_estim_size;		# estimated size in bytes

my %no_index_words;

# The size of partitions.
my $partsize;

# When $unaccent is true, we index only the unaccented form of words.
# This allows for accent-insensitive search.
my $unaccent;

# When $add_unaccent is true, accented words are indexed both with and
# without accents. This allows for exact searches of accented words in
# addition to accent-insensitive search.
my $add_unaccent;

# Read the configuration only once
my $accents_configured=0;

# The queue of mails whose word vectors haven't been flushed the db yet
my @flush_queue;

# Time of last flush
my $last_flush_time=time;

my $max_mail_id_index_flush=0;

sub queue_size {
  return scalar(@flush_queue);
}

sub load_partsize {
  my $dbh=shift;
  my $s=$dbh->prepare("SELECT rt_value FROM runtime_info WHERE rt_key='word_index_partsize'");
  $s->execute;
  ($partsize) = $s->fetchrow_array;
  # If no 'word_index_partsize' entry, we assume a default value, but 
  # only if nothing has been indexed yet
  if ($s->rows==0) {
    $s=$dbh->prepare("SELECT word_id FROM inverted_word_index LIMIT 1");
    $s->execute;
    if ($s->rows!=0) {
      croak "Fatal error: unable to find a 'word_index_partsize' entry in the RUNTIME_INFO table and INVERTED_WORD_INDEX is not empty\n";
    }
    $partsize=16384;		# default value
    $dbh->do("INSERT INTO runtime_info(rt_key,rt_value) VALUES('word_index_partsize','16384')");
  }
  return $partsize;
}

sub get_accents_conf {
  my $mode=getconf("index_words_accent_mode");
  if ($mode eq "dual" || !defined $mode) {
    $unaccent=1;
    $add_unaccent=1;
  }
  elsif ($mode eq "strip") {
    $unaccent=1;
    $add_unaccent=0;
  }
  elsif ($mode eq "keep") {
    $unaccent=0;
    $add_unaccent=0;
  }
  $accents_configured=1;
}

sub load_stopwords {
  my $dbh=shift;
  my $sth=$dbh->prepare("SELECT wordtext FROM non_indexable_words");
  $sth->execute;
  while (my @row=$sth->fetchrow_array) {
    $no_index_words{$row[0]}=1;
  }
  $sth->finish;
}

sub flush_word_vectors {
  my $dbh=shift;
  my $options=shift;

  my $sthu=$dbh->prepare("UPDATE inverted_word_index SET mailvec=?,nz_offset=? WHERE word_id=? AND part_no=?");

  my $vec_cnt_insert=0;
  my $vec_cnt_update=0;
  my $t0 = [gettimeofday];
  my @insert_array;

  foreach my $wid (keys %vecs) {
    foreach my $part (keys %{$vecs{$wid}}) {
      next if (!exists $vecs{$wid}->{$part}->{dirty});
      my $bits;
      my $v = $vecs{$wid}->{$part}->{v};
      my ($min,$max) = $v->Interval_Scan_inc(0);
      my $nz_offset=0;
      if ($min>0) {
	$nz_offset=$min/8;
	my $v2 = Bit::Vector->new(0);
	$v2->Interval_Substitute($v, 0, 0, $nz_offset*8, ($v->Size()-$nz_offset*8));
	$bits=$v2->Block_Read();
	# Block_Read rounds to more than 8 bits (32 bits?) so we cut
	# to 8 bits
	$bits=substr($bits, 0, ($v2->Size()+7)/8);
      }
      else {
	$bits=$v->Block_Read();
	# Block_Read rounds to more than 8 bits (32 bits?) so we cut
	# to 8 bits
	$bits=substr($bits, 0, ($v->Size()+7)/8);
      }
      if (defined $vecs{$wid}->{$part}->{insert}) {
	#insert
	my $bits_text=$dbh->quote($bits, { pg_type=>DBD::Pg::PG_BYTEA });
	$bits_text =~ s/''/'/g;
	$bits_text = substr($bits_text,2,length($bits_text)-3);
	push @insert_array, [ $wid, $part, $bits_text, $nz_offset ];
	delete $vecs{$wid}->{$part}->{insert};
	$vec_cnt_insert++;
      }
      else {
	# update
	$sthu->bind_param(1, $bits, { pg_type => DBD::Pg::PG_BYTEA });
	$sthu->bind_param(2, $nz_offset);
	$sthu->bind_param(3, $wid);
	$sthu->bind_param(4, $part);
	$sthu->execute;
	$vec_cnt_update++;
      }
      # Can't do this check with indexes created with pre-1.3.0 versions
      # and not recreated.
      # if (length($bits)+$nz_offset>$partsize/8) {
      #	die sprintf("Vector too large (%d bytes) for (word_id,part_no)=(%d,%d)", length($bits)+$nz_offset, $wid, $part);
      #      }
      delete $vecs{$wid}->{$part}->{dirty};
    }
  }

  if (@insert_array>0) {
    $dbh->do("COPY inverted_word_index(word_id,part_no,mailvec,nz_offset) FROM STDIN");
    foreach my $vl (@insert_array) {
      $dbh->pg_putcopydata(join("\t", @{$vl})."\n");
    }
    $dbh->pg_putcopyend();
  }

  if (!defined $options->{no_jobs_queue}) {
    my $sthd=$dbh->prepare("DELETE FROM jobs_queue WHERE mail_id=? AND job_type='widx'");
    foreach (@flush_queue) {
      $sthd->execute($_);
    }
  }

  @flush_queue=();
  $last_flush_time=time;
  notice_log(sprintf("Index vectors flush: %d inserted, %d updated in %0.2fs",$vec_cnt_insert, $vec_cnt_update, tv_interval($t0)));
}

sub last_flush_time {
  return $last_flush_time;
}

sub clear_word_vectors {
  %vecs=();
  %hwords=();
  $vecs_estim_size=0;
}

# Clear the bits corresponding to a mail_id in the inverted word
# index cache in memory.
# Normally, this is always the last inserted mail, after is has been
# rolled back
# It's important to delete the vector if it's empty because the word
# entry may not exist at all in the database and there's an FK constraint
# from table inverted_word_index referencing table words.
sub clear_last_indexed_mail {
  my ($dbh,$mail_id)=@_;
  load_partsize($dbh) if !defined($partsize);
  my $part_no = $mail_id / $partsize;
  my $bit_id = ($mail_id-1) % $partsize;

  foreach my $wid (keys %vecs) {
    if (exists $vecs{$wid}->{$part_no}->{dirty}) {
      my $vec=$vecs{$wid}->{$part_no}->{v};
      if ($vec->Size()>$bit_id && $vec->bit_test($bit_id)) {
	$vec->Bit_Off($bit_id);
	if ($vec->is_empty()) {
	  delete $vecs{$wid}->{$part_no};
	}
	else {
	  $vecs{$wid}->{$part_no}->{v}=$vec;
	}
      }
    }
  }
  # since INSERT INTO words may have been rolled back,
  # the cache of word_id can no longer be trusted, so it's reset
  %hwords=();
}

sub extract_words {
  my ($ptext, $tb)=@_;
  my %seen;
  foreach (split(/[\x{0}-\x{1e}\x{80}-\x{bf}\s+,\.\(\)\\<\>\{\}\x{2013}\x{2019}\x{201c}\x{201d}\"'`:;\/!\[\]\?=*\|]/o, $$ptext)) {
    next if (/^[-_#%|*=]+$/);  # skip horizontal separation lines
    if (/^[-~*_^|_=]+(.*)$/) {
      $_ = $1;
    }
    if (/^([^-~*^|_=]+)[-~*^|_=]+$/) {
      $_ = $1;
    }

    next if (length($_)<=2 || length($_)>50);
    $_=lc($_);
    next if (exists $seen{$_});
    $seen{$_}=1;
    next if (exists $no_index_words{$_});

    if ($unaccent && ! /^[0-9_a-z]+$/) {
      my $w=NFD($_);
      $w =~ s/\pM//g;  # strip combining characters
      if ($w ne $_) {
	if (!exists $seen{$w} && !exists $no_index_words{$w}) {
	  # push the non-accented version
	  push @{$tb}, $w;
	  $seen{$w}=1;
	}
	push @{$tb}, $_ if ($add_unaccent);
      }
      else {
	push @{$tb}, $_;
      }
    }
    else {
      push @{$tb}, $_;
    }
    # Add components of compound words
    my @cw=split /-/;
    if (@cw>1) {
      foreach (@cw) {
	next if (length($_)<=2 || length($_)>50);
	next if (exists $seen{$_});
	$seen{$_}=1;
	next if (exists $no_index_words{$_});
	if ($unaccent && ! /^[0-9_a-z]+$/) {
	  my $w=NFD($_);
	  $w =~ s/\pM//g;	# strip combining characters
	  if ($w ne $_) {
	    if (!exists $seen{$w} && !exists $no_index_words{$w}) {
	      # push the non-accented version
	      push @{$tb}, $w;
	      $seen{$w}=1;
	    }
	    push @{$tb}, $_ if ($add_unaccent);
	  } else {
	    push @{$tb}, $_;
	  }
	} else {
	  push @{$tb}, $_;
	}
      }
    }
  }

  # extract complete email addresses, plus local part and domain
  # components, with and without TLD
  while ($$ptext =~ m/\b([A-Z0-9._%+-]+)\@([A-Z0-9.-]+)\.([A-Z]+)\b/gi) {
    my $lp=lc($1);
    my $h=lc($2);
    my $hh="$h.".lc($3);
    my $em=lc("$lp\@$hh");
    foreach ($em,$lp,$h,$hh) {
      next if (length($_)>50 || length($_)<=2);
      if (!exists $seen{$_}) {
	push @{$tb}, $_;
	$seen{$_}=1;
      }
    }
    # If the local part contains dots, extract parts
    foreach (split /\./, $lp) {
      next if (length($_)>50 || length($_)<=2);
      if (!exists $seen{$_}) {
	push @{$tb}, $_;
	$seen{$_}=1;
      }
    }
  }
}


sub index_words {
  my ($dbh, $mail_id, $pbody, $pheader, $ref_more_text, $isub, $ctxt)=@_;
  load_partsize($dbh) if !defined($partsize);
  get_accents_conf() if (!$accents_configured);

  my $cnt=0;
  my $sth_w = $dbh->prepare("SELECT word_id FROM words WHERE wordtext=?");
  my $sth_n = $dbh->prepare("INSERT INTO words(word_id,wordtext) VALUES (nextval('seq_word_id'),?) RETURNING word_id");
  my $svec = $dbh->prepare("SELECT mailvec,nz_offset FROM inverted_word_index WHERE word_id=? AND part_no=?");

  my @words;
  extract_words($pbody, \@words);
  extract_words($pheader, \@words) if (defined $pheader);
  extract_words($ref_more_text, \@words) if (defined $ref_more_text);

  # hbody_words contains a unique entry for each word occurring in the
  # body, and is used to avoid inserting multiple (word_id,mail_id) tuples.
  my %hbody_words;

  for my $s (@words) {
    next if (defined($no_index_words{$s}) or defined($hbody_words{$s}));
    $hbody_words{$s}=1;

    # Find the word_id
    my $word_id=$hwords{$s};
    if (!defined $word_id) {
      # The word hasn't been encountered before in any mail
      my $se=encode_dbtxt($s);

      if (defined $isub) {
	$word_id = $isub->($se, $ctxt);	
	if (!$word_id) {
	  die "Failed to obtain a word_id for '$se' from external function";
	}
	$hwords{$s}=$word_id;
      }
      else {
	$sth_w->execute($se);
	my @r=$sth_w->fetchrow_array;
	if (@r) {
	  $word_id=$r[0];
	  $hwords{$s}=$word_id;
	}
	else {
	  # The word isn't in the words table: let's insert it
	  $sth_n->execute($se);
	  ($word_id) = $sth_n->fetchrow_array;
	  $hwords{$s}=$word_id;
	}
      }
    }

    # Find the vector
    my $part_no = $mail_id / $partsize;
    my $bit_id = ($mail_id-1) % $partsize;
    my $vec = $vecs{$word_id}->{$part_no}->{v};

    if (!defined $vec) {
      $svec->execute($word_id, $part_no);
      if ($svec->rows>0) {
	# found in db
	my @r=$svec->fetchrow_array;
	my $bits = "\000"x$r[1] . $r[0];
	$vec = Bit::Vector->new(length($bits)*8);
	# Can't do this check with indexes created with pre-1.3.0 versions
	# and not recreated.
	# if (length($bits) > $partsize/8)	{ # sanity check
	#  die sprintf("Word vector read from database for (word_id=%d,part_no=%d) exceeds the maximum size (%d bytes,max=%d bytes)", $word_id, $part_no, length($bits), $partsize/8);
	# }
	$vecs{$word_id}->{$part_no}->{v} = $vec;
	$vec->Block_Store($bits);
	$vecs_estim_size += ($vec->Size()+7)>>3;
      }
      else {
	$vec = Bit::Vector->new($bit_id+1);
	$vecs{$word_id}->{$part_no}->{v} = $vec;
	$vecs{$word_id}->{$part_no}->{insert} = 1; # not in db yet
	$vecs{$word_id}->{$part_no}->{dirty} = 1;
      }
    }
    if ($vec->Size()<$bit_id+1) {
      $vecs_estim_size += scalar(($bit_id+1+7>>3)-($vec->Size()+7>>3));
      $vec->Resize($bit_id+1);
    }
    if (!$vec->bit_test($bit_id)) {
      $vec->Bit_On($bit_id);
      $vecs{$word_id}->{$part_no}->{dirty} = 1;
    }
    $cnt++;
  }

  if ($cnt) {
    push @flush_queue, $mail_id;
  }
  if ($mail_id > $max_mail_id_index_flush) {
    $max_mail_id_index_flush = $mail_id;
  }
}

sub fetch_vec {
  my ($sth_w, $svec, $vr, $w)=@_;

  $sth_w->execute($w);
  my ($wid)=$sth_w->fetchrow_array;
  return 0 if (!$wid);
  $svec->execute($wid);
  while (my ($vecbits,$part,$nz_offset) = $svec->fetchrow_array) {
    my $bits = "\000"x$nz_offset . $vecbits;
    my $v = Bit::Vector->new(length($bits)*8);
    $v->Block_Store($bits);
    if (defined $vr->{$part}) {
      if ($vr->{$part}->Size() < $v->Size()) {
	$v->Resize($vr->{$part}->Size());
      }
      elsif ($vr->{$part}->Size() > $v->Size()) {
	$vr->{$part}->Resize($v->Size());
      }
      $vr->{$part}->And($vr->{$part}, $v);
    }
    else {
      $vr->{$part}=$v;
    }
  }
  return 1;
}

sub vecs_to_mailid {
  my $vr=shift;
  my @result;
  foreach my $p (sort (keys %{$vr})) {
    foreach my $bit ($vr->{$p}->Index_List_Read()) {
       # each bit number is a (mail_id/partsize)-1
      push @result, $bit+($p*$partsize)+1;
    }
  }
  return @result;
}

sub and_lists {
  my ($list_one,$list_two) = @_;
  my %temp = ();
  @temp{@$list_one} = (1) x @$list_one;
  return grep $temp{$_}, @$list_two;
}

sub search {
  my $dbh=shift;
  load_partsize($dbh) if !defined($partsize);
  my ($nb_substrings, $nb_words);
  my $sth_w = $dbh->prepare("SELECT word_id FROM words WHERE wordtext=?");
  my $svec = $dbh->prepare("SELECT mailvec,part_no,nz_offset FROM inverted_word_index WHERE word_id=?");

  my %vres;
  my @res_substrings;
  foreach my $w (@_) {
    my @words=split /\W/, $w;
    if (@words==1) {
      # search for one individual word
      $nb_words++;
      fetch_vec($sth_w, $svec, \%vres, $w);
    }
    else {
      # search for a substring
      # first, reduce the search space to the words within the string
      # then intersect with the mails containing the exact string in
      # the mail bodies
      my %vr;
      foreach (@words) {
	fetch_vec($sth_w, $svec, \%vr, $_) if (length($_)>2);
      }
      my $mid=join(',', vecs_to_mailid(\%vr));
      if ($mid) {
	my $qw=$dbh->quote("%$w%");
	my $s=$dbh->prepare("SELECT mail_id FROM body WHERE bodytext ilike $qw AND mail_id IN ($mid) ORDER BY mail_id");
	$s->execute;
	my @r;
	while (my @rr=$s->fetchrow_array) {
	  push @r, $rr[0];
	}
	if ($nb_substrings>0) {
	  @res_substrings = and_lists(\@res_substrings, \@r);
	}
	else {
	  @res_substrings=@r;
	}
	$nb_substrings++;
      }
    }
  }

  if ($nb_substrings>0) {
    if ($nb_words>0) {
      my @results=vecs_to_mailid(\%vres);
      return and_lists(\@res_substrings, \@results);
    }
    else {
      return @res_substrings;
    }
  }
  else {
    return vecs_to_mailid(\%vres);
  }
}

# Given a fully decoded mail header, extract the strings that
# should be full-text indexed along with the contents of
# the mail
sub header_contents_to_ftidx {
  my $r;
  while ($_[0] =~ /^(Subject|From|To|Cc|Date): (.*)$/img) {
    $r.="$2\n";
  }
  return $r;
}


sub flush_jobs_queue {
  my $dbh=shift;
  my $sth=$dbh->prepare("SELECT job_id,mail_id FROM jobs_queue WHERE job_type='widx'");
  $sth->execute;
  if ($sth->rows>0) {
    my %extractors = Manitou::Attachments::text_extractors();
    my $idx_html = getconf_bool("index_words_html_parts");

    my $col_html = $idx_html ? "bodyhtml":"null";
    my $sthb=$dbh->prepare("SELECT bodytext,$col_html FROM body WHERE mail_id=?");
    my $sthh=$dbh->prepare("SELECT lines FROM header WHERE mail_id=?");
    while (my ($job_id,$mail_id)=$sth->fetchrow_array) {
      $sthb->execute($mail_id) or die $dbh->errstr;
      my ($body,$html)=$sthb->fetchrow_array;
      $sthh->execute($mail_id) or die $dbh->errstr;
      my ($header)=$sthh->fetchrow_array;
      $body = decode_dbtxt($body);
      my $other_words;
      if ($html) {
	$html = decode_dbtxt($html);
	$other_words = html_to_text(\$html);
      }
      $header = decode_dbtxt($header);
      $header = Manitou::Words::header_contents_to_ftidx($header);
      if (scalar(%extractors)!=0 || ($idx_html && length($html)>0)) {
	Manitou::Attachments::launch_text_extractors($dbh, $mail_id,
						     \%extractors,
						     \$other_words);
      }
      index_words($dbh, $mail_id, \$body, \$header, \$other_words);
    }
  }

  if (queue_size()>0) {
    flush_word_vectors($dbh);
    clear_word_vectors;
  }
}

# Taken from HTML::Element::as_text and modified to add a space
# after each piece of text
sub in_html_to_text {
  # Yet another iteratively implemented traverser
  my($this,%options) = @_;
  my $nillio = [];
  my(@pile) = ($this);
  my $tag;
  my $text = '';

  while(@pile) {
    if(!defined($pile[0])) { # undef!
      # no-op
    } elsif(!ref($pile[0])) { # text bit!  save it!
      $text .= (shift @pile)." ";
    } else { # it's a ref -- traverse under it
      unshift @pile, @{$this->{'_content'} || $nillio}
        unless
          ($tag = ($this = shift @pile)->{'_tag'}) eq 'style'
          or $tag eq 'script';
    }
  }
  return $text;
}

# In: html source
sub html_to_text {
  my $tree = HTML::TreeBuilder->new;
  $tree->store_declarations(0);
  eval {
    $tree->parse_content($_[0]);
  };
  my $txt= $@ ? undef:in_html_to_text($tree);
  $tree->delete();
  return $txt;
}

1;

HTML source code generated by GNU Source-Highlight plus some custom post-processing

List of all available source files