X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=3262a44df98f7c13af304c97c8ece3d735154b0c;hb=8aa1d223307c50d8bbaa0ed4ef915f8c7365bc6e;hp=c1f0ae7a2ac8ef3ef970edda16f733485721f90c;hpb=f0ac8322367c66080b6dbb74da4de72dae126dc3;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index c1f0ae7a..3262a44d 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -32,7 +32,8 @@ use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean - @badmsg $badmsgfn $forwardfn @forward $timeout $waittime); + @badmsg $badmsgfn $forwardfn @forward $timeout $waittime + $queueinterval $lastq); %work = (); # outstanding jobs @msg = (); # messages we have @@ -43,33 +44,36 @@ $last_clean = 0; # last time we did a clean @forward = (); # msg forward table $timeout = 30*60; # forwarding timeout $waittime = 60*60; # time an aborted outgoing message waits before trying again +$queueinterval = 2*60; # run the queue every 2 minutes +$lastq = 0; + $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table %valid = ( - fromnode => '9,From Node', - tonode => '9,To Node', + fromnode => '5,From Node', + tonode => '5,To Node', to => '0,To', from => '0,From', t => '0,Msg Time,cldatetime', - private => '9,Private', + private => '5,Private', subject => '0,Subject', linesreq => '0,Lines per Gob', - rrreq => '9,Read Confirm', + rrreq => '5,Read Confirm', origin => '0,Origin', lines => '5,Data', stream => '9,Stream No', - count => '9,Gob Linecnt', - file => '9,File?,yesno', - gotit => '9,Got it Nodes,parray', - lines => '9,Lines,parray', - 'read' => '9,Times read', + count => '5,Gob Linecnt', + file => '5,File?,yesno', + gotit => '5,Got it Nodes,parray', + lines => '5,Lines,parray', + 'read' => '5,Times read', size => '0,Size', msgno => '0,Msgno', keep => '0,Keep this?,yesno', - lastt => '9,Last processed,cldatetime', - waitt => '9,Wait until,cldatetime', + lastt => '5,Last processed,cldatetime', + waitt => '5,Wait until,cldatetime', ); sub DESTROY @@ -124,20 +128,27 @@ sub process my ($self, $line) = @_; # this is periodic processing - if (undef $self || undef $line) { + if (!$self || !$line) { # wander down the work queue stopping any messages that have timed out - for (keys %work) { - my $ref = $work{$_}; - if ($main::systime > $ref->{lastt} + $timeout) { - my $tonode = $ref->{tonode}; - $ref->stop_msg(); + for (keys %busy) { + my $node = $_; + my $ref = $busy{$_}; + if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) { + dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + $ref->stop_msg($node); # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall; + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; } } - + + # queue some message if the interval timer has gone off + if ($main::systime > $lastq + $queueinterval) { + queue_msg(0); + $lastq = $main::systime; + } + # clean the message queue clean_old() if $main::systime - $last_clean > 3600 ; return; @@ -155,7 +166,8 @@ sub process if (exists $busy{$f[2]}) { my $ref = $busy{$f[2]}; my $tonode = $ref->{tonode}; - $ref->stop_msg(); + dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]"); + $ref->stop_msg($self->call); } my $t = cltounix($f[5], $f[6]); @@ -173,6 +185,14 @@ sub process $work{"$f[2]$stream"} = $ref; # store in work $busy{$f[2]} = $ref; # set interlock $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $ref->{lastt} = $main::systime; + + # look to see whether this is a non private message sent to a known callsign + my $uref = DXUser->get_current($ref->{to}); + if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { + $ref->{private} = 1; + dbg('msg', "set bull to $ref->{to} to private"); + } last SWITCH; } @@ -187,6 +207,9 @@ sub process $ref->{count} = 0; } $ref->{lastt} = $main::systime; + } else { + dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" ); + $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; } @@ -206,6 +229,7 @@ sub process $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { + dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -218,6 +242,7 @@ sub process $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { + dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -243,7 +268,7 @@ sub process my $m; for $m (@msg) { if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) { - $ref->stop_msg(); + $ref->stop_msg($self->call); my $msgno = $m->{msgno}; dbg('msg', "duplicate message to $msgno\n"); Log('msg', "duplicate message to $msgno"); @@ -253,7 +278,7 @@ sub process # look for 'bad' to addresses if (grep $ref->{to} eq $_, @badmsg) { - $ref->stop_msg(); + $ref->stop_msg($self->call); dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); return; @@ -264,16 +289,16 @@ sub process $ref->store($ref->{lines}); add_dir($ref); my $dxchan = DXChannel->get($ref->{to}); - $dxchan->send($dxchan->msg('m9')) if $dxchan; + $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user; Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } - $ref->stop_msg(); - queue_msg(0); + $ref->stop_msg($self->call); } else { + dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } - queue_msg(0); + # queue_msg(0); last SWITCH; } @@ -288,10 +313,13 @@ sub process push @{$ref->{gotit}}, $f[2]; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg(); + $ref->stop_msg($self->call); } else { + dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" ); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } + + # send next one if present queue_msg(0); last SWITCH; } @@ -325,6 +353,7 @@ sub process $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; + $ref->{lastt} = $main::systime; $work{"$f[2]$stream"} = $ref; # store in work $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack @@ -335,7 +364,7 @@ sub process dbg('msg', "stream $f[3]: abort received\n"); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - $ref->stop_msg(); + $ref->stop_msg($self->call); $ref = undef; } @@ -344,9 +373,10 @@ sub process if ($pcno == 49) { # global delete on subject for (@msg) { - if ($_->{subject} eq $f[2]) { + if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { $_->del_msg(); - Log('msg', "Message $_->{msgno} fully deleted by $f[1]"); + Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); + DXProt::broadcast_ak1a($line, $self); } } } @@ -558,7 +588,7 @@ sub queue_msg # ignore 'delayed' messages until their waiting time has expired if (exists $ref->{waitt}) { - next if $ref->{waitt} < $main::systime; + next if $ref->{waitt} > $main::systime; delete $ref->{waitt}; } @@ -624,8 +654,9 @@ sub start_msg $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$dxchan->call} = $self; - $work{"$self->{tonode}"} = $self; + $busy{$self->{tonode}} = $self; + $work{$self->{tonode}} = $self; + $self->{lastt} = $main::systime; $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq})); } @@ -651,8 +682,8 @@ sub get_fwq # stop a message from continuing, clean it out, unlock interlocks etc sub stop_msg { - my ($self, $dxchan) = @_; - my $node = $self->{tonode} + my $self = shift; + my $node = shift; my $stream = $self->{stream} if exists $self->{stream}; @@ -791,7 +822,7 @@ sub do_send_stuff $loc->{lines} = []; $self->state('sendbody'); #push @out, $self->msg('sendbody'); - push @out, $self->msg('m8');) + push @out, $self->msg('m8'); } elsif ($self->state eq 'sendbody') { confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; @@ -829,7 +860,6 @@ sub do_send_stuff delete $self->{loc}; $self->func(undef); - DXMsg::queue_msg(0); $self->state('prompt'); } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { #push @out, $self->msg('sendabort');