X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=e5fa41a8f3bce122e574f0728660d31899875fe9;hb=c1eb1d4013a7d748c0fc22f778ddb719dc151a1b;hp=c1f0ae7a2ac8ef3ef970edda16f733485721f90c;hpb=f0ac8322367c66080b6dbb74da4de72dae126dc3;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index c1f0ae7a..e5fa41a8 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -124,17 +124,17 @@ sub process my ($self, $line) = @_; # this is periodic processing - if (undef $self || undef $line) { + if (!$self || !$line) { # wander down the work queue stopping any messages that have timed out - for (keys %work) { - my $ref = $work{$_}; - if ($main::systime > $ref->{lastt} + $timeout) { - my $tonode = $ref->{tonode}; - $ref->stop_msg(); + for (keys %busy) { + my $node = $_; + my $ref = $busy{$_}; + if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) { + $ref->stop_msg($node); # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall; + $ref->{waitt} = $main::systime + $waittime if $node ne $main::mycall; } } @@ -155,7 +155,7 @@ sub process if (exists $busy{$f[2]}) { my $ref = $busy{$f[2]}; my $tonode = $ref->{tonode}; - $ref->stop_msg(); + $ref->stop_msg($self->call); } my $t = cltounix($f[5], $f[6]); @@ -243,7 +243,7 @@ sub process my $m; for $m (@msg) { if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) { - $ref->stop_msg(); + $ref->stop_msg($self->call); my $msgno = $m->{msgno}; dbg('msg', "duplicate message to $msgno\n"); Log('msg', "duplicate message to $msgno"); @@ -253,7 +253,7 @@ sub process # look for 'bad' to addresses if (grep $ref->{to} eq $_, @badmsg) { - $ref->stop_msg(); + $ref->stop_msg($self->call); dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); return; @@ -268,7 +268,7 @@ sub process Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } - $ref->stop_msg(); + $ref->stop_msg($self->call); queue_msg(0); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream @@ -288,7 +288,7 @@ sub process push @{$ref->{gotit}}, $f[2]; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg(); + $ref->stop_msg($self->call); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -335,7 +335,7 @@ sub process dbg('msg', "stream $f[3]: abort received\n"); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - $ref->stop_msg(); + $ref->stop_msg($self->call); $ref = undef; } @@ -344,9 +344,10 @@ sub process if ($pcno == 49) { # global delete on subject for (@msg) { - if ($_->{subject} eq $f[2]) { + if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { $_->del_msg(); - Log('msg', "Message $_->{msgno} fully deleted by $f[1]"); + Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); + DXProt::broadcast_ak1a($line, $self); } } } @@ -624,8 +625,8 @@ sub start_msg $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$dxchan->call} = $self; - $work{"$self->{tonode}"} = $self; + $busy{$self->{tonode}} = $self; + $work{$self->{tonode}} = $self; $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); } @@ -651,8 +652,8 @@ sub get_fwq # stop a message from continuing, clean it out, unlock interlocks etc sub stop_msg { - my ($self, $dxchan) = @_; - my $node = $self->{tonode} + my $self = shift; + my $node = shift; my $stream = $self->{stream} if exists $self->{stream}; @@ -791,7 +792,7 @@ sub do_send_stuff $loc->{lines} = []; $self->state('sendbody'); #push @out, $self->msg('sendbody'); - push @out, $self->msg('m8');) + push @out, $self->msg('m8'); } elsif ($self->state eq 'sendbody') { confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc};