keep => '0,Keep this?,yesno',
lastt => '5,Last processed,cldatetime',
waitt => '5,Wait until,cldatetime',
+ delete => '5,Awaiting Delete,yesno',
);
# allocate a new object
return $self;
}
-sub workclean
-{
- my $ref = shift;
- delete $ref->{lines};
- delete $ref->{linesreq};
- delete $ref->{tonode};
- delete $ref->{fromnode};
- delete $ref->{stream};
- delete $ref->{file};
- delete $ref->{count};
- delete $ref->{lastt} if exists $ref->{lastt};
- delete $ref->{waitt} if exists $ref->{waitt};
-}
sub process
{
# and cancel them this should both resolve timed out incoming messages
# and crossing of message between nodes, incoming messages have priority
- if (exists $busy{$fromnode}) {
- my $ref = $busy{$fromnode};
+ my $ref = get_busy($fromnode);
+ if ($ref) {
my $otonode = $ref->{tonode} || "unknown";
dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg');
$ref->stop_msg($fromnode);
my $t = cltounix($f[5], $f[6]);
$stream = next_transno($fromnode);
- my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]);
+ $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]);
# fill in various forwarding state variables
$ref->{fromnode} = $fromnode;
$ref->{count} = 0; # no of lines between PC31s
dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg');
Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" );
- $work{"$fromnode,$stream"} = $ref; # store in work
- $busy{$fromnode} = $ref; # set interlock
+ set_fwq($fromnode, $stream, $ref); # store in work
+ set_busy($fromnode, $ref); # set interlock
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
$ref->{lastt} = $main::systime;
}
if ($pcno == 29) { # incoming text
- my $ref = $work{"$fromnode,$stream"};
+ my $ref = get_fwq($fromnode, $stream);
if ($ref) {
$f[4] =~ s/\%5E/^/g;
push @{$ref->{lines}}, $f[4];
}
if ($pcno == 30) { # this is a incoming subject ack
- my $ref = $work{"$fromnode,"}; # note no stream at this stage
+ my $ref = get_fwq($fromnode); # note no stream at this stage
if ($ref) {
- delete $work{"$fromnode,"};
+ del_fwq($fromnode);
$ref->{stream} = $stream;
$ref->{count} = 0;
$ref->{linesreq} = 5;
- $work{"$fromnode,$stream"} = $ref; # new ref
+ set_fwq($fromnode, $stream, $ref); # new ref
+ set_busy($fromnode, $ref); # interlock
dbg("incoming subject ack stream $stream\n") if isdbg('msg');
- $busy{$fromnode} = $ref; # interlock
- push @{$ref->{lines}}, ($ref->read_msg_body);
+ $ref->{lines} = [ $ref->read_msg_body ];
$ref->send_tranche($self);
$ref->{lastt} = $main::systime;
} else {
}
if ($pcno == 31) { # acknowledge a tranche of lines
- my $ref = $work{"$fromnode,$stream"};
+ my $ref = get_fwq($fromnode, $stream);
if ($ref) {
dbg("tranche ack stream $stream\n") if isdbg('msg');
$ref->send_tranche($self);
if ($pcno == 32) { # incoming EOM
dbg("stream $stream: EOM received\n") if isdbg('msg');
- my $ref = $work{"$fromnode,$stream"};
+ my $ref = get_fwq($fromnode, $stream);
if ($ref) {
$self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it
}
if ($pcno == 33) { # acknowledge the end of message
- my $ref = $work{"$fromnode,$stream"};
+ my $ref = get_fwq($fromnode, $stream);
if ($ref) {
if ($ref->{private}) { # remove it if it private and gone off site#
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted");
- $ref->del_msg;
+ $ref->{delete}++;
} else {
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode");
push @{$ref->{gotit}}, $fromnode; # mark this up as being received
$ref->{count} = 0; # no of lines between PC31s
$ref->{file} = 1;
$ref->{lastt} = $main::systime;
- $work{"$fromnode,$stream"} = $ref; # store in work
+ set_fwq($fromnode, $stream, $ref); # store in work
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
last SWITCH;
if ($pcno == 42) { # abort transfer
dbg("stream $stream: abort received\n") if isdbg('msg');
- my $ref = $work{"$fromnode,$stream"};
+ my $ref = get_fwq($fromnode, $stream);
if ($ref) {
$ref->stop_msg($fromnode);
$ref = undef;
if ($pcno == 49) { # global delete on subject
for (@msg) {
if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
- $_->del_msg();
+ $_->{delete}++;
Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
DXChannel::broadcast_nodes($line, $self);
}
sub del_msg
{
my $self = shift;
+ my $dxchan = shift;
- # remove it from the active message list
- dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg');
- @msg = grep { $_ != $self } @msg;
-
- # remove the file
- unlink filename($self->{msgno});
- dbg("deleting $self->{msgno}\n") if isdbg('msg');
- dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg');
+ if ($self->{tonode}) {
+ $self->{delete}++;
+ } else {
+ my $call;
+ if ($dxchan) {
+ $call = " by " . $dxchan->call;
+ } else {
+ $call = '';
+ }
+
+ # remove it from the active message list
+ @msg = grep { $_ != $self } @msg;
+
+ # remove the file
+ unlink filename($self->{msgno});
+ }
}
# clean out old messages from the message queue
if (!open($file, $fn)) {
dbg("Error reading $fn $!");
Log('err' ,"Error reading $fn $!");
- return undef;
+ return ();
}
@out = map {chomp; $_} <$file>;
close($file);
next;
}
+ # is it being sent anywhere currently?
+ next if $ref->{tonode}; # ignore it if it already being processed
+
+ # is it awaiting deletion?
+ if ($ref->{delete}) {
+ $ref->del_msg;
+ next;
+ }
+
# firstly, is it private and unread? if so can I find the recipient
# in my cluster node list offsite?
my $dxchan;
if ($ref->{private}) {
next if $ref->{'read'}; # if it is read, it is stuck here
- next if $ref->{tonode}; # ignore it if it already being processed
$clref = Route::get($ref->{to});
if ($clref) {
$dxchan = $clref->dxchan;
$self->{count} = 0;
$self->{tonode} = $dxchan->call;
$self->{fromnode} = $main::mycall;
- $busy{$self->{tonode}} = $self;
- $work{"$self->{tonode},"} = $self;
+ set_busy($self->{tonode}, $self);
+ set_fwq($self->{tonode}, undef, $self);
$self->{lastt} = $main::systime;
my ($fromnode, $origin);
$fromnode = $self->{fromnode};
return $busy{$call};
}
-# get the busy queue
+sub set_busy
+{
+ my $call = shift;
+ return $busy{$call} = shift;
+}
+
+sub del_busy
+{
+ my $call = shift;
+ return delete $busy{$call};
+}
+
+# get the whole busy queue
sub get_all_busy
{
return keys %busy;
{
my $self = shift;
my $node = shift;
- my $stream = $self->{stream} if exists $self->{stream};
+ my $stream = $self->{stream};
dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg');
- delete $work{"$node,"};
- delete $work{"$node,$stream"} if $stream;
+ del_fwq($node, $stream);
$self->workclean;
- delete $busy{$node};
+ del_busy($node);
+}
+
+sub workclean
+{
+ my $ref = shift;
+ delete $ref->{lines};
+ delete $ref->{linesreq};
+ delete $ref->{tonode};
+ delete $ref->{fromnode};
+ delete $ref->{stream};
+ delete $ref->{file};
+ delete $ref->{count};
+ delete $ref->{lastt} if exists $ref->{lastt};
+ delete $ref->{waitt} if exists $ref->{waitt};
}
# get a new transaction number from the file specified
sub dir
{
my $ref = shift;
+ my $flag = $ref->read ? '-' : ' ';
+ $flag = 'D' if $ref->delete;
return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s",
- $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
+ $ref->msgno, $flag, $ref->private ? 'p' : ' ', $ref->size,
$ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
}