X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=bcb4dc006fc1482f7f2929f35c69f37be55a318a;hb=813a3e444bc223a8c1032348a40948c91b9cb257;hp=c1f0ae7a2ac8ef3ef970edda16f733485721f90c;hpb=f0ac8322367c66080b6dbb74da4de72dae126dc3;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index c1f0ae7a..bcb4dc00 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -32,7 +32,8 @@ use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean - @badmsg $badmsgfn $forwardfn @forward $timeout $waittime); + @badmsg $badmsgfn $forwardfn @forward $timeout $waittime + $queueinterval $lastq); %work = (); # outstanding jobs @msg = (); # messages we have @@ -43,6 +44,9 @@ $last_clean = 0; # last time we did a clean @forward = (); # msg forward table $timeout = 30*60; # forwarding timeout $waittime = 60*60; # time an aborted outgoing message waits before trying again +$queueinterval = 5*60; # run the queue every 5 minutes +$lastq = 0; + $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table @@ -124,20 +128,26 @@ sub process my ($self, $line) = @_; # this is periodic processing - if (undef $self || undef $line) { + if (!$self || !$line) { # wander down the work queue stopping any messages that have timed out - for (keys %work) { - my $ref = $work{$_}; - if ($main::systime > $ref->{lastt} + $timeout) { - my $tonode = $ref->{tonode}; - $ref->stop_msg(); + for (keys %busy) { + my $node = $_; + my $ref = $busy{$_}; + if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) { + $ref->stop_msg($node); # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall; + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; } } - + + # queue some message if the interval timer has gone off + if ($main::systime > $lastq + $queueinterval) { + queue_msg(0); + $lastq = $main::systime; + } + # clean the message queue clean_old() if $main::systime - $last_clean > 3600 ; return; @@ -155,7 +165,7 @@ sub process if (exists $busy{$f[2]}) { my $ref = $busy{$f[2]}; my $tonode = $ref->{tonode}; - $ref->stop_msg(); + $ref->stop_msg($self->call); } my $t = cltounix($f[5], $f[6]); @@ -173,6 +183,7 @@ sub process $work{"$f[2]$stream"} = $ref; # store in work $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $ref->{lastt} = $main::systime; last SWITCH; } @@ -243,7 +254,7 @@ sub process my $m; for $m (@msg) { if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) { - $ref->stop_msg(); + $ref->stop_msg($self->call); my $msgno = $m->{msgno}; dbg('msg', "duplicate message to $msgno\n"); Log('msg', "duplicate message to $msgno"); @@ -253,7 +264,7 @@ sub process # look for 'bad' to addresses if (grep $ref->{to} eq $_, @badmsg) { - $ref->stop_msg(); + $ref->stop_msg($self->call); dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); return; @@ -268,12 +279,11 @@ sub process Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } - $ref->stop_msg(); - queue_msg(0); + $ref->stop_msg($self->call); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } - queue_msg(0); + # queue_msg(0); last SWITCH; } @@ -288,11 +298,10 @@ sub process push @{$ref->{gotit}}, $f[2]; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg(); + $ref->stop_msg($self->call); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } - queue_msg(0); last SWITCH; } @@ -335,7 +344,7 @@ sub process dbg('msg', "stream $f[3]: abort received\n"); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - $ref->stop_msg(); + $ref->stop_msg($self->call); $ref = undef; } @@ -344,9 +353,10 @@ sub process if ($pcno == 49) { # global delete on subject for (@msg) { - if ($_->{subject} eq $f[2]) { + if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { $_->del_msg(); - Log('msg', "Message $_->{msgno} fully deleted by $f[1]"); + Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); + DXProt::broadcast_ak1a($line, $self); } } } @@ -624,8 +634,8 @@ sub start_msg $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$dxchan->call} = $self; - $work{"$self->{tonode}"} = $self; + $busy{$self->{tonode}} = $self; + $work{$self->{tonode}} = $self; $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); } @@ -651,8 +661,8 @@ sub get_fwq # stop a message from continuing, clean it out, unlock interlocks etc sub stop_msg { - my ($self, $dxchan) = @_; - my $node = $self->{tonode} + my $self = shift; + my $node = shift; my $stream = $self->{stream} if exists $self->{stream}; @@ -791,7 +801,7 @@ sub do_send_stuff $loc->{lines} = []; $self->state('sendbody'); #push @out, $self->msg('sendbody'); - push @out, $self->msg('m8');) + push @out, $self->msg('m8'); } elsif ($self->state eq 'sendbody') { confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; @@ -829,7 +839,6 @@ sub do_send_stuff delete $self->{loc}; $self->func(undef); - DXMsg::queue_msg(0); $self->state('prompt'); } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { #push @out, $self->msg('sendabort');