From 1283d7c748715a1bf85a781fe4b66f427d949ae9 Mon Sep 17 00:00:00 2001 From: minima Date: Tue, 2 Oct 2001 15:02:16 +0000 Subject: [PATCH] restructure all the busy/fwq stuff and hide them all in subroutines upissue version no --- Changes | 1 + cmd/kill.pl | 2 +- perl/DXMsg.pm | 130 ++++++++++++++++++++++++++++++------------------ perl/cluster.pl | 2 +- 4 files changed, 84 insertions(+), 51 deletions(-) diff --git a/Changes b/Changes index e10e1b04..1dd680fb 100644 --- a/Changes +++ b/Changes @@ -1,6 +1,7 @@ 02Oct01======================================================================= 1. fixed the 'ever lengthening msg' syndrome and probably made the whole thing more reliable at the same time. +2. do some restructuring of the msg system and upissue the version no. 01Oct01======================================================================= 1. made login info not the default 2. change the looping protoection for PC16/17/19/21 and also be more rigorous diff --git a/cmd/kill.pl b/cmd/kill.pl index de533bdc..b98a42f6 100644 --- a/cmd/kill.pl +++ b/cmd/kill.pl @@ -68,7 +68,7 @@ foreach $ref ( @refs) { } my $tonode = $ref->tonode; $ref->stop_msg($tonode) if $tonode; - $ref->del_msg; + $ref->del_msg($self); push @out, $self->msg('m12', $ref->msgno); } diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index e3ea16ad..f66f98d7 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -86,6 +86,7 @@ $importfn = "$msgdir/import"; # import directory keep => '0,Keep this?,yesno', lastt => '5,Last processed,cldatetime', waitt => '5,Wait until,cldatetime', + delete => '5,Awaiting Delete,yesno', ); # allocate a new object @@ -115,19 +116,6 @@ sub alloc return $self; } -sub workclean -{ - my $ref = shift; - delete $ref->{lines}; - delete $ref->{linesreq}; - delete $ref->{tonode}; - delete $ref->{fromnode}; - delete $ref->{stream}; - delete $ref->{file}; - delete $ref->{count}; - delete $ref->{lastt} if exists $ref->{lastt}; - delete $ref->{waitt} if exists $ref->{waitt}; -} sub process { @@ -168,8 +156,8 @@ sub process # and cancel them this should both resolve timed out incoming messages # and crossing of message between nodes, incoming messages have priority - if (exists $busy{$fromnode}) { - my $ref = $busy{$fromnode}; + my $ref = get_busy($fromnode); + if ($ref) { my $otonode = $ref->{tonode} || "unknown"; dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg'); $ref->stop_msg($fromnode); @@ -177,7 +165,7 @@ sub process my $t = cltounix($f[5], $f[6]); $stream = next_transno($fromnode); - my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]); + $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]); # fill in various forwarding state variables $ref->{fromnode} = $fromnode; @@ -188,8 +176,8 @@ sub process $ref->{count} = 0; # no of lines between PC31s dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg'); Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" ); - $work{"$fromnode,$stream"} = $ref; # store in work - $busy{$fromnode} = $ref; # set interlock + set_fwq($fromnode, $stream, $ref); # store in work + set_busy($fromnode, $ref); # set interlock $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack $ref->{lastt} = $main::systime; @@ -204,7 +192,7 @@ sub process } if ($pcno == 29) { # incoming text - my $ref = $work{"$fromnode,$stream"}; + my $ref = get_fwq($fromnode, $stream); if ($ref) { $f[4] =~ s/\%5E/^/g; push @{$ref->{lines}}, $f[4]; @@ -223,16 +211,16 @@ sub process } if ($pcno == 30) { # this is a incoming subject ack - my $ref = $work{"$fromnode,"}; # note no stream at this stage + my $ref = get_fwq($fromnode); # note no stream at this stage if ($ref) { - delete $work{"$fromnode,"}; + del_fwq($fromnode); $ref->{stream} = $stream; $ref->{count} = 0; $ref->{linesreq} = 5; - $work{"$fromnode,$stream"} = $ref; # new ref + set_fwq($fromnode, $stream, $ref); # new ref + set_busy($fromnode, $ref); # interlock dbg("incoming subject ack stream $stream\n") if isdbg('msg'); - $busy{$fromnode} = $ref; # interlock - push @{$ref->{lines}}, ($ref->read_msg_body); + $ref->{lines} = [ $ref->read_msg_body ]; $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { @@ -243,7 +231,7 @@ sub process } if ($pcno == 31) { # acknowledge a tranche of lines - my $ref = $work{"$fromnode,$stream"}; + my $ref = get_fwq($fromnode, $stream); if ($ref) { dbg("tranche ack stream $stream\n") if isdbg('msg'); $ref->send_tranche($self); @@ -257,7 +245,7 @@ sub process if ($pcno == 32) { # incoming EOM dbg("stream $stream: EOM received\n") if isdbg('msg'); - my $ref = $work{"$fromnode,$stream"}; + my $ref = get_fwq($fromnode, $stream); if ($ref) { $self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it @@ -330,11 +318,11 @@ sub process } if ($pcno == 33) { # acknowledge the end of message - my $ref = $work{"$fromnode,$stream"}; + my $ref = get_fwq($fromnode, $stream); if ($ref) { if ($ref->{private}) { # remove it if it private and gone off site# Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted"); - $ref->del_msg; + $ref->{delete}++; } else { Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode"); push @{$ref->{gotit}}, $fromnode; # mark this up as being received @@ -381,7 +369,7 @@ sub process $ref->{count} = 0; # no of lines between PC31s $ref->{file} = 1; $ref->{lastt} = $main::systime; - $work{"$fromnode,$stream"} = $ref; # store in work + set_fwq($fromnode, $stream, $ref); # store in work $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack last SWITCH; @@ -389,7 +377,7 @@ sub process if ($pcno == 42) { # abort transfer dbg("stream $stream: abort received\n") if isdbg('msg'); - my $ref = $work{"$fromnode,$stream"}; + my $ref = get_fwq($fromnode, $stream); if ($ref) { $ref->stop_msg($fromnode); $ref = undef; @@ -400,7 +388,7 @@ sub process if ($pcno == 49) { # global delete on subject for (@msg) { if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) { - $_->del_msg(); + $_->{delete}++; Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted"); DXChannel::broadcast_nodes($line, $self); } @@ -466,15 +454,24 @@ sub store sub del_msg { my $self = shift; + my $dxchan = shift; - # remove it from the active message list - dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); - @msg = grep { $_ != $self } @msg; - - # remove the file - unlink filename($self->{msgno}); - dbg("deleting $self->{msgno}\n") if isdbg('msg'); - dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); + if ($self->{tonode}) { + $self->{delete}++; + } else { + my $call; + if ($dxchan) { + $call = " by " . $dxchan->call; + } else { + $call = ''; + } + + # remove it from the active message list + @msg = grep { $_ != $self } @msg; + + # remove the file + unlink filename($self->{msgno}); + } } # clean out old messages from the message queue @@ -565,7 +562,7 @@ sub read_msg_body if (!open($file, $fn)) { dbg("Error reading $fn $!"); Log('err' ,"Error reading $fn $!"); - return undef; + return (); } @out = map {chomp; $_} <$file>; close($file); @@ -629,6 +626,15 @@ sub queue_msg next; } + # is it being sent anywhere currently? + next if $ref->{tonode}; # ignore it if it already being processed + + # is it awaiting deletion? + if ($ref->{delete}) { + $ref->del_msg; + next; + } + # firstly, is it private and unread? if so can I find the recipient # in my cluster node list offsite? @@ -636,7 +642,6 @@ sub queue_msg my $dxchan; if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here - next if $ref->{tonode}; # ignore it if it already being processed $clref = Route::get($ref->{to}); if ($clref) { $dxchan = $clref->dxchan; @@ -703,8 +708,8 @@ sub start_msg $self->{count} = 0; $self->{tonode} = $dxchan->call; $self->{fromnode} = $main::mycall; - $busy{$self->{tonode}} = $self; - $work{"$self->{tonode},"} = $self; + set_busy($self->{tonode}, $self); + set_fwq($self->{tonode}, undef, $self); $self->{lastt} = $main::systime; my ($fromnode, $origin); $fromnode = $self->{fromnode}; @@ -719,7 +724,19 @@ sub get_busy return $busy{$call}; } -# get the busy queue +sub set_busy +{ + my $call = shift; + return $busy{$call} = shift; +} + +sub del_busy +{ + my $call = shift; + return delete $busy{$call}; +} + +# get the whole busy queue sub get_all_busy { return keys %busy; @@ -760,14 +777,27 @@ sub stop_msg { my $self = shift; my $node = shift; - my $stream = $self->{stream} if exists $self->{stream}; + my $stream = $self->{stream}; dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg'); - delete $work{"$node,"}; - delete $work{"$node,$stream"} if $stream; + del_fwq($node, $stream); $self->workclean; - delete $busy{$node}; + del_busy($node); +} + +sub workclean +{ + my $ref = shift; + delete $ref->{lines}; + delete $ref->{linesreq}; + delete $ref->{tonode}; + delete $ref->{fromnode}; + delete $ref->{stream}; + delete $ref->{file}; + delete $ref->{count}; + delete $ref->{lastt} if exists $ref->{lastt}; + delete $ref->{waitt} if exists $ref->{waitt}; } # get a new transaction number from the file specified @@ -973,8 +1003,10 @@ sub do_send_stuff sub dir { my $ref = shift; + my $flag = $ref->read ? '-' : ' '; + $flag = 'D' if $ref->delete; return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", - $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size, + $ref->msgno, $flag, $ref->private ? 'p' : ' ', $ref->size, $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject; } diff --git a/perl/cluster.pl b/perl/cluster.pl index be78ffda..8798392b 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -105,7 +105,7 @@ use vars qw(@inqueue $systime $version $starttime $lockfn @outstanding_connects @inqueue = (); # the main input queue, an array of hashes $systime = 0; # the time now (in seconds) -$version = "1.48"; # the version no of the software +$version = "1.49"; # the version no of the software $starttime = 0; # the starting time of the cluster #@outstanding_connects = (); # list of outstanding connects @listeners = (); # list of listeners -- 2.34.1