+27Jun99=======================================================================
+1. added WWV filtering
+2. added timeouts to for forwarding Messages, also added a waiting time for
+failed (stalled) outgoing messages.
+3. Incoming messages will now have priority over outgoing messages to the
+same node.
+4. Added 'catchup' command which will 'catchup' messages to date for a node.
+This means that when you start forwarding to a node, it doesn't get all the
+messages queued up that are probably old.
21Jun99=======================================================================
1. changed regex for cluster->client msgs so that strings like |---| are no
longer ignored.
--- /dev/null
+#
+# catchup some or all of the non-private messages for a node.
+#
+# in other words mark all messages as being already received
+# by this node.
+#
+# $Id$
+#
+# Copyright (c) 1999 Dirk Koopman G1TLH
+#
+
+my ($self, $line) = @_;
+my @f = split /\s+/, $line;
+my $call = uc shift @f;
+my @out;
+
+
+return (1, @out);
$oref = DXMsg::get($f[$i]);
if (!$oref) {
delete $self->{loc};
- return (0, "can't access message $i");
+ return (1, $self->msg('m4', $i));
}
} else {
if (!($oref = DXMsg::get($self->lastread))) {
delete $self->{loc};
- #return (0, $self->msg('esend2'));
- return (0, "need a message number");
+ return (1, $self->msg('m5'));
+ #return (1, "need a message number");
}
}
$self->func("DXMsg::do_send_stuff");
$self->state('sendbody');
#push @out, $self->msg('sendsubj');
- push @out, "Reply to: $to";
- push @out, "Subject : $loc->{subject}";
- push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+# push @out, "Reply to: $to";
+# push @out, "Subject : $loc->{subject}";
+# push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+ push @out, $self->msg('m6', $to);
+ push @out, $self->msg('m7', $loc->{subject});
+ push @out, $self->msg('m8');
}
return (1, @out);
push @list, $oref->read_msg_body();
$nref->store(\@list);
$nref->add_dir();
- #push @out, $self->msg('sendcc', $oref->msgno, $f[$i]);
- push @out, "copy of msg $oref->{msgno} sent to $to";
+ push @out, $self->msg('m2', $oref->msgno, $to);
+# push @out, "copy of msg $oref->{msgno} sent to $to";
}
DXMsg::queue_msg();
return (1, @out);
foreach $t (@f[ $i..$#f ]) {
$t = uc $t;
if (grep $_ eq $t, @DXMsg::badmsg) {
- push @out, "Sorry, $t is an unacceptable TO address";
+# push @out, "Sorry, $t is an unacceptable TO address";
+ push @out, $self->msg('m3', $t);
} else {
push @to, $t;
}
# keep calling me for every line until I relinquish control
$self->func("DXMsg::do_send_stuff");
$self->state('send1');
- #push @out, $self->msg('sendsubj');
- push @out, "Enter Subject (30 characters) >";
+ push @out, $self->msg('m1');
+ #push @out, "Enter Subject (30 characters) >";
}
return (1, @out);
--- /dev/null
+#
+# uncatchup some or all of the non-private messages for a node.
+#
+# in other words mark messages as NOT being already received
+# by this node.
+#
+# $Id$
+#
+# Copyright (c) 1999 Dirk Koopman G1TLH
+#
+
+my ($self, $line) = @_;
+my @f = split /\s+/, $line;
+my $call = uc shift @f;
+my @out;
+
+
+return (1, @out);
--- /dev/null
+#
+# This is an example WWV filter
+#
+# The element list is:-
+# 0 - nominal unix date of spot (ie the day + hour:13)
+# 1 - the hour
+# 2 - SFI
+# 3 - K
+# 4 - I
+# 5 - text
+# 6 - spotter
+# 7 - origin
+# 8 - incoming interface callsign
+#
+# this one doesn't filter, it just sets the hop count to 6 and is
+# used mainly just to override any isolation from WWV coming from
+# the internet.
+
+$in = [
+ [ 1, 0, 'd', 0, 6 ]
+];
+
use strict;
use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
- @badmsg $badmsgfn $forwardfn @forward);
+ @badmsg $badmsgfn $forwardfn @forward $timeout $waittime);
%work = (); # outstanding jobs
@msg = (); # messages we have
$maxage = 30 * 86400; # the maximum age that a message shall live for if not marked
$last_clean = 0; # last time we did a clean
@forward = (); # msg forward table
+$timeout = 30*60; # forwarding timeout
+$waittime = 60*60; # time an aborted outgoing message waits before trying again
$badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store
$forwardfn = "$msgdir/forward.pl"; # the forwarding table
size => '0,Size',
msgno => '0,Msgno',
keep => '0,Keep this?,yesno',
+ lastt => '9,Last processed,cldatetime',
+ waitt => '9,Wait until,cldatetime',
);
sub DESTROY
$self->{'read'} = shift;
$self->{rrreq} = shift;
$self->{gotit} = [];
+ $self->{lastt} = $main::systime;
return $self;
}
delete $ref->{lines};
delete $ref->{file};
delete $ref->{count};
+ delete $ref->{lastt} if exists $ref->{lastt};
+ delete $ref->{waitt} if exists $ref->{waitt};
}
sub process
{
my ($self, $line) = @_;
+
+ # this is periodic processing
+ if (undef $self || undef $line) {
+
+ # wander down the work queue stopping any messages that have timed out
+ for (keys %work) {
+ my $ref = $work{$_};
+ if ($main::systime > $ref->{lastt} + $timeout) {
+ my $tonode = $ref->{tonode};
+ $ref->stop_msg();
+
+ # delay any outgoing messages that fail
+ $ref->{waitt} = $main::systime + $waittime if $tonode ne $main::mycall;
+ }
+ }
+
+ # clean the message queue
+ clean_old() if $main::systime - $last_clean > 3600 ;
+ return;
+ }
+
my @f = split /\^/, $line;
my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
-
+
SWITCH: {
if ($pcno == 28) { # incoming message
+
+ # first look for any messages in the busy queue
+ # and cancel them this should both resolve timed out incoming messages
+ # and crossing of message between nodes, incoming messages have priority
+ if (exists $busy{$f[2]}) {
+ my $ref = $busy{$f[2]};
+ my $tonode = $ref->{tonode};
+ $ref->stop_msg();
+ }
+
my $t = cltounix($f[5], $f[6]);
my $stream = next_transno($f[2]);
my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
$ref->{count} = 0;
}
+ $ref->{lastt} = $main::systime;
}
last SWITCH;
}
$ref->{lines} = [];
push @{$ref->{lines}}, ($ref->read_msg_body);
$ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
}
if ($ref) {
dbg('msg', "tranche ack stream $f[3]\n");
$ref->send_tranche($self);
+ $ref->{lastt} = $main::systime;
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
}
my $m;
for $m (@msg) {
if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from}) {
- $ref->stop_msg($self);
+ $ref->stop_msg();
my $msgno = $m->{msgno};
dbg('msg', "duplicate message to $msgno\n");
Log('msg', "duplicate message to $msgno");
# look for 'bad' to addresses
if (grep $ref->{to} eq $_, @badmsg) {
- $ref->stop_msg($self);
+ $ref->stop_msg();
dbg('msg', "'Bad' TO address $ref->{to}");
Log('msg', "'Bad' TO address $ref->{to}");
return;
$ref->store($ref->{lines});
add_dir($ref);
my $dxchan = DXChannel->get($ref->{to});
- $dxchan->send($dxchan->msg('msgnew')) if $dxchan;
+ $dxchan->send($dxchan->msg('m9')) if $dxchan;
Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
}
}
- $ref->stop_msg($self);
+ $ref->stop_msg();
queue_msg(0);
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
push @{$ref->{gotit}}, $f[2]; # mark this up as being received
$ref->store($ref->{lines}); # re- store the file
}
- $ref->stop_msg($self);
+ $ref->stop_msg();
} else {
$self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
}
dbg('msg', "stream $f[3]: abort received\n");
my $ref = $work{"$f[2]$f[3]"};
if ($ref) {
- $ref->stop_msg($self);
+ $ref->stop_msg();
$ref = undef;
}
}
}
}
-
- clean_old() if $main::systime - $last_clean > 3600 ; # clean the message queue
}
foreach $ref (@msg) {
# firstly, is it private and unread? if so can I find the recipient
# in my cluster node list offsite?
+
+ # ignore 'delayed' messages until their waiting time has expired
+ if (exists $ref->{waitt}) {
+ next if $ref->{waitt} < $main::systime;
+ delete $ref->{waitt};
+ }
+
if ($ref->{private}) {
if ($ref->{'read'} == 0) {
$clref = DXCluster->get_exact($ref->{to});
sub stop_msg
{
my ($self, $dxchan) = @_;
- my $node = $dxchan->call;
+ my $node = $self->{tonode}
+ my $stream = $self->{stream} if exists $self->{stream};
- dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
+
+ dbg('msg', "stop msg $self->{msgno} -> node $node\n");
delete $work{$node};
- delete $work{"$node$self->{stream}"};
+ delete $work{"$node$stream"} if $stream;
$self->workclean;
delete $busy{$node};
}
$loc->{lines} = [];
$self->state('sendbody');
#push @out, $self->msg('sendbody');
- push @out, "Enter Message /EX (^Z) to send or /ABORT (^Y) to exit";
+ push @out, $self->msg('m8');)
} elsif ($self->state eq 'sendbody') {
confess "local var gone missing" if !ref $self->{loc};
my $loc = $self->{loc};
$loc->{rrreq});
$ref->store($loc->{lines});
$ref->add_dir();
- #push @out, $self->msg('sendsent', $to);
- push @out, "msgno $ref->{msgno} sent to $to";
+ push @out, $self->msg('m11', $ref->{msgno}, $to);
+ #push @out, "msgno $ref->{msgno} sent to $to";
my $dxchan = DXChannel->get(uc $to);
if ($dxchan) {
if ($dxchan->is_user()) {
- $dxchan->send("New mail has arrived for you");
+ $dxchan->send($dxchan->msg('m9'));
}
}
}
delete $loc->{to};
delete $self->{loc};
$self->func(undef);
+
DXMsg::queue_msg(0);
$self->state('prompt');
} elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
#push @out, $self->msg('sendabort');
- push @out, "aborted";
+ push @out, $self->msg('m10');
delete $loc->{lines};
delete $loc->{to};
delete $self->{loc};
$wwvdup{$dupkey} = $d;
$field[6] =~ s/-\d+$//o; # remove spotter's ssid
- my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..$#field]);
+ my $wwv = Geomag::update($d, $field[2], $sfi, $k, $i, @field[6..8]);
my $r;
eval {
- $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..$#field]);
+ $r = Local::wwv($self, $field[1], $field[2], $sfi, $k, $i, @field[6..8]);
};
# dbg('local', "Local::wwv2 error $@") if $@;
return if $r;
# DON'T be silly and send on PC27s!
return if $pcno == 27;
- # broadcast to the eager users
- broadcast_users("WWV de $field[7] <$field[2]>: SFI=$sfi, A=$k, K=$i, $field[6]", 'wwv', $wwv );
- last SWITCH;
+ # broadcast to the eager world
+ send_wwv_spot($self, $line, $d, $field[2], $sfi, $k, $i, @field[6..8]);
+ return;
}
if ($pcno == 24) { # set here status
}
}
+sub send_wwv_spot
+{
+ my $self = shift;
+ my $line = shift;
+ my @dxchan = DXChannel->get_all();
+ my $dxchan;
+
+ # send it if it isn't the except list and isn't isolated and still has a hop count
+ # taking into account filtering and so on
+ foreach $dxchan (@dxchan) {
+ my $routeit;
+ my ($filter, $hops) = Filter::it($dxchan->{wwvfilter}, @_, $self->{call} ) if $dxchan->{wwvfilter};
+ if ($dxchan->is_ak1a) {
+ next if $dxchan == $self;
+ if ($hops) {
+ $routeit = $line;
+ $routeit =~ s/\^H\d+\^\~$/\^H$hops\^\~/;
+ } else {
+ $routeit = adjust_hops($dxchan, $line); # adjust its hop count by node name
+ next unless $routeit;
+ }
+ if ($filter) {
+ $dxchan->send($routeit) if $routeit;
+ } else {
+ $dxchan->send($routeit) unless $dxchan->{isolate} || $self->{isolate};
+
+ }
+ } elsif ($dxchan->is_user && $dxchan->{wwv}) {
+ my $buf = "WWV de $_[6] <$_[1]>: SFI=$_[2], A=$_[3], K=$_[4], $_[5]";
+ $buf .= "\a\a" if $dxchan->{beep};
+ if ($dxchan->{state} eq 'prompt' || $dxchan->{state} eq 'convers') {
+ $dxchan->send($buf) if !$hops || ($hops && $filter);
+ } else {
+ $dxchan->delay($buf) if !$hops || ($hops && $filter);
+ }
+ }
+ }
+}
+
sub send_local_config
{
my $self = shift;
lockout => '$_[0] Locked out',
lockoutc => '$_[0] Created and Locked out',
lockoutun => '$_[0] Unlocked',
- m2 => '$_[0] Information: $_[1]',
+ m1 => 'Enter Subject (30 characters) >',
+ m2 => 'Copy of msg $_[0] sent to $_[1]',
+ m3 => 'Sorry, $_[0] is an unacceptable TO address',
+ m4 => 'Sorry, can\'t access message $_[0]',
+ m5 => 'Sorry, I need a message number',
+ m6 => 'Reply to: $_[0]',
+ m7 => 'Subject : $_[0]',
+ m8 => 'Enter Message /EX to send or /ABORT to exit',
+ m9 => 'New mail has arrived for you',
+ m10 => 'Message Aborted',
+ m11 => 'Message no $_[0] saved and directed to $_[1]',
merge1 => 'Merge request for $_[1] spots and $_[2] WWV sent to $_[0]',
- msgnew => 'New mail has arrived for you',
namee1 => 'Please enter your name, set/name <your name>',
namee2 => 'Can\'t find user $_[0]!',
name => 'Your name is now \"$_[0]\"',
DXCommandmode::process(); # process ongoing command mode stuff
DXProt::process(); # process ongoing ak1a pcxx stuff
DXConnect::process();
+ DXMsg::process();
eval {
Local::process(); # do any localised processing
};