$main::build += $VERSION;
$main::branch += $BRANCH;
-use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
+use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean $residencetime
@badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime
$queueinterval $lastq $importfn $minchunk $maxchunk $bulltopriv);
$minchunk = 4800; # minimum chunk size for a split message
$maxchunk = 6000; # maximum chunk size
$bulltopriv = 1; # convert msgs with callsigns to private if they are bulls
+$residencetime = 2*86400; # keep deleted messages for this amount of time
$badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store
lastt => '5,Last processed,cldatetime',
waitt => '5,Wait until,cldatetime',
delete => '5,Awaiting Delete,yesno',
+ deletetime => '5,Deletion Time,cldatetime',
);
# allocate a new object
$self->{origin} = shift;
$self->{'read'} = shift;
$self->{rrreq} = shift;
+ $self->{delete} = shift;
+ $self->{deletetime} = shift;
$self->{gotit} = [];
# $self->{lastt} = $main::systime;
$self->{lines} = [];
# clean the message queue
clean_old() if $main::systime - $last_clean > 3600 ;
+ $last_clean = $main::systime;
return;
}
my @f = split /\^/, $line;
my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
my ($tonode, $fromnode) = @f[1, 2];
- my $stream = $f[3] if $pcno > 29 && $pcno <= 33;
+ my $stream = $f[3] if ($pcno >= 29 && $pcno <= 33) || $pcno == 42;
SWITCH: {
if ($pcno == 28) { # incoming message
my $ref = get_fwq($fromnode, $stream);
if ($ref) {
$f[4] =~ s/\%5E/^/g;
- push @{$ref->{lines}}, $f[4];
+ if (@{$ref->{lines}}) {
+ push @{$ref->{lines}}, $f[4];
+ } else {
+ # temporarily store any R: lines so that we end up with
+ # only the first and last ones stored.
+ if ($f[4] =~ m|^R:\d{6}/\d{4}|) {
+ push @{$ref->{tempr}}, $f[4];
+ } else {
+ if (exists $ref->{tempr}) {
+ push @{$ref->{lines}}, shift @{$ref->{tempr}};
+ push @{$ref->{lines}}, pop @{$ref->{tempr}} if @{$ref->{tempr}};
+ delete $ref->{tempr};
+ }
+ push @{$ref->{lines}}, $f[4];
+ }
+ }
$ref->{count}++;
if ($ref->{count} >= $ref->{linesreq}) {
$self->send(DXProt::pc31($fromnode, $tonode, $stream));
if ($ref) {
if ($ref->{private}) { # remove it if it private and gone off site#
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted");
- $ref->{delete}++;
+ $ref->mark_delete;
} else {
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode");
push @{$ref->{gotit}}, $fromnode; # mark this up as being received
if ($pcno == 49) { # global delete on subject
for (@msg) {
if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
- $_->{delete}++;
+ $_->mark_delete;
Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
DXChannel::broadcast_nodes($line, $self);
}
if (defined $fh) {
my $rr = $ref->{rrreq} ? '1' : '0';
my $priv = $ref->{private} ? '1': '0';
- print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n";
+ my $del = $ref->{delete} ? '1' : '0';
+ my $delt = $ref->{deletetime} || '0';
+ print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr^$del^$delt\n";
print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
my $line;
$ref->{size} = 0;
if ($self->{tonode}) {
$self->{delete}++;
+ $self->{deletetime} = 0;
} else {
my $call;
if ($dxchan) {
}
}
+sub mark_delete
+{
+ my $ref = shift;
+ my $t = shift;
+ $t = $main::systime + $residencetime unless defined $t;
+
+ $ref->{delete}++;
+ $ref->{deletetime} = $t;
+ $ref->store( [$ref->read_msg_body] );
+}
+
# clean out old messages from the message queue
sub clean_old
{
my $ref;
# mark old messages for deletion
- dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg');
foreach $ref (@msg) {
if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
- $ref->{deleteme} = 1;
- unlink filename($ref->{msgno});
- dbg("deleting old $ref->{msgno}\n") if isdbg('msg');
+
+ # this is for IMMEDIATE destruction
+ $ref->{delete}++;
+ $ref->{deletetime} = 0;
}
}
-
- # remove them all from the active message list
- @msg = grep { !$_->{deleteme} } @msg;
- dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg');
- $last_clean = $main::systime;
}
# read in a message header
next if $ref->{tonode}; # ignore it if it already being processed
# is it awaiting deletion?
- if ($ref->{delete}) {
+ if ($ref->{delete} && $main::systime > $ref->{deletetime}) {
$ref->del_msg;
next;
}
+ next if $ref->{delete};
# firstly, is it private and unread? if so can I find the recipient
# in my cluster node list offsite?
delete $ref->{stream};
delete $ref->{file};
delete $ref->{count};
- delete $ref->{lastt} if exists $ref->{lastt};
- delete $ref->{waitt} if exists $ref->{waitt};
+ delete $ref->{tempr};
+ delete $ref->{lastt};
+ delete $ref->{waitt};
}
# get a new transaction number from the file specified
# first line;
my $line = shift @$ref;
- my @f = split /\s+/, $line;
+ my @f = split /\b/, $line;
+ @f = map {s/\s+//g; length $_ ? $_ : ()} @f;
+
unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
my $m = "invalid first line in import '$line'";
dbg($m) if isdbg('msg');
;
} elsif ($notincalls && ($f eq 'RR')) {
$rr = '1';
- } elsif ($f eq '@' && @f) { # this is bbs syntax, for AT
- next;
+ } elsif (($f =~ /^[\@\.\#\$]$/ || $f eq '.#') && @f) { # this is bbs syntax, for AT
+ shift @f;
} elsif ($f eq '<' && @f) { # this is bbs syntax for from call
$from = uc shift @f;
} elsif ($f =~ /^\$/) { # this is bbs syntax for a bid
next;
- } elsif ($f =~ /^<\S+/) { # this is bbs syntax for from call
- ($from) = $f =~ /^<(\S+)$/;
- } elsif ($f =~ /^\@\S+/) { # this is bbs syntax for origin
- ($origin) = $f =~ /^\@(\S+)$/;
+ } elsif ($f =~ /^<(\S+)/) { # this is bbs syntax for from call
+ $from = $1;
+ } elsif ($f =~ /^\$\S+/) { # this is bbs syntax for bid
+ ;
} else {
# callsign ?