- my ($self, $line) = @_;
- my @f = split /[\^\~]/, $line;
- my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
-
- SWITCH: {
- if ($pcno == 28) { # incoming message
- my $t = cltounix($f[5], $f[6]);
- my $stream = next_transno($f[2]);
- my $ref = DXMsg->alloc($f[1], $f[2], $f[3], $f[4], $t, $f[7], $f[8], $f[10], $f[11], $f[13], $stream);
- dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
- $work{"$f[1]$f[2]$stream"} = $ref; # store in work
- $self->send(DXProt::pc30($f[2], $f[1], $stream));
- $ref->{count} = 0; # no of lines between PC31s
- last SWITCH;
- }
-
- if ($pcno == 29) { # incoming text
- my $ref = $work{"$f[1]$f[2]$f[3]"};
- if ($ref) {
- push @{$ref->{lines}}, $f[4];
- $ref->{count}++;
- if ($ref->{count} >= $ref->{linesreq}) {
- $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
- dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
- $ref->{count} = 0;
- }
- }
- last SWITCH;
- }
-
- if ($pcno == 30) {
- last SWITCH;
- }
-
- if ($pcno == 31) {
- last SWITCH;
- }
-
- if ($pcno == 32) { # incoming EOM
- dbg('msg', "stream $f[3]: EOM received\n");
- my $ref = $work{"$f[1]$f[2]$f[3]"};
- if ($ref) {
- $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
- $ref->store(); # store it (whatever that may mean)
- delete $work{"$f[1]$f[2]$f[3]"}; # remove the reference from the work vector
- }
- last SWITCH;
- }
-
- if ($pcno == 33) {
- last SWITCH;
- }
-
- if ($pcno == 40) { # this is a file request
- $f[3] =~ s/\\/\//og; # change the slashes
- $f[3] =~ s/\.//og; # remove dots
- $f[3] = lc $f[3]; # to lower case;
- dbg('msg', "incoming file $f[3]\n");
- last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|data\/msg)\//; # prevent access to executables
-
- # create any directories
- my @part = split /\//, $f[3];
- my $part;
- my $fn = "$main::root";
- pop @part; # remove last part
- foreach $part (@part) {
- $fn .= "/$part";
- next if -e $fn;
- last SWITCH if !mkdir $fn, 0777;
- dbg('msg', "created directory $fn\n");
- }
- my $stream = next_transno($f[2]);
- my $ref = DXMsg->alloc($f[1], $f[2], "$main::root/$f[3]", undef, time, !$f[4], undef, $f[5], 0, ' ', $stream);
- $ref->{file} = 1;
- $work{"$f[1]$f[2]$stream"} = $ref; # store in work
- $self->send(DXProt::pc30($f[2], $f[1], $stream));
- $ref->{count} = 0; # no of lines between PC31s
-
- last SWITCH;
- }
- }
+ my ($self, $line) = @_;
+
+ # this is periodic processing
+ if (!$self || !$line) {
+
+ # wander down the work queue stopping any messages that have timed out
+ for (keys %busy) {
+ my $node = $_;
+ my $ref = $busy{$_};
+ if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) {
+ $ref->stop_msg($node);
+
+ # delay any outgoing messages that fail
+ $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
+ }
+ }
+
+ # queue some message if the interval timer has gone off
+ if ($main::systime > $lastq + $queueinterval) {
+ queue_msg(0);
+ $lastq = $main::systime;
+ }
+
+ # clean the message queue
+ clean_old() if $main::systime - $last_clean > 3600 ;
+ return;
+ }
+
+ my @f = split /\^/, $line;
+ my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
+
+ SWITCH: {
+ if ($pcno == 28) { # incoming message
+
+ # first look for any messages in the busy queue
+ # and cancel them this should both resolve timed out incoming messages
+ # and crossing of message between nodes, incoming messages have priority
+ if (exists $busy{$f[2]}) {
+ my $ref = $busy{$f[2]};
+ my $tonode = $ref->{tonode};
+ $ref->stop_msg($self->call);
+ }
+
+ my $t = cltounix($f[5], $f[6]);
+ my $stream = next_transno($f[2]);
+ my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
+
+ # fill in various forwarding state variables
+ $ref->{fromnode} = $f[2];
+ $ref->{tonode} = $f[1];
+ $ref->{rrreq} = $f[11];
+ $ref->{linesreq} = $f[10];
+ $ref->{stream} = $stream;
+ $ref->{count} = 0; # no of lines between PC31s
+ dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
+ $work{"$f[2]$stream"} = $ref; # store in work
+ $busy{$f[2]} = $ref; # set interlock
+ $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
+ $ref->{lastt} = $main::systime;
+ last SWITCH;
+ }
+
+ if ($pcno == 29) { # incoming text
+ my $ref = $work{"$f[2]$f[3]"};
+ if ($ref) {
+ push @{$ref->{lines}}, $f[4];
+ $ref->{count}++;
+ if ($ref->{count} >= $ref->{linesreq}) {
+ $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
+ dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
+ $ref->{count} = 0;
+ }
+ $ref->{lastt} = $main::systime;
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 30) { # this is a incoming subject ack
+ my $ref = $work{$f[2]}; # note no stream at this stage
+ if ($ref) {
+ delete $work{$f[2]};
+ $ref->{stream} = $f[3];
+ $ref->{count} = 0;
+ $ref->{linesreq} = 5;
+ $work{"$f[2]$f[3]"} = $ref; # new ref
+ dbg('msg', "incoming subject ack stream $f[3]\n");
+ $busy{$f[2]} = $ref; # interlock
+ $ref->{lines} = [];
+ push @{$ref->{lines}}, ($ref->read_msg_body);
+ $ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
+ } else {
+ $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 31) { # acknowledge a tranche of lines
+ my $ref = $work{"$f[2]$f[3]"};
+ if ($ref) {
+ dbg('msg', "tranche ack stream $f[3]\n");
+ $ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
+ } else {
+ $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 32) { # incoming EOM
+ dbg('msg', "stream $f[3]: EOM received\n");
+ my $ref = $work{"$f[2]$f[3]"};
+ if ($ref) {
+ $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it
+
+ # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
+ # store the file or message
+ # remove extraneous rubbish from the hash
+ # remove it from the work in progress vector
+ # stuff it on the msg queue
+ if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines
+ if ($ref->{file}) {
+ $ref->store($ref->{lines});
+ } else {
+
+ # does an identical message already exist?
+ my $m;
+ for $m (@msg) {
+ if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) {
+ $ref->stop_msg($self->call);
+ my $msgno = $m->{msgno};
+ dbg('msg', "duplicate message to $msgno\n");
+ Log('msg', "duplicate message to $msgno");
+ return;
+ }
+ }
+
+ # look for 'bad' to addresses
+ if (grep $ref->{to} eq $_, @badmsg) {
+ $ref->stop_msg($self->call);
+ dbg('msg', "'Bad' TO address $ref->{to}");
+ Log('msg', "'Bad' TO address $ref->{to}");
+ return;
+ }
+
+ $ref->{msgno} = next_transno("Msgno");
+ push @{$ref->{gotit}}, $f[2]; # mark this up as being received
+ $ref->store($ref->{lines});
+ add_dir($ref);
+ my $dxchan = DXChannel->get($ref->{to});
+ $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
+ Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
+ }
+ }
+ $ref->stop_msg($self->call);
+ } else {
+ $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
+ }
+ # queue_msg(0);
+ last SWITCH;
+ }
+
+ if ($pcno == 33) { # acknowledge the end of message
+ my $ref = $work{"$f[2]$f[3]"};
+ if ($ref) {
+ if ($ref->{private}) { # remove it if it private and gone off site#
+ Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
+ $ref->del_msg;
+ } else {
+ Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
+ push @{$ref->{gotit}}, $f[2]; # mark this up as being received
+ $ref->store($ref->{lines}); # re- store the file
+ }
+ $ref->stop_msg($self->call);
+ } else {
+ $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
+ }
+
+ # send next one if present
+ queue_msg(0);
+ last SWITCH;
+ }
+
+ if ($pcno == 40) { # this is a file request
+ $f[3] =~ s/\\/\//og; # change the slashes
+ $f[3] =~ s/\.//og; # remove dots
+ $f[3] =~ s/^\///o; # remove the leading /
+ $f[3] = lc $f[3]; # to lower case;
+ dbg('msg', "incoming file $f[3]\n");
+ $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;
+
+ # create any directories
+ my @part = split /\//, $f[3];
+ my $part;
+ my $fn = "$main::root";
+ pop @part; # remove last part
+ foreach $part (@part) {
+ $fn .= "/$part";
+ next if -e $fn;
+ last SWITCH if !mkdir $fn, 0777;
+ dbg('msg', "created directory $fn\n");
+ }
+ my $stream = next_transno($f[2]);
+ my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
+
+ # forwarding variables
+ $ref->{fromnode} = $f[1];
+ $ref->{tonode} = $f[2];
+ $ref->{linesreq} = $f[5];
+ $ref->{stream} = $stream;
+ $ref->{count} = 0; # no of lines between PC31s
+ $ref->{file} = 1;
+ $work{"$f[2]$stream"} = $ref; # store in work
+ $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
+
+ last SWITCH;
+ }
+
+ if ($pcno == 42) { # abort transfer
+ dbg('msg', "stream $f[3]: abort received\n");
+ my $ref = $work{"$f[2]$f[3]"};
+ if ($ref) {
+ $ref->stop_msg($self->call);
+ $ref = undef;
+ }
+
+ last SWITCH;
+ }
+
+ if ($pcno == 49) { # global delete on subject
+ for (@msg) {
+ if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
+ $_->del_msg();
+ Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
+ DXProt::broadcast_ak1a($line, $self);
+ }
+ }
+ }
+ }