X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=e3ea16ad4d7fb672ddce392ad6b2f3d5a40b8b07;hb=629ca1bff41ef8c74e61267b36f4028581789789;hp=f3c2172cfba7c45c70e7fa5d3c12353853dd2b92;hpb=48d614dae214326305879ac572d8c5f0a6150f99;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index f3c2172c..e3ea16ad 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -27,6 +27,13 @@ use IO::File; use Fcntl; use strict; + +use vars qw($VERSION $BRANCH); +$VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ ); +$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0; +$main::build += $VERSION; +$main::branch += $BRANCH; + use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime $queueinterval $lastq $importfn $minchunk $maxchunk $bulltopriv); @@ -62,10 +69,10 @@ $importfn = "$msgdir/import"; # import directory to => '0,To', from => '0,From', t => '0,Msg Time,cldatetime', - private => '5,Private', + private => '5,Private,yesno', subject => '0,Subject', linesreq => '0,Lines per Gob', - rrreq => '5,Read Confirm', + rrreq => '5,Read Confirm,yesno', origin => '0,Origin', lines => '5,Data', stream => '9,Stream No', @@ -147,19 +154,14 @@ sub process my @f = split /\^/, $line; my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number + my ($tonode, $fromnode) = @f[1, 2]; + my $stream = $f[3] if $pcno > 29 && $pcno <= 33; SWITCH: { if ($pcno == 28) { # incoming message # sort out various extant protocol errors that occur - my ($fromnode, $origin); - if ($self->is_arcluster && $f[13] eq $self->call) { - $fromnode = $f[13]; - $origin = $f[2]; - } else { - $fromnode = $f[2]; - $origin = $f[13]; - } + my $origin = $f[13]; $origin = $self->call unless $origin && $origin gt ' '; # first look for any messages in the busy queue @@ -168,27 +170,27 @@ sub process if (exists $busy{$fromnode}) { my $ref = $busy{$fromnode}; - my $tonode = $ref->{tonode}; - dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$tonode") if isdbg('msg'); - $ref->stop_msg($self->call); + my $otonode = $ref->{tonode} || "unknown"; + dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg'); + $ref->stop_msg($fromnode); } my $t = cltounix($f[5], $f[6]); - my $stream = next_transno($fromnode); + $stream = next_transno($fromnode); my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]); # fill in various forwarding state variables $ref->{fromnode} = $fromnode; - $ref->{tonode} = $f[1]; + $ref->{tonode} = $tonode; $ref->{rrreq} = $f[11]; $ref->{linesreq} = $f[10]; $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg'); - Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" ); - $work{"$fromnode$stream"} = $ref; # store in work + Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" ); + $work{"$fromnode,$stream"} = $ref; # store in work $busy{$fromnode} = $ref; # set interlock - $self->send(DXProt::pc30($fromnode, $f[1], $stream)); # send ack + $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack $ref->{lastt} = $main::systime; # look to see whether this is a non private message sent to a known callsign @@ -196,67 +198,68 @@ sub process if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { $ref->{private} = 1; dbg("set bull to $ref->{to} to private") if isdbg('msg'); + Log('msg', "set bull to $ref->{to} to private"); } last SWITCH; } if ($pcno == 29) { # incoming text - my $ref = $work{"$f[2]$f[3]"}; + my $ref = $work{"$fromnode,$stream"}; if ($ref) { $f[4] =~ s/\%5E/^/g; push @{$ref->{lines}}, $f[4]; $ref->{count}++; if ($ref->{count} >= $ref->{linesreq}) { - $self->send(DXProt::pc31($f[2], $f[1], $f[3])); - dbg("stream $f[3]: $ref->{count} lines received\n") if isdbg('msg'); + $self->send(DXProt::pc31($fromnode, $tonode, $stream)); + dbg("stream $stream: $ref->{count} lines received\n") if isdbg('msg'); $ref->{count} = 0; } $ref->{lastt} = $main::systime; } else { - dbg("PC29 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + dbg("PC29 from unknown stream $stream from $fromnode") if isdbg('msg'); + $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream } last SWITCH; } if ($pcno == 30) { # this is a incoming subject ack - my $ref = $work{$f[2]}; # note no stream at this stage + my $ref = $work{"$fromnode,"}; # note no stream at this stage if ($ref) { - delete $work{$f[2]}; - $ref->{stream} = $f[3]; + delete $work{"$fromnode,"}; + $ref->{stream} = $stream; $ref->{count} = 0; $ref->{linesreq} = 5; - $work{"$f[2]$f[3]"} = $ref; # new ref - dbg("incoming subject ack stream $f[3]\n") if isdbg('msg'); - $busy{$f[2]} = $ref; # interlock + $work{"$fromnode,$stream"} = $ref; # new ref + dbg("incoming subject ack stream $stream\n") if isdbg('msg'); + $busy{$fromnode} = $ref; # interlock push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg("PC30 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + dbg("PC30 from unknown stream $stream from $fromnode") if isdbg('msg'); + $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream } last SWITCH; } if ($pcno == 31) { # acknowledge a tranche of lines - my $ref = $work{"$f[2]$f[3]"}; + my $ref = $work{"$fromnode,$stream"}; if ($ref) { - dbg("tranche ack stream $f[3]\n") if isdbg('msg'); + dbg("tranche ack stream $stream\n") if isdbg('msg'); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg("PC31 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + dbg("PC31 from unknown stream $stream from $fromnode") if isdbg('msg'); + $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream } last SWITCH; } if ($pcno == 32) { # incoming EOM - dbg("stream $f[3]: EOM received\n") if isdbg('msg'); - my $ref = $work{"$f[2]$f[3]"}; + dbg("stream $stream: EOM received\n") if isdbg('msg'); + my $ref = $work{"$fromnode,$stream"}; if ($ref) { - $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it + $self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol # store the file or message @@ -272,10 +275,10 @@ sub process my $m; for $m (@msg) { if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { - $ref->stop_msg($self->call); + $ref->stop_msg($fromnode); my $msgno = $m->{msgno}; - dbg("duplicate message from $ref->{from} -> $ref->{to} to $msgno") if isdbg('msg'); - Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); + dbg("duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno") if isdbg('msg'); + Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno"); return; } } @@ -285,45 +288,62 @@ sub process # look for 'bad' to addresses if ($ref->dump_it($self->call)) { - $ref->stop_msg($self->call); + $ref->stop_msg($fromnode); dbg("'Bad' message $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' message $ref->{to}"); return; } + # check the message for bad words + my @words; + for (@{$ref->{lines}}) { + push @words, BadWords::check($_); + } + push @words, BadWords::check($ref->{subject}); + if (@words) { + dbg("message with badwords '@words' $ref->{from} -> $ref->{to} '$ref->{subject}' origin: $ref->{origin} via " . $self->call) if isdbg('msg'); + Log('msg',"message with badwords '@words' $ref->{from} -> $ref->{to} origin: $ref->{origin} via " . $self->call); + Log('msg',"subject: $ref->{subject}"); + for (@{$ref->{lines}}) { + Log('msg', "line: $_"); + } + $ref->stop_msg($fromnode); + return; + } + $ref->{msgno} = next_transno("Msgno"); - push @{$ref->{gotit}}, $f[2]; # mark this up as being received + push @{$ref->{gotit}}, $fromnode; # mark this up as being received $ref->store($ref->{lines}); add_dir($ref); my $dxchan = DXChannel->get($ref->{to}); $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; - Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); + Log('msg', "Message $ref->{msgno} from $ref->{from} received from $fromnode for $ref->{to}"); } } - $ref->stop_msg($self->call); + $ref->stop_msg($fromnode); } else { - dbg("PC32 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + dbg("PC32 from unknown stream $stream from $fromnode") if isdbg('msg'); + $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream } # queue_msg(0); last SWITCH; } if ($pcno == 33) { # acknowledge the end of message - my $ref = $work{"$f[2]$f[3]"}; + my $ref = $work{"$fromnode,$stream"}; if ($ref) { if ($ref->{private}) { # remove it if it private and gone off site# - Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted"); + Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted"); $ref->del_msg; } else { - Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]"); - push @{$ref->{gotit}}, $f[2]; # mark this up as being received + Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode"); + push @{$ref->{gotit}}, $fromnode; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg($self->call); + $ref->stop_msg($fromnode); } else { - dbg("PC33 from unknown stream $f[3] from $f[2]") if isdbg('msg'); - $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream + dbg("PC33 from unknown stream $stream from $fromnode") if isdbg('msg'); + $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream } # send next one if present @@ -350,28 +370,28 @@ sub process last SWITCH if !mkdir $fn, 0777; dbg("created directory $fn\n") if isdbg('msg'); } - my $stream = next_transno($f[2]); + my $stream = next_transno($fromnode); my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); # forwarding variables - $ref->{fromnode} = $f[1]; - $ref->{tonode} = $f[2]; + $ref->{fromnode} = $tonode; + $ref->{tonode} = $fromnode; $ref->{linesreq} = $f[5]; $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; $ref->{lastt} = $main::systime; - $work{"$f[2]$stream"} = $ref; # store in work - $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $work{"$fromnode,$stream"} = $ref; # store in work + $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack last SWITCH; } if ($pcno == 42) { # abort transfer - dbg("stream $f[3]: abort received\n") if isdbg('msg'); - my $ref = $work{"$f[2]$f[3]"}; + dbg("stream $stream: abort received\n") if isdbg('msg'); + my $ref = $work{"$fromnode,$stream"}; if ($ref) { - $ref->stop_msg($self->call); + $ref->stop_msg($fromnode); $ref = undef; } last SWITCH; @@ -382,7 +402,7 @@ sub process if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { $_->del_msg(); Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); - DXProt::broadcast_ak1a($line, $self); + DXChannel::broadcast_nodes($line, $self); } } } @@ -580,7 +600,6 @@ sub send_tranche sub queue_msg { my $sort = shift; - my $call = shift; my $ref; my $clref; @@ -617,12 +636,8 @@ sub queue_msg my $dxchan; if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here + next if $ref->{tonode}; # ignore it if it already being processed $clref = Route::get($ref->{to}); -# unless ($clref) { # otherwise look for a homenode -# my $uref = DXUser->get_current($ref->{to}); -# my $hnode = $uref->homenode if $uref; -# $clref = Route::Node::get($hnode) if $hnode; -# } if ($clref) { $dxchan = $clref->dxchan; if ($dxchan) { @@ -634,23 +649,27 @@ sub queue_msg dbg("Route: No dxchan for $ref->{to} " . ref($clref) ) if isdbg('msg'); } } - } - - # otherwise we are dealing with a bulletin or forwarded private message - # compare the gotit list with - # the nodelist up above, if there are sites that haven't got it yet - # then start sending it - what happens when we get loops is anyone's - # guess, use (to, from, time, subject) tuple? - foreach $dxchan (@nodelist) { - my $call = $dxchan->call; - next unless $call; - next if $call eq $main::mycall; - next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}}; - next unless $ref->forward_it($call); # check the forwarding file - - # if we are here we have a node that doesn't have this message - $ref->start_msg($dxchan) if !get_busy($call) && $dxchan->state eq 'normal'; - last; + } else { + + # otherwise we are dealing with a bulletin or forwarded private message + # compare the gotit list with + # the nodelist up above, if there are sites that haven't got it yet + # then start sending it - what happens when we get loops is anyone's + # guess, use (to, from, time, subject) tuple? + foreach $dxchan (@nodelist) { + my $call = $dxchan->call; + next unless $call; + next if $call eq $main::mycall; + next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}}; + next unless $ref->forward_it($call); # check the forwarding file + next if $ref->{tonode}; # ignore it if it already being processed + + # if we are here we have a node that doesn't have this message + if (!get_busy($call) && $dxchan->state eq 'normal') { + $ref->start_msg($dxchan); + last; + } + } } # if all the available nodes are busy then stop @@ -678,15 +697,19 @@ sub start_msg { my ($self, $dxchan) = @_; + confess("trying to start started msg $self->{msgno} nodes: $self->{fromnode} -> $self->{tonode}") if $self->{tonode}; dbg("start msg $self->{msgno}\n") if isdbg('msg'); $self->{linesreq} = 10; $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; $busy{$self->{tonode}} = $self; - $work{$self->{tonode}} = $self; + $work{"$self->{tonode},"} = $self; $self->{lastt} = $main::systime; - $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); + my ($fromnode, $origin); + $fromnode = $self->{fromnode}; + $origin = $self->{origin}; + $dxchan->send(DXProt::pc28($self->{tonode}, $fromnode, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $origin, $self->{rrreq})); } # get the ref of a busy node @@ -699,13 +722,37 @@ sub get_busy # get the busy queue sub get_all_busy { - return values %busy; + return keys %busy; } -# get the forwarding queue +# get a forwarding queue entry sub get_fwq { - return values %work; + my $call = shift; + my $stream = shift || '0'; + return $work{"$call,$stream"}; +} + +# delete a forwarding queue entry +sub del_fwq +{ + my $call = shift; + my $stream = shift || '0'; + return delete $work{"$call,$stream"}; +} + +# set a fwq entry +sub set_fwq +{ + my $call = shift; + my $stream = shift || '0'; + return $work{"$call,$stream"} = shift; +} + +# get the whole forwarding queue +sub get_all_fwq +{ + return keys %work; } # stop a message from continuing, clean it out, unlock interlocks etc @@ -717,8 +764,8 @@ sub stop_msg dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg'); - delete $work{$node}; - delete $work{"$node$stream"} if $stream; + delete $work{"$node,"}; + delete $work{"$node,$stream"} if $stream; $self->workclean; delete $busy{$node}; } @@ -851,6 +898,11 @@ sub do_send_stuff # $DB::single = 1; confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; + if (my @ans = BadWords::check($line)) { + $self->{badcount} += @ans; + Log('msg', $self->call . " used badwords: @ans to @{$loc->{to}} in msg"); + return ($self->msg('e17', @ans), $self->msg('m1')); + } $loc->{subject} = $line; $loc->{lines} = []; $self->state('sendbody'); @@ -903,12 +955,18 @@ sub do_send_stuff $self->func(undef); $self->state('prompt'); } else { + if (my @ans = BadWords::check($line)) { + $self->{badcount} += @ans; + Log('msg', $self->call . " used badwords: @ans to @{$loc->{to}} subject: '$loc->{subject}' in msg"); + Log('msg', "line: $line"); + return ($self->msg('e17', @ans)); + } # i.e. it ain't and end or abort, therefore store the line push @{$loc->{lines}}, length($line) > 0 ? $line : " "; } } - return (1, @out); + return @out; } # return the standard directory line for this ref @@ -1094,7 +1152,7 @@ sub import_msgs my @msg = map { chomp; $_ } ; close(MSG); unlink($fn); - my @out = import_one($DXProt::me, \@msg, $splitit); + my @out = import_one($main::me, \@msg, $splitit); Log('msg', @out); } }