From f0ac8322367c66080b6dbb74da4de72dae126dc3 Mon Sep 17 00:00:00 2001 From: djk Date: Sun, 27 Jun 1999 11:01:31 +0000 Subject: [PATCH] 1. added WWV filtering 2. added timeouts to for forwarding Messages, also added a waiting time for failed (stalled) outgoing messages. 3. Incoming messages will now have priority over outgoing messages to the same node. 4. Added 'catchup' command which will 'catchup' messages to date for a node. This means that when you start forwarding to a node, it doesn't get all the messages queued up that are probably old. --- Changes | 9 ++++ cmd/catchup.pl | 18 ++++++++ cmd/reply.pl | 15 ++++--- cmd/send.pl | 11 ++--- cmd/uncatchup.pl | 18 ++++++++ filter/wwv/GB7MBC.pl.issue | 22 ++++++++++ perl/DXMsg.pm | 85 ++++++++++++++++++++++++++++++-------- perl/DXProt.pm | 49 +++++++++++++++++++--- perl/Messages | 13 +++++- perl/cluster.pl | 1 + 10 files changed, 205 insertions(+), 36 deletions(-) create mode 100644 cmd/catchup.pl create mode 100644 cmd/uncatchup.pl create mode 100644 filter/wwv/GB7MBC.pl.issue diff --git a/Changes b/Changes index 7955dafe..d154f217 100644 --- a/Changes +++ b/Changes @@ -1,3 +1,12 @@ +27Jun99======================================================================= +1. added WWV filtering +2. added timeouts to for forwarding Messages, also added a waiting time for +failed (stalled) outgoing messages. +3. Incoming messages will now have priority over outgoing messages to the +same node. +4. Added 'catchup' command which will 'catchup' messages to date for a node. +This means that when you start forwarding to a node, it doesn't get all the +messages queued up that are probably old. 21Jun99======================================================================= 1. changed regex for cluster->client msgs so that strings like |---| are no longer ignored. diff --git a/cmd/catchup.pl b/cmd/catchup.pl new file mode 100644 index 00000000..4cda1203 --- /dev/null +++ b/cmd/catchup.pl @@ -0,0 +1,18 @@ +# +# catchup some or all of the non-private messages for a node. +# +# in other words mark all messages as being already received +# by this node. +# +# $Id$ +# +# Copyright (c) 1999 Dirk Koopman G1TLH +# + +my ($self, $line) = @_; +my @f = split /\s+/, $line; +my $call = uc shift @f; +my @out; + + +return (1, @out); diff --git a/cmd/reply.pl b/cmd/reply.pl index c5ceaddf..c8ef615d 100644 --- a/cmd/reply.pl +++ b/cmd/reply.pl @@ -57,13 +57,13 @@ if ($self->state eq "prompt") { $oref = DXMsg::get($f[$i]); if (!$oref) { delete $self->{loc}; - return (0, "can't access message $i"); + return (1, $self->msg('m4', $i)); } } else { if (!($oref = DXMsg::get($self->lastread))) { delete $self->{loc}; - #return (0, $self->msg('esend2')); - return (0, "need a message number"); + return (1, $self->msg('m5')); + #return (1, "need a message number"); } } @@ -78,9 +78,12 @@ if ($self->state eq "prompt") { $self->func("DXMsg::do_send_stuff"); $self->state('sendbody'); #push @out, $self->msg('sendsubj'); - push @out, "Reply to: $to"; - push @out, "Subject : $loc->{subject}"; - push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; +# push @out, "Reply to: $to"; +# push @out, "Subject : $loc->{subject}"; +# push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; + push @out, $self->msg('m6', $to); + push @out, $self->msg('m7', $loc->{subject}); + push @out, $self->msg('m8'); } return (1, @out); diff --git a/cmd/send.pl b/cmd/send.pl index 2ebca2a9..ade1dce0 100644 --- a/cmd/send.pl +++ b/cmd/send.pl @@ -68,8 +68,8 @@ if ($self->state eq "prompt") { push @list, $oref->read_msg_body(); $nref->store(\@list); $nref->add_dir(); - #push @out, $self->msg('sendcc', $oref->msgno, $f[$i]); - push @out, "copy of msg $oref->{msgno} sent to $to"; + push @out, $self->msg('m2', $oref->msgno, $to); +# push @out, "copy of msg $oref->{msgno} sent to $to"; } DXMsg::queue_msg(); return (1, @out); @@ -108,7 +108,8 @@ if ($self->state eq "prompt") { foreach $t (@f[ $i..$#f ]) { $t = uc $t; if (grep $_ eq $t, @DXMsg::badmsg) { - push @out, "Sorry, $t is an unacceptable TO address"; +# push @out, "Sorry, $t is an unacceptable TO address"; + push @out, $self->msg('m3', $t); } else { push @to, $t; } @@ -123,8 +124,8 @@ if ($self->state eq "prompt") { # keep calling me for every line until I relinquish control $self->func("DXMsg::do_send_stuff"); $self->state('send1'); - #push @out, $self->msg('sendsubj'); - push @out, "Enter Subject (30 characters) >"; + push @out, $self->msg('m1'); + #push @out, "Enter Subject (30 characters) >"; } return (1, @out); diff --git a/cmd/uncatchup.pl b/cmd/uncatchup.pl new file mode 100644 index 00000000..15edb1a4 --- /dev/null +++ b/cmd/uncatchup.pl @@ -0,0 +1,18 @@ +# +# uncatchup some or all of the non-private messages for a node. +# +# in other words mark messages as NOT being already received +# by this node. +# +# $Id$ +# +# Copyright (c) 1999 Dirk Koopman G1TLH +# + +my ($self, $line) = @_; +my @f = split /\s+/, $line; +my $call = uc shift @f; +my @out; + + +return (1, @out); diff --git a/filter/wwv/GB7MBC.pl.issue b/filter/wwv/GB7MBC.pl.issue new file mode 100644 index 00000000..7f3dfc1a --- /dev/null +++ b/filter/wwv/GB7MBC.pl.issue @@ -0,0 +1,22 @@ +# +# This is an example WWV filter +# +# The element list is:- +# 0 - nominal unix date of spot (ie the day + hour:13) +# 1 - the hour +# 2 - SFI +# 3 - K +# 4 - I +# 5 - text +# 6 - spotter +# 7 - origin +# 8 - incoming interface callsign +# +# this one doesn't filter, it just sets the hop count to 6 and is +# used mainly just to override any isolation from WWV coming from +# the internet. + +$in = [ + [ 1, 0, 'd', 0, 6 ] +]; + diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index 13af2cc0..c1f0ae7a 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -32,7 +32,7 @@ use Carp; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean - @badmsg $badmsgfn $forwardfn @forward); + @badmsg $badmsgfn $forwardfn @forward $timeout $waittime); %work = (); # outstanding jobs @msg = (); # messages we have @@ -41,6 +41,8 @@ $msgdir = "$main::root/msg"; # directory contain the msgs $maxage = 30 * 86400; # the maximum age that a message shall live for if not marked $last_clean = 0; # last time we did a clean @forward = (); # msg forward table +$timeout = 30*60; # forwarding timeout +$waittime = 60*60; # time an aborted outgoing message waits before trying again $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table @@ -66,6 +68,8 @@ $forwardfn = "$msgdir/forward.pl"; # the forwarding table size => '0,Size', msgno => '0,Msgno', keep => '0,Keep this?,yesno', + lastt => '9,Last processed,cldatetime', + waitt => '9,Wait until,cldatetime', ); sub DESTROY @@ -95,6 +99,7 @@ sub alloc $self->{'read'} = shift; $self->{rrreq} = shift; $self->{gotit} = []; + $self->{lastt} = $main::systime; return $self; } @@ -110,16 +115,49 @@ sub workclean delete $ref->{lines}; delete $ref->{file}; delete $ref->{count}; + delete $ref->{lastt} if exists $ref->{lastt}; + delete $ref->{waitt} if exists $ref->{waitt}; } sub process { my ($self, $line) = @_; + + # this is periodic processing + if (undef $self || undef $line) { + + # wander down the work queue stopping any messages that have timed out + for (keys %work) { + my $ref = $work{$_}; + if ($main::systime > $ref->{lastt} + $timeout) { + my $tonode = $ref->{tonode}; + $ref->stop_msg(); + + # delay any outgoing messages that fail + $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall; + } + } + + # clean the message queue + clean_old() if $main::systime - $last_clean > 3600 ; + return; + } + my @f = split /\^/, $line; my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number - + SWITCH: { if ($pcno == 28) { # incoming message + + # first look for any messages in the busy queue + # and cancel them this should both resolve timed out incoming messages + # and crossing of message between nodes, incoming messages have priority + if (exists $busy{$f[2]}) { + my $ref = $busy{$f[2]}; + my $tonode = $ref->{tonode}; + $ref->stop_msg(); + } + my $t = cltounix($f[5], $f[6]); my $stream = next_transno($f[2]); my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); @@ -148,6 +186,7 @@ sub process dbg('msg', "stream $f[3]: $ref->{count} lines received\n"); $ref->{count} = 0; } + $ref->{lastt} = $main::systime; } last SWITCH; } @@ -165,6 +204,7 @@ sub process $ref->{lines} = []; push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); + $ref->{lastt} = $main::systime; } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -176,6 +216,7 @@ sub process if ($ref) { dbg('msg', "tranche ack stream $f[3]\n"); $ref->send_tranche($self); + $ref->{lastt} = $main::systime; } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -202,7 +243,7 @@ sub process my $m; for $m (@msg) { if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) { - $ref->stop_msg($self); + $ref->stop_msg(); my $msgno = $m->{msgno}; dbg('msg', "duplicate message to $msgno\n"); Log('msg', "duplicate message to $msgno"); @@ -212,7 +253,7 @@ sub process # look for 'bad' to addresses if (grep $ref->{to} eq $_, @badmsg) { - $ref->stop_msg($self); + $ref->stop_msg(); dbg('msg', "'Bad' TO address $ref->{to}"); Log('msg', "'Bad' TO address $ref->{to}"); return; @@ -223,11 +264,11 @@ sub process $ref->store($ref->{lines}); add_dir($ref); my $dxchan = DXChannel->get($ref->{to}); - $dxchan->send($dxchan->msg('msgnew')) if $dxchan; + $dxchan->send($dxchan->msg('m9')) if $dxchan; Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}"); } } - $ref->stop_msg($self); + $ref->stop_msg(); queue_msg(0); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream @@ -247,7 +288,7 @@ sub process push @{$ref->{gotit}}, $f[2]; # mark this up as being received $ref->store($ref->{lines}); # re- store the file } - $ref->stop_msg($self); + $ref->stop_msg(); } else { $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -294,7 +335,7 @@ sub process dbg('msg', "stream $f[3]: abort received\n"); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - $ref->stop_msg($self); + $ref->stop_msg(); $ref = undef; } @@ -310,8 +351,6 @@ sub process } } } - - clean_old() if $main::systime - $last_clean > 3600 ; # clean the message queue } @@ -516,6 +555,13 @@ sub queue_msg foreach $ref (@msg) { # firstly, is it private and unread? if so can I find the recipient # in my cluster node list offsite? + + # ignore 'delayed' messages until their waiting time has expired + if (exists $ref->{waitt}) { + next if $ref->{waitt} < $main::systime; + delete $ref->{waitt}; + } + if ($ref->{private}) { if ($ref->{'read'} == 0) { $clref = DXCluster->get_exact($ref->{to}); @@ -606,11 +652,13 @@ sub get_fwq sub stop_msg { my ($self, $dxchan) = @_; - my $node = $dxchan->call; + my $node = $self->{tonode} + my $stream = $self->{stream} if exists $self->{stream}; - dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n"); + + dbg('msg', "stop msg $self->{msgno} -> node $node\n"); delete $work{$node}; - delete $work{"$node$self->{stream}"}; + delete $work{"$node$stream"} if $stream; $self->workclean; delete $busy{$node}; } @@ -743,7 +791,7 @@ sub do_send_stuff $loc->{lines} = []; $self->state('sendbody'); #push @out, $self->msg('sendbody'); - push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit"; + push @out, $self->msg('m8');) } elsif ($self->state eq 'sendbody') { confess "local var gone missing" if !ref $self->{loc}; my $loc = $self->{loc}; @@ -766,12 +814,12 @@ sub do_send_stuff $loc->{rrreq}); $ref->store($loc->{lines}); $ref->add_dir(); - #push @out, $self->msg('sendsent', $to); - push @out, "msgno $ref->{msgno} sent to $to"; + push @out, $self->msg('m11', $ref->{msgno}, $to); + #push @out, "msgno $ref->{msgno} sent to $to"; my $dxchan = DXChannel->get(uc $to); if ($dxchan) { if ($dxchan->is_user()) { - $dxchan->send("New mail has arrived for you"); + $dxchan->send($dxchan->msg('m9')); } } } @@ -780,11 +828,12 @@ sub do_send_stuff delete $loc->{to}; delete $self->{loc}; $self->func(undef); + DXMsg::queue_msg(0); $self->state('prompt'); } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") { #push @out, $self->msg('sendabort'); - push @out, "aborted"; + push @out, $self->msg('m10'); delete $loc->{lines}; delete $loc->{to}; delete $self->{loc}; diff --git a/perl/DXProt.pm b/perl/DXProt.pm index 4349b87c..0d137e52 100644 --- a/perl/DXProt.pm +++ b/perl/DXProt.pm @@ -516,11 +516,11 @@ sub normal $wwvdup{$dupkey} = $d; $field[6] =~ s/-\d+$//o; # remove spotter's ssid - my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..$#field]); + my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..8]); my $r; eval { - $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..$#field]); + $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..8]); }; # dbg('local', "Local::wwv2 error $@") if $@; return if $r; @@ -528,9 +528,9 @@ sub normal # DON'T be silly and send on PC27s! return if $pcno == 27; - # broadcast to the eager users - broadcast_users("WWV de $field[7] <$field[2]>: SFI=$sfi, A=$k, K=$i, $field[6]", 'wwv', $wwv ); - last SWITCH; + # broadcast to the eager world + send_wwv_spot($self, $line, $d, $field[2], $sfi, $k, $i, @field[6..8]); + return; } if ($pcno == 24) { # set here status @@ -836,6 +836,45 @@ sub send_dx_spot } } +sub send_wwv_spot +{ + my $self = shift; + my $line = shift; + my @dxchan = DXChannel->get_all(); + my $dxchan; + + # send it if it isn't the except list and isn't isolated and still has a hop count + # taking into account filtering and so on + foreach $dxchan (@dxchan) { + my $routeit; + my ($filter, $hops) = Filter::it($dxchan->{wwvfilter}, @_, $self->{call} ) if $dxchan->{wwvfilter}; + if ($dxchan->is_ak1a) { + next if $dxchan == $self; + if ($hops) { + $routeit = $line; + $routeit =~ s/\^H\d+\^\~$/\^H$hops\^\~/; + } else { + $routeit = adjust_hops($dxchan, $line); # adjust its hop count by node name + next unless $routeit; + } + if ($filter) { + $dxchan->send($routeit) if $routeit; + } else { + $dxchan->send($routeit) unless $dxchan->{isolate} || $self->{isolate}; + + } + } elsif ($dxchan->is_user && $dxchan->{wwv}) { + my $buf = "WWV de $_[6] <$_[1]>: SFI=$_[2], A=$_[3], K=$_[4], $_[5]"; + $buf .= "\a\a" if $dxchan->{beep}; + if ($dxchan->{state} eq 'prompt' || $dxchan->{state} eq 'convers') { + $dxchan->send($buf) if !$hops || ($hops && $filter); + } else { + $dxchan->delay($buf) if !$hops || ($hops && $filter); + } + } + } +} + sub send_local_config { my $self = shift; diff --git a/perl/Messages b/perl/Messages index f1e7a954..b1ef57ad 100644 --- a/perl/Messages +++ b/perl/Messages @@ -71,9 +71,18 @@ package DXM; lockout => '$_[0] Locked out', lockoutc => '$_[0] Created and Locked out', lockoutun => '$_[0] Unlocked', - m2 => '$_[0] Information: $_[1]', + m1 => 'Enter Subject (30 characters) >', + m2 => 'Copy of msg $_[0] sent to $_[1]', + m3 => 'Sorry, $_[0] is an unacceptable TO address', + m4 => 'Sorry, can\'t access message $_[0]', + m5 => 'Sorry, I need a message number', + m6 => 'Reply to: $_[0]', + m7 => 'Subject : $_[0]', + m8 => 'Enter Message /EX to send or /ABORT to exit', + m9 => 'New mail has arrived for you', + m10 => 'Message Aborted', + m11 => 'Message no $_[0] saved and directed to $_[1]', merge1 => 'Merge request for $_[1] spots and $_[2] WWV sent to $_[0]', - msgnew => 'New mail has arrived for you', namee1 => 'Please enter your name, set/name ', namee2 => 'Can\'t find user $_[0]!', name => 'Your name is now \"$_[0]\"', diff --git a/perl/cluster.pl b/perl/cluster.pl index d1bc6257..1af0388a 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -374,6 +374,7 @@ for (;;) { DXCommandmode::process(); # process ongoing command mode stuff DXProt::process(); # process ongoing ak1a pcxx stuff DXConnect::process(); + DXMsg::process(); eval { Local::process(); # do any localised processing }; -- 2.34.1