- my ($self, $line) = @_;
- my @f = split /[\^\~]/, $line;
- my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
-
- SWITCH: {
- if ($pcno == 28) { # incoming message
- my $t = cltounix($f[5], $f[6]);
- my $stream = next_transno($f[2]);
- my $ref = DXMsg->alloc($f[1], $f[2], $f[3], $f[4], $t, $f[7], $f[8], $f[10], $f[11], $f[13], $stream);
- dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
- $work{"$f[1]$f[2]$stream"} = $ref; # store in work
- $self->send(DXProt::pc30($f[2], $f[1], $stream));
- $ref->{count} = 0; # no of lines between PC31s
- last SWITCH;
- }
-
- if ($pcno == 29) { # incoming text
- my $ref = $work{"$f[1]$f[2]$f[3]"};
- if ($ref) {
- push @{$ref->{lines}}, $f[4];
- $ref->{count}++;
- if ($ref->{count} >= $ref->{linesreq}) {
- $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
- dbg('msg', "stream $f[3]: $ref->{linereq} lines received\n");
- $ref->{count} = 0;
- }
- }
- last SWITCH;
- }
-
- if ($pcno == 30) {
- last SWITCH;
- }
-
- if ($pcno == 31) {
- last SWITCH;
- }
-
- if ($pcno == 32) { # incoming EOM
- dbg('msg', "stream $f[3]: EOM received\n");
- my $ref = $work{"$f[1]$f[2]$f[3]"};
- if ($ref) {
- $self->send(DXProt::pc33($f[2], $f[1], $f[3]));# acknowledge it
- $ref->store(); # store it (whatever that may mean)
- delete $work{"$f[1]$f[2]$f[3]"}; # remove the reference from the work vector
- }
- last SWITCH;
- }
-
- if ($pcno == 33) {
- last SWITCH;
- }
-
- if ($pcno == 40) { # this is a file request
- $f[3] =~ s/\\/\//og; # change the slashes
- $f[3] =~ s/\.//og; # remove dots
- $f[3] = lc $f[3]; # to lower case;
- dbg('msg', "incoming file $f[3]\n");
- last SWITCH if $f[3] =~ /^\/(perl|cmd|local_cmd|src|lib|include|sys|data\/msg)\//; # prevent access to executables
-
- # create any directories
- my @part = split /\//, $f[3];
- my $part;
- my $fn = "$main::root";
- pop @part; # remove last part
- foreach $part (@part) {
- $fn .= "/$part";
- next if -e $fn;
- last SWITCH if !mkdir $fn, 0777;
- dbg('msg', "created directory $fn\n");
- }
- my $stream = next_transno($f[2]);
- my $ref = DXMsg->alloc($f[1], $f[2], "$main::root/$f[3]", undef, time, !$f[4], undef, $f[5], 0, ' ', $stream);
- $ref->{file} = 1;
- $work{"$f[1]$f[2]$stream"} = $ref; # store in work
- $self->send(DXProt::pc30($f[2], $f[1], $stream));
- $ref->{count} = 0; # no of lines between PC31s
-
- last SWITCH;
- }
- }
+ my ($self, $line) = @_;
+
+ # this is periodic processing
+ if (!$self || !$line) {
+
+ if ($main::systime >= $lastq + $queueinterval) {
+
+ # queue some message if the interval timer has gone off
+ queue_msg(0);
+
+ # import any messages in the import directory
+ import_msgs();
+
+ $lastq = $main::systime;
+ }
+
+ # clean the message queue
+ clean_old() if $main::systime - $last_clean > 3600 ;
+ $last_clean = $main::systime;
+ return;
+ }
+
+ my @f = split /\^/, $line;
+ my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
+ my ($tonode, $fromnode) = @f[1, 2];
+ my $stream = $f[3] if ($pcno >= 29 && $pcno <= 33) || $pcno == 42;
+
+ SWITCH: {
+ if ($pcno == 28) { # incoming message
+
+ # sort out various extant protocol errors that occur
+ my $origin = $f[13];
+ $origin = $self->call unless $origin && $origin gt ' ';
+
+ # first look for any messages in the busy queue
+ # and cancel them this should both resolve timed out incoming messages
+ # and crossing of message between nodes, incoming messages have priority
+
+ my $ref = get_busy($fromnode);
+ if ($ref) {
+ my $otonode = $ref->{tonode} || "unknown";
+ dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg');
+ $ref->stop_msg($fromnode);
+ }
+
+ my $t = cltounix($f[5], $f[6]);
+ $stream = next_transno($fromnode);
+ $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]);
+
+ # fill in various forwarding state variables
+ $ref->{fromnode} = $fromnode;
+ $ref->{tonode} = $tonode;
+ $ref->{rrreq} = $f[11];
+ $ref->{linesreq} = $f[10];
+ $ref->{stream} = $stream;
+ $ref->{count} = 0; # no of lines between PC31s
+ dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg');
+ Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" );
+ set_fwq($fromnode, $stream, $ref); # store in work
+ set_busy($fromnode, $ref); # set interlock
+ $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
+ $ref->{lastt} = $main::systime;
+
+ # look to see whether this is a non private message sent to a known callsign
+ my $uref = DXUser->get_current($ref->{to});
+ if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
+ $ref->{private} = 1;
+ dbg("set bull to $ref->{to} to private") if isdbg('msg');
+ Log('msg', "set bull to $ref->{to} to private");
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 29) { # incoming text
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ $f[4] =~ s/\%5E/^/g;
+ if (@{$ref->{lines}}) {
+ push @{$ref->{lines}}, $f[4];
+ } else {
+ # temporarily store any R: lines so that we end up with
+ # only the first and last ones stored.
+ if ($f[4] =~ m|^R:\d{6}/\d{4}|) {
+ push @{$ref->{tempr}}, $f[4];
+ } else {
+ if (exists $ref->{tempr}) {
+ push @{$ref->{lines}}, shift @{$ref->{tempr}};
+ push @{$ref->{lines}}, pop @{$ref->{tempr}} if @{$ref->{tempr}};
+ delete $ref->{tempr};
+ }
+ push @{$ref->{lines}}, $f[4];
+ }
+ }
+ $ref->{count}++;
+ if ($ref->{count} >= $ref->{linesreq}) {
+ $self->send(DXProt::pc31($fromnode, $tonode, $stream));
+ dbg("stream $stream: $ref->{count} lines received\n") if isdbg('msg');
+ $ref->{count} = 0;
+ }
+ $ref->{lastt} = $main::systime;
+ } else {
+ dbg("PC29 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 30) { # this is a incoming subject ack
+ my $ref = get_fwq($fromnode); # note no stream at this stage
+ if ($ref) {
+ del_fwq($fromnode);
+ $ref->{stream} = $stream;
+ $ref->{count} = 0;
+ $ref->{linesreq} = 5;
+ set_fwq($fromnode, $stream, $ref); # new ref
+ set_busy($fromnode, $ref); # interlock
+ dbg("incoming subject ack stream $stream\n") if isdbg('msg');
+ $ref->{lines} = [ $ref->read_msg_body ];
+ $ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
+ } else {
+ dbg("PC30 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 31) { # acknowledge a tranche of lines
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ dbg("tranche ack stream $stream\n") if isdbg('msg');
+ $ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
+ } else {
+ dbg("PC31 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 32) { # incoming EOM
+ dbg("stream $stream: EOM received\n") if isdbg('msg');
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ $self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it
+
+ # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
+ # store the file or message
+ # remove extraneous rubbish from the hash
+ # remove it from the work in progress vector
+ # stuff it on the msg queue
+ if ($ref->{lines}) {
+ if ($ref->{file}) {
+ $ref->store($ref->{lines});
+ } else {
+
+ # does an identical message already exist?
+ my $m;
+ for $m (@msg) {
+ if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
+ $ref->stop_msg($fromnode);
+ my $msgno = $m->{msgno};
+ dbg("duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno") if isdbg('msg');
+ Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to msg: $msgno");
+ return;
+ }
+ }
+
+ # swop addresses
+ $ref->swop_it($self->call);
+
+ # look for 'bad' to addresses
+ if ($ref->dump_it($self->call)) {
+ $ref->stop_msg($fromnode);
+ dbg("'Bad' message $ref->{to}") if isdbg('msg');
+ Log('msg', "'Bad' message $ref->{to}");
+ return;
+ }
+
+ # check the message for bad words
+ my @words;
+ for (@{$ref->{lines}}) {
+ push @words, BadWords::check($_);
+ }
+ push @words, BadWords::check($ref->{subject});
+ if (@words) {
+ dbg("$ref->{from} swore: '@words' -> $ref->{to} '$ref->{subject}' origin: $ref->{origin} via " . $self->call) if isdbg('msg');
+ Log('msg',"$ref->{from} swore: '@words' -> $ref->{to} origin: $ref->{origin} via " . $self->call);
+ Log('msg',"subject: $ref->{subject}");
+ for (@{$ref->{lines}}) {
+ Log('msg', "line: $_");
+ }
+ $ref->stop_msg($fromnode);
+ return;
+ }
+
+ $ref->{msgno} = next_transno("Msgno");
+ push @{$ref->{gotit}}, $fromnode; # mark this up as being received
+ $ref->store($ref->{lines});
+ add_dir($ref);
+ my $dxchan = DXChannel->get($ref->{to});
+ $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
+ Log('msg', "Message $ref->{msgno} from $ref->{from} received from $fromnode for $ref->{to}");
+ }
+ }
+ $ref->stop_msg($fromnode);
+ } else {
+ dbg("PC32 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+ # queue_msg(0);
+ last SWITCH;
+ }
+
+ if ($pcno == 33) { # acknowledge the end of message
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ if ($ref->{private}) { # remove it if it private and gone off site#
+ Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted");
+ $ref->mark_delete;
+ } else {
+ Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode");
+ push @{$ref->{gotit}}, $fromnode; # mark this up as being received
+ $ref->store($ref->{lines}); # re- store the file
+ }
+ $ref->stop_msg($fromnode);
+ } else {
+ dbg("PC33 from unknown stream $stream from $fromnode") if isdbg('msg');
+ $self->send(DXProt::pc42($fromnode, $tonode, $stream)); # unknown stream
+ }
+
+ # send next one if present
+ queue_msg(0);
+ last SWITCH;
+ }
+
+ if ($pcno == 40) { # this is a file request
+ $f[3] =~ s/\\/\//og; # change the slashes
+ $f[3] =~ s/\.//og; # remove dots
+ $f[3] =~ s/^\///o; # remove the leading /
+ $f[3] = lc $f[3]; # to lower case;
+ dbg("incoming file $f[3]\n") if isdbg('msg');
+ $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;
+
+ # create any directories
+ my @part = split /\//, $f[3];
+ my $part;
+ my $fn = "$main::root";
+ pop @part; # remove last part
+ foreach $part (@part) {
+ $fn .= "/$part";
+ next if -e $fn;
+ last SWITCH if !mkdir $fn, 0777;
+ dbg("created directory $fn\n") if isdbg('msg');
+ }
+ my $stream = next_transno($fromnode);
+ my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
+
+ # forwarding variables
+ $ref->{fromnode} = $tonode;
+ $ref->{tonode} = $fromnode;
+ $ref->{linesreq} = $f[5];
+ $ref->{stream} = $stream;
+ $ref->{count} = 0; # no of lines between PC31s
+ $ref->{file} = 1;
+ $ref->{lastt} = $main::systime;
+ set_fwq($fromnode, $stream, $ref); # store in work
+ $self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
+
+ last SWITCH;
+ }
+
+ if ($pcno == 42) { # abort transfer
+ dbg("stream $stream: abort received\n") if isdbg('msg');
+ my $ref = get_fwq($fromnode, $stream);
+ if ($ref) {
+ $ref->stop_msg($fromnode);
+ $ref = undef;
+ }
+ last SWITCH;
+ }
+
+ if ($pcno == 49) { # global delete on subject
+ for (@msg) {
+ if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
+ $_->mark_delete;
+ Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
+ DXChannel::broadcast_nodes($line, $self);
+ }
+ }
+ }
+ }