fix a talk bug for t xxx > yyy
[spider.git] / perl / cluster.pl
1 #!/usr/bin/perl -w
2 #
3 # This is the DX cluster 'daemon'. It sits in the middle of its little
4 # web of client routines sucking and blowing data where it may.
5 #
6 # Hence the name of 'spider' (although it may become 'dxspider')
7 #
8 # Copyright (c) 1998 Dirk Koopman G1TLH
9 #
10 # $Id$
11
12
13 require 5.004;
14
15 # make sure that modules are searched in the order local then perl
16 BEGIN {
17         umask 002;
18         
19         # root of directory tree for this system
20         $root = "/spider"; 
21         $root = $ENV{'DXSPIDER_ROOT'} if $ENV{'DXSPIDER_ROOT'};
22         
23         unshift @INC, "$root/perl";     # this IS the right way round!
24         unshift @INC, "$root/local";
25
26         # try to create and lock a lockfile (this isn't atomic but 
27         # should do for now
28         $lockfn = "$root/perl/cluster.lock";       # lock file name
29         if (-e $lockfn) {
30                 open(CLLOCK, "$lockfn") or die "Can't open Lockfile ($lockfn) $!";
31                 my $pid = <CLLOCK>;
32                 chomp $pid;
33                 die "Lockfile ($lockfn) and process $pid exist, another cluster running?" if kill 0, $pid;
34                 close CLLOCK;
35         }
36         open(CLLOCK, ">$lockfn") or die "Can't open Lockfile ($lockfn) $!";
37         print CLLOCK "$$\n";
38         close CLLOCK;
39 }
40
41 use Msg;
42 use DXVars;
43 use DXDebug;
44 use DXLog;
45 use DXLogPrint;
46 use DXUtil;
47 use DXChannel;
48 use DXUser;
49 use DXM;
50 use DXCommandmode;
51 use DXProt;
52 use DXMsg;
53 use DXCluster;
54 use DXCron;
55 use DXConnect;
56 use Prefix;
57 use Bands;
58 use Geomag;
59 use CmdAlias;
60 use Filter;
61 use DXDb;
62 use AnnTalk;
63 use WCY;
64 use DXDupe;
65 use BadWords;
66
67 use Data::Dumper;
68 use Fcntl ':flock'; 
69
70 use Local;
71
72 package main;
73
74 @inqueue = ();                                  # the main input queue, an array of hashes
75 $systime = 0;                                   # the time now (in seconds)
76 $version = "1.44";                              # the version no of the software
77 $starttime = 0;                 # the starting time of the cluster   
78 $lockfn = "cluster.lock";       # lock file name
79 @outstanding_connects = ();     # list of outstanding connects
80       
81 # handle disconnections
82 sub disconnect
83 {
84         my $dxchan = shift;
85         return if !defined $dxchan;
86         $dxchan->disconnect();
87 }
88
89 # send a message to call on conn and disconnect
90 sub already_conn
91 {
92         my ($conn, $call, $mess) = @_;
93         
94         dbg('chan', "-> D $call $mess\n"); 
95         $conn->send_now("D$call|$mess");
96         sleep(1);
97         dbg('chan', "-> Z $call bye\n");
98         $conn->send_now("Z$call|bye"); # this will cause 'client' to disconnect
99         sleep(1);
100         $conn->disconnect();
101 }
102
103 # handle incoming messages
104 sub rec
105 {
106         my ($conn, $msg, $err) = @_;
107         my $dxchan = DXChannel->get_by_cnum($conn); # get the dxconnnect object for this message
108         
109         if (!defined $msg || (defined $err && $err)) {
110                 if ($dxchan) {
111                         if (defined $err) {
112                                 $conn->disconnect;
113                                 undef $conn;
114                                 $dxchan->conn(undef);
115                         }
116                         $dxchan->disconnect;
117                 } elsif ($conn) {
118                         $conn->disconnect;
119                 }
120                 return;
121         }
122         
123         # set up the basic channel info - this needs a bit more thought - there is duplication here
124         if (!defined $dxchan) {
125                 my ($sort, $call, $line) = DXChannel::decode_input(0, $msg);
126                 return unless defined $sort;
127  
128                 # is there one already connected to me - locally? 
129                 my $user = DXUser->get($call);
130                 if (DXChannel->get($call)) {
131                         my $mess = DXM::msg($lang, ($user && $user->is_node) ? 'concluster' : 'conother', $call);
132                         already_conn($conn, $call, $mess);
133                         return;
134                 }
135                 
136                 # is there one already connected elsewhere in the cluster?
137                 if ($user) {
138                         if (($user->is_node || $call eq $myalias) && !DXCluster->get_exact($call)) {
139                                 ;
140                         } else {
141                                 if (DXCluster->get_exact($call)) {
142                                         my $mess = DXM::msg($lang, $user->is_node ? 'concluster' : 'conother', $call);
143                                         already_conn($conn, $call, $mess);
144                                         return;
145                                 }
146                         }
147                         $user->{lang} = $main::lang if !$user->{lang}; # to autoupdate old systems
148                 } else {
149                         if (DXCluster->get_exact($call)) {
150                                 my $mess = DXM::msg($lang, 'conother', $call);
151                                 already_conn($conn, $call, $mess);
152                                 return;
153                         }
154                         $user = DXUser->new($call);
155                 }
156
157                 # is he locked out ?
158                 if ($user->lockout) {
159                         Log('DXCommand', "$call is locked out, disconnected");
160                         $conn->send_now("Z$call|bye"); # this will cause 'client' to disconnect
161                         return;
162                 }
163
164                 # create the channel
165                 $dxchan = DXCommandmode->new($call, $conn, $user) if $user->is_user;
166                 $dxchan = DXProt->new($call, $conn, $user) if $user->is_node;
167                 $dxchan = BBS->new($call, $conn, $user) if $user->is_bbs;
168                 die "Invalid sort of user on $call = $sort" if !$dxchan;
169         }
170         
171         # queue the message and the channel object for later processing
172         if (defined $msg) {
173                 my $self = bless {}, "inqueue";
174                 $self->{dxchan} = $dxchan;
175                 $self->{data} = $msg;
176                 push @inqueue, $self;
177         }
178 }
179
180 sub login
181 {
182         return \&rec;
183 }
184
185 # cease running this program, close down all the connections nicely
186 sub cease
187 {
188         my $dxchan;
189
190         $SIG{'TERM'} = 'IGNORE';
191         $SIG{'INT'} = 'IGNORE';
192         
193         DXUser::sync;
194
195         eval {
196                 Local::finish();   # end local processing
197         };
198         dbg('local', "Local::finish error $@") if $@;
199
200         # disconnect nodes
201         foreach $dxchan (DXChannel->get_all()) {
202                 next unless $dxchan->is_node;
203                 disconnect($dxchan) unless $dxchan == $DXProt::me;
204         }
205         Msg->event_loop(1, 0.05);
206         Msg->event_loop(1, 0.05);
207         Msg->event_loop(1, 0.05);
208         Msg->event_loop(1, 0.05);
209         Msg->event_loop(1, 0.05);
210         Msg->event_loop(1, 0.05);
211
212         # disconnect users
213         foreach $dxchan (DXChannel->get_all()) {
214                 next if $dxchan->is_node;
215                 disconnect($dxchan) unless $dxchan == $DXProt::me;
216         }
217         Msg->event_loop(1, 0.05);
218         Msg->event_loop(1, 0.05);
219         Msg->event_loop(1, 0.05);
220         Msg->event_loop(1, 0.05);
221         Msg->event_loop(1, 0.05);
222         Msg->event_loop(1, 0.05);
223         DXUser::finish();
224         DXDupe::finish();
225
226         # close all databases
227         DXDb::closeall;
228         
229         dbg('chan', "DXSpider version $version ended");
230         Log('cluster', "DXSpider V$version stopped");
231         dbgclose();
232         Logclose();
233         unlink $lockfn;
234 #       $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
235         exit(0);
236 }
237
238 # the reaper of children
239 sub reap
240 {
241         $SIG{'CHLD'} = \&reap;
242         my $cpid = wait;
243         @outstanding_connects = grep {$_->{pid} != $cpid} @outstanding_connects;
244 }
245
246 # this is where the input queue is dealt with and things are dispatched off to other parts of
247 # the cluster
248 sub process_inqueue
249 {
250         my $self = shift @inqueue;
251         return if !$self;
252         
253         my $data = $self->{data};
254         my $dxchan = $self->{dxchan};
255         my $error;
256         my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data);
257         return unless defined $sort;
258         
259         # do the really sexy console interface bit! (Who is going to do the TK interface then?)
260         dbg('chan', "<- $sort $call $line\n") unless $sort eq 'D';
261
262         # handle A records
263         my $user = $dxchan->user;
264         if ($sort eq 'A' || $sort eq 'O') {
265                 $dxchan->start($line, $sort);  
266         } elsif ($sort eq 'I') {
267                 die "\$user not defined for $call" if !defined $user;
268                 # normal input
269                 $dxchan->normal($line);
270                 disconnect($dxchan) if ($dxchan->{state} eq 'bye');
271         } elsif ($sort eq 'Z') {
272                 $dxchan->conn(undef);
273                 disconnect($dxchan);
274         } elsif ($sort eq 'D') {
275                 ;                       # ignored (an echo)
276         } else {
277                 print STDERR atime, " Unknown command letter ($sort) received from $call\n";
278         }
279 }
280
281 sub uptime
282 {
283         my $t = $systime - $starttime;
284         my $days = int $t / 86400;
285         $t -= $days * 86400;
286         my $hours = int $t / 3600;
287         $t -= $hours * 3600;
288         my $mins = int $t / 60;
289         return sprintf "%d %02d:%02d", $days, $hours, $mins;
290 }
291 #############################################################
292 #
293 # The start of the main line of code 
294 #
295 #############################################################
296
297 $starttime = $systime = time;
298
299 # open the debug file, set various FHs to be unbuffered
300 dbginit();
301 foreach (@debug) {
302         dbgadd($_);
303 }
304 STDOUT->autoflush(1);
305
306 Log('cluster', "DXSpider V$version started");
307
308 # banner
309 dbg('err', "DXSpider DX Cluster Version $version", "Copyright (c) 1998-2000 Dirk Koopman G1TLH");
310
311 # load Prefixes
312 dbg('err', "loading prefixes ...");
313 Prefix::load();
314
315 # load band data
316 dbg('err', "loading band data ...");
317 Bands::load();
318
319 # initialise User file system
320 dbg('err', "loading user file system ..."); 
321 DXUser->init($userfn, 1);
322
323 # start listening for incoming messages/connects
324 dbg('err', "starting listener ...");
325 Msg->new_server("$clusteraddr", $clusterport, \&login);
326
327 # load bad words
328 dbg('err', "load badwords: " . (BadWords::load or "Ok"));
329
330 # prime some signals
331 $SIG{INT} = \&cease;
332 $SIG{TERM} = \&cease;
333 $SIG{HUP} = 'IGNORE';
334 $SIG{CHLD} = \&reap;
335
336 $SIG{PIPE} = sub {      dbg('err', "Broken PIPE signal received"); };
337 $SIG{IO} = sub {        dbg('err', "SIGIO received"); };
338 $SIG{WINCH} = $SIG{STOP} = $SIG{CONT} = 'IGNORE';
339 $SIG{KILL} = 'DEFAULT';     # as if it matters....
340
341 # catch the rest with a hopeful message
342 for (keys %SIG) {
343         if (!$SIG{$_}) {
344 #               dbg('chan', "Catching SIG $_");
345                 $SIG{$_} = sub { my $sig = shift;       DXDebug::confess("Caught signal $sig");  }; 
346         }
347 }
348
349 # start dupe system
350 DXDupe::init();
351
352 # read in system messages
353 DXM->init();
354
355 # read in command aliases
356 CmdAlias->init();
357
358 # initialise the Geomagnetic data engine
359 Geomag->init();
360 WCY->init();
361
362 # initial the Spot stuff
363 Spot->init();
364
365 # initialise the protocol engine
366 dbg('err', "reading in duplicate spot and WWV info ...");
367 DXProt->init();
368
369
370 # put in a DXCluster node for us here so we can add users and take them away
371 DXNode->new(0, $mycall, 0, 1, $DXProt::myprot_version); 
372
373 # read in any existing message headers and clean out old crap
374 dbg('err', "reading existing message headers ...");
375 DXMsg->init();
376 DXMsg::clean_old();
377
378 # read in any cron jobs
379 dbg('err', "reading cron jobs ...");
380 DXCron->init();
381
382 # read in database descriptors
383 dbg('err', "reading database descriptors ...");
384 DXDb::load();
385
386 # starting local stuff
387 dbg('err', "doing local initialisation ...");
388 eval {
389         Local::init();
390 };
391 dbg('local', "Local::init error $@") if $@;
392
393 # print various flags
394 #dbg('err', "seful info - \$^D: $^D \$^W: $^W \$^S: $^S \$^P: $^P");
395
396 # this, such as it is, is the main loop!
397 dbg('err', "orft we jolly well go ...");
398
399 #open(DB::OUT, "|tee /tmp/aa");
400
401 for (;;) {
402         my $timenow;
403 #       $DB::trace = 1;
404         
405         Msg->event_loop(1, 0.1);
406         $timenow = time;
407         process_inqueue();                      # read in lines from the input queue and despatch them
408 #       $DB::trace = 0;
409         
410         # do timed stuff, ongoing processing happens one a second
411         if ($timenow != $systime) {
412                 $systime = $timenow;
413                 $cldate = &cldate();
414                 $ztime = &ztime();
415                 DXCron::process();      # do cron jobs
416                 DXCommandmode::process(); # process ongoing command mode stuff
417                 DXProt::process();              # process ongoing ak1a pcxx stuff
418                 DXConnect::process();
419                 DXMsg::process();
420                 DXDb::process();
421                 DXUser::process();
422                 DXDupe::process();
423                 
424                 eval { 
425                         Local::process();       # do any localised processing
426                 };
427                 dbg('local', "Local::process error $@") if $@;
428         }
429         if ($decease) {
430                 last if --$decease <= 0;
431         }
432 }
433 cease(0);
434 exit(0);
435
436