X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=d5631904971148fae3c246cbff338d4033b4cb2b;hb=fdc49835d7dc5573453567bd41e52c5e580ad8e7;hp=ee2e7e5ec67c52798041a9cd92e67b6be228be6e;hpb=35b5c7d7fa637ed0722613d81b36000822b1be03;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index ee2e7e5e..d5631904 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -15,8 +15,6 @@ package DXMsg; -@ISA = qw(DXProt DXChannel); - use DXUtil; use DXChannel; use DXUser; @@ -82,13 +80,6 @@ $importfn = "$msgdir/import"; # import directory waitt => '5,Wait until,cldatetime', ); -sub DESTROY -{ - my $self = shift; - undef $self->{lines}; - undef $self->{gotit}; -} - # allocate a new object # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper sub alloc @@ -110,6 +101,7 @@ sub alloc $self->{rrreq} = shift; $self->{gotit} = []; $self->{lastt} = $main::systime; + $self->{lines} = []; return $self; } @@ -122,7 +114,6 @@ sub workclean delete $ref->{tonode}; delete $ref->{fromnode}; delete $ref->{stream}; - delete $ref->{lines}; delete $ref->{file}; delete $ref->{count}; delete $ref->{lastt} if exists $ref->{lastt}; @@ -138,19 +129,6 @@ sub process if ($main::systime >= $lastq + $queueinterval) { - # wander down the work queue stopping any messages that have timed out - for (keys %busy) { - my $node = $_; - my $ref = $busy{$_}; - if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { - dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); - $ref->stop_msg($node); - - # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; - } - } - # queue some message if the interval timer has gone off queue_msg(0); @@ -202,7 +180,7 @@ sub process # look to see whether this is a non private message sent to a known callsign my $uref = DXUser->get_current($ref->{to}); - if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { + if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { $ref->{private} = 1; dbg('msg', "set bull to $ref->{to} to private"); } @@ -212,6 +190,7 @@ sub process if ($pcno == 29) { # incoming text my $ref = $work{"$f[2]$f[3]"}; if ($ref) { + $f[4] =~ s/\%5E/^/g; push @{$ref->{lines}}, $f[4]; $ref->{count}++; if ($ref->{count} >= $ref->{linesreq}) { @@ -237,7 +216,6 @@ sub process $work{"$f[2]$f[3]"} = $ref; # new ref dbg('msg', "incoming subject ack stream $f[3]\n"); $busy{$f[2]} = $ref; # interlock - $ref->{lines} = []; push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); $ref->{lastt} = $main::systime; @@ -293,8 +271,7 @@ sub process $ref->swop_it($self->call); # look for 'bad' to addresses -# if (grep $ref->{to} eq $_, @badmsg) { - if ($ref->dump_it($self->call)) { + if ($ref->dump_it) { $ref->stop_msg($self->call); dbg('msg', "'Bad' message $ref->{to}"); Log('msg', "'Bad' message $ref->{to}"); @@ -407,12 +384,7 @@ sub store { my $ref = shift; my $lines = shift; - - # we only proceed if there are actually any lines in the file -# if (!$lines || @{$lines} == 0) { -# return; -# } - + if ($ref->{file}) { # a file dbg('msg', "To be stored in $ref->{to}\n"); @@ -463,15 +435,13 @@ sub del_msg my $self = shift; # remove it from the active message list - @msg = map { $_ != $self ? $_ : () } @msg; - - # belt and braces (one day I will ask someone if this is REALLY necessary) - delete $self->{gotit}; - delete $self->{list}; + dbg('msg', "\@msg = " . scalar @msg . " before delete"); + @msg = grep { $_ != $self } @msg; # remove the file unlink filename($self->{msgno}); dbg('msg', "deleting $self->{msgno}\n"); + dbg('msg', "\@msg = " . scalar @msg . " after delete"); } # clean out old messages from the message queue @@ -480,18 +450,18 @@ sub clean_old my $ref; # mark old messages for deletion + dbg('msg', "\@msg = " . scalar @msg . " before delete"); foreach $ref (@msg) { - if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) { + if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) { $ref->{deleteme} = 1; - delete $ref->{gotit}; - delete $ref->{list}; unlink filename($ref->{msgno}); dbg('msg', "deleting old $ref->{msgno}\n"); } } # remove them all from the active message list - @msg = map { $_->{deleteme} ? () : $_ } @msg; + @msg = grep { !$_->{deleteme} } @msg; + dbg('msg', "\@msg = " . scalar @msg . " after delete"); $last_clean = $main::systime; } @@ -600,15 +570,13 @@ sub queue_msg my $call = shift; my $ref; my $clref; - my @nodelist = DXChannel::get_all_ak1a(); # bat down the message list looking for one that needs to go off site and whose # nearest node is not busy. dbg('msg', "queue msg ($sort)\n"); + my @nodelist = DXChannel::get_all_nodes; foreach $ref (@msg) { - # firstly, is it private and unread? if so can I find the recipient - # in my cluster node list offsite? # ignore 'delayed' messages until their waiting time has expired if (exists $ref->{waitt}) { @@ -616,8 +584,24 @@ sub queue_msg delete $ref->{waitt}; } + # any time outs? + if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { + my $node = $ref->{tonode}; + dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + Log('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + $ref->stop_msg($node); + + # delay any outgoing messages that fail + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; + delete $ref->{lastt}; + next; + } + + # firstly, is it private and unread? if so can I find the recipient + # in my cluster node list offsite? + # deal with routed private messages - my $noderef; + my $dxchan; if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here $clref = DXCluster->get_exact($ref->{to}); @@ -626,10 +610,10 @@ sub queue_msg my $hnode = $uref->homenode if $uref; $clref = DXCluster->get_exact($hnode) if $hnode; } - if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) { + if ($clref && !grep { $clref->dxchan == $_ } DXCommandmode::get_all()) { next if $clref->call eq $main::mycall; # i.e. it lives here - $noderef = $clref->{dxchan}; - $ref->start_msg($noderef) if !get_busy($noderef->call) && $noderef->state eq 'normal'; + $dxchan = $clref->dxchan; + $ref->start_msg($dxchan) if $dxchan && !get_busy($dxchan->call) && $dxchan->state eq 'normal'; } } @@ -638,13 +622,15 @@ sub queue_msg # the nodelist up above, if there are sites that haven't got it yet # then start sending it - what happens when we get loops is anyone's # guess, use (to, from, time, subject) tuple? - foreach $noderef (@nodelist) { - next if $noderef->call eq $main::mycall; - next if grep { $_ eq $noderef->call } @{$ref->{gotit}}; - next unless $ref->forward_it($noderef->call); # check the forwarding file + foreach $dxchan (@nodelist) { + my $call = $dxchan->call; + next unless $call; + next if $call eq $main::mycall; + next if ref $ref->{gotit} && grep $_ eq $call, @{$ref->{gotit}}; + next unless $ref->forward_it($call); # check the forwarding file # if we are here we have a node that doesn't have this message - $ref->start_msg($noderef) if !get_busy($noderef->call) && $noderef->state eq 'normal'; + $ref->start_msg($dxchan) if !get_busy($call) && $dxchan->state eq 'normal'; last; } @@ -674,7 +660,7 @@ sub start_msg my ($self, $dxchan) = @_; dbg('msg', "start msg $self->{msgno}\n"); - $self->{linesreq} = 5; + $self->{linesreq} = 10; $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; @@ -761,7 +747,7 @@ sub init @msg = (); for (sort @dir) { - next unless /^m\d+$/o; + next unless /^m\d\d\d\d\d\d$/; $ref = read_msg_header("$msgdir/$_"); unless ($ref) { @@ -772,7 +758,7 @@ sub init } # delete any messages to 'badmsg.pl' places - if (grep $ref->{to} eq $_, @badmsg) { + if ($ref->dump_it) { dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); $ref->del_msg; @@ -987,7 +973,6 @@ sub forward_it sub dump_it { my $ref = shift; - my $call = shift; my $i; for ($i = 0; $i < @badmsg; $i += 3) {