]> dxcluster.org Git - spider.git/commitdiff
nominally working system!
authorDirk Koopman <djk@tobit.co.uk>
Sat, 14 Sep 2013 20:45:23 +0000 (21:45 +0100)
committerDirk Koopman <djk@tobit.co.uk>
Sat, 14 Sep 2013 20:45:23 +0000 (21:45 +0100)
Need to go over AsyncMsg and stuff but that's for tomorrow.

perl/Msg.pm
perl/Version.pm
perl/cluster.pl
perl/console.pl

index d62cb744034377f54592a3a7233571f9e88f842d..b2ee9b2326690a4abc376196d91218a55333e73f 100644 (file)
@@ -20,14 +20,14 @@ use Mojo::IOLoop::Stream;
 use DXDebug;
 use Timer;
 
-use vars qw($now %conns $noconns $cnum $total_in $total_out);
+use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout);
 
 $total_in = $total_out = 0;
 
 $now = time;
 
 $cnum = 0;
-
+$connect_timeout = 5;
 
 #
 #-----------------------------------------------------------------
@@ -61,7 +61,7 @@ sub set_error
 {
        my $conn = shift;
        my $callback = shift;
-       $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);});
+       $conn->{sock}->on(error => sub {$callback->($conn, $_[1]);});
 }
 
 sub set_on_eof
@@ -128,9 +128,35 @@ sub peerhost
 
 #-----------------------------------------------------------------
 # Send side routines
-sub connect {
-    my ($pkg, $to_host, $to_port, $rproc) = @_;
 
+sub _on_connect
+{
+       my $conn = shift;
+       my $handle = shift;
+       undef $conn->{sock};
+       my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+       $sock->on(read => sub {$conn->_rcv($_[1]);} );
+       $sock->on(error => sub {$conn->disconnect;});
+       $sock->on(close => sub {$conn->disconnect;});
+       $sock->timeout(0);
+       $sock->start;
+       dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
+       if ($conn->{on_connect}) {
+               &{$conn->{on_connect}}($conn);
+       }
+}
+
+sub is_connected
+{
+       my $conn = shift;
+       my $sock = $conn->{sock};
+       return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
+}
+
+sub connect {
+    my ($pkg, $to_host, $to_port, $rproc,  %args) = @_;
+       my $timeout = delete $args{timeout} || $connect_timeout;
+       
     # Create a connection end-point object
     my $conn = $pkg;
        unless (ref $pkg) {
@@ -144,17 +170,17 @@ sub connect {
        
        my $sock;
        $conn->{sock} = $sock = Mojo::IOLoop::Client->new;
-       $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');}, 
-                         error => {$conn->disconnect},
-                         close => {$conn->disconnect});
+       $sock->on(connect => sub {$conn->_on_connect($_[1])} );
+       $sock->on(error => sub {$conn->disconnect});
+       $sock->on(close => sub {$conn->disconnect});
+
+       # copy any args like on_connect, on_disconnect etc
+       while (my ($k, $v) = each %args) {
+               $conn->{$k} = $v;
+       }
        
-       $sock->connect(address => $to_host, port => $to_port);
+       $sock->connect(address => $to_host, port => $to_port, timeout => $timeout);
        
-       dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');
-
-    if ($conn->{rproc}) {
-               $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} );
-    }
     return $conn;
 }
 
@@ -226,6 +252,10 @@ sub disconnect
        $call ||= 'unallocated';
        dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll');
        
+       if ($conn->{on_disconnect}) {
+               &{$conn->{on_disconnect}}($conn);
+       }
+
        # get rid of any references
        for (keys %$conn) {
                if (ref($conn->{$_})) {
@@ -234,7 +264,7 @@ sub disconnect
        }
 
        if (defined($sock)) {
-               $sock->remove;
+               $sock->close_gracefully;
        }
        
        unless ($main::is_win) {
@@ -256,7 +286,7 @@ sub _send_stuff
                                dbgdump('raw', "$call send $lth: ", $lth);
                        }
                }
-               if (defined $sock && !$sock->destroyed) {
+               if (defined $sock) {
                        $sock->write($data);
                        $total_out = $lth;
                } else {
@@ -275,6 +305,13 @@ sub send_later {
        goto &send_now;
 }
 
+sub send_raw
+{
+    my ($conn, $msg) = @_;
+       push @{$conn->{outqueue}}, $msg;
+       _send_stuff($conn);
+}
+
 sub enqueue {
     my $conn = shift;
     push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : '';
@@ -300,9 +337,10 @@ sub new_server
        my ($pkg, $my_host, $my_port, $login_proc) = @_;
        my $conn = $pkg->new($login_proc);
        
-    $conn->{sock} = Mojo::IOLoop::Server->new;
-       $conn->{sock}->on(accept=>sub{$conn->new_client()});
-       $conn->{sock}->listen(address=>$my_host, port=>$my_port);
+    my $sock = $conn->{sock} = Mojo::IOLoop::Server->new;
+       $sock->on(accept=>sub{$conn->new_client($_[1]);});
+       $sock->listen(address=>$my_host, port=>$my_port);
+       $sock->start;
        
     die "Could not create socket: $! \n" unless $conn->{sock};
        return $conn;
@@ -335,60 +373,37 @@ sub dequeue
 
 sub _rcv {                     # Complement to _send
     my $conn = shift; # $rcv_now complement of $flush
-    # Find out how much has already been received, if at all
-    my ($msg, $offset, $bytes_to_read, $bytes_read);
+       my $msg = shift;
     my $sock = $conn->{sock};
     return unless defined($sock);
 
        my @lines;
-#      if ($conn->{blocking}) {
-#              blocking($sock, 0);
-#              $conn->{blocking} = 0;
-#      }
-       $bytes_read = sysread ($sock, $msg, 1024, 0);
-       if (defined ($bytes_read)) {
-               if ($bytes_read > 0) {
-                       $total_in += $bytes_read;
-                       if (isdbg('raw')) {
-                               my $call = $conn->{call} || 'none';
-                               dbgdump('raw', "$call read $bytes_read: ", $msg);
-                       }
-                       if ($conn->{echo}) {
-                               my @ch = split //, $msg;
-                               my $out;
-                               for (@ch) {
-                                       if (/[\cH\x7f]/) {
-                                               $out .= "\cH \cH";
-                                               $conn->{msg} =~ s/.$//;
-                                       } else {
-                                               $out .= $_;
-                                               $conn->{msg} .= $_;
-                                       }
-                               }
-                               if (defined $out) {
-                                       set_event_handler ($sock, write => sub{$conn->_send(0)});
-                                       push @{$conn->{outqueue}}, $out;
+       if (isdbg('raw')) {
+               my $call = $conn->{call} || 'none';
+               my $lth = length $msg;
+               dbgdump('raw', "$call read $lth: ", $msg);
+       }
+       if ($conn->{echo}) {
+               my @ch = split //, $msg;
+                       my $out;
+                       for (@ch) {
+                               if (/[\cH\x7f]/) {
+                                       $out .= "\cH \cH";
+                                       $conn->{msg} =~ s/.$//;
+                               } else {
+                                       $out .= $_;
+                                       $conn->{msg} .= $_;
                                }
-                       } else {
-                               $conn->{msg} .= $msg;
                        }
-               } 
+                       if (defined $out) {
+                               $conn->send_raw($out);
+                       }
        } else {
-               if (_err_will_block($!)) {
-                       return ; 
-               } else {
-                       $bytes_read = 0;
-               }
-    }
+               $conn->{msg} .= $msg;
+       }
 
-FINISH:
-    if (defined $bytes_read && $bytes_read == 0) {
-               &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
-               $conn->disconnect;
-    } else {
-               unless ($conn->{disable_read}) {
-                       $conn->dequeue if exists $conn->{msg};
-               }
+       unless ($conn->{disable_read}) {
+               $conn->dequeue if exists $conn->{msg};
        }
 }
 
@@ -399,6 +414,8 @@ sub new_client {
        my $conn = $server_conn->new($server_conn->{rproc});
        my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client);
        $sock->on(read => sub {$conn->_rcv($_[1])});
+       $sock->timeout(0);
+       $sock->start;
        dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll');
 
        my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport);
@@ -412,6 +429,7 @@ sub new_client {
                &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
                $conn->disconnect();
        }
+       return $conn;
 }
 
 sub close_server
index 27bd1c0fa38554ab80da11da15d286463fc92907..f7145cf7465e28b08992fff5759c9e4008ef6a18 100644 (file)
@@ -11,7 +11,7 @@ use vars qw($version $subversion $build $gitversion);
 
 $version = '1.57';
 $subversion = '0';
-$build = '1';
-$gitversion = '06a6935';
+$build = '2';
+$gitversion = '61e7e87';
 
 1;
index 51d1455b41eb3b35b01cc48e793a87ea0c1d43fd..ddcf721448582d18f1100ec363586cf677c511ee 100755 (executable)
@@ -593,7 +593,7 @@ $script->run($main::me) if $script;
 
 my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop);
 
-Mojo::IOLoop->start;
+Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
 
 cease(0);
 exit(0);
index 0a6d7404ba62c066539afd4697ba2d2fafba6771..ea46553f39acce2122c0f103f3a038e9c815f5cf 100755 (executable)
@@ -26,6 +26,8 @@ BEGIN {
        $is_win = ($^O =~ /^MS/ || $^O =~ /^OS-2/) ? 1 : 0; # is it Windows?
 }
 
+use Mojo::IOLoop;
+
 use Msg;
 use IntMsg;
 use DXVars;
@@ -54,6 +56,9 @@ $khistpos = 0;
 $spos = $pos = $lth = 0;
 $inbuf = "";
 @time = ();
+$lastmin = 0;
+$idle = 0;
+
 
 #$SIG{WINCH} = sub {@time = gettimeofday};
 
@@ -443,6 +448,46 @@ sub rec_stdin
        $bot->refresh();
 }
 
+sub idle_loop
+{
+       my $t;
+       
+       $t = time;
+       if ($t > $lasttime) {
+               my ($min)= (gmtime($t))[1];
+               if ($min != $lastmin) {
+                       show_screen();
+                       $lastmin = $min;
+               }
+               $lasttime = $t;
+       }
+       my $ch = $bot->getch();
+       if (@time && tv_interval(\@time, [gettimeofday]) >= 1) {
+               next;
+       }
+       if (defined $ch) {
+               if ($ch ne '-1') {
+                       rec_stdin($ch);
+               }
+       }
+       $top->refresh() if $top->is_wintouched;
+       $bot->refresh();
+}
+
+sub on_connect
+{
+       my $conn = shift;
+       $conn->send_later("A$call|$connsort width=$cols");
+       $conn->send_later("I$call|set/page $maxshist");
+       #$conn->send_later("I$call|set/nobeep");
+}
+
+sub on_disconnect
+{
+       $conn = shift;
+       Mojo::IOLoop->remove($idle);
+       Mojo::IOLoop->stop;
+}
 
 #
 # deal with args
@@ -464,23 +509,6 @@ if ($call eq $mycall) {
 
 dbginit();
 
-$conn = IntMsg->connect("$clusteraddr", $clusterport, \&rec_socket);
-if (! $conn) {
-       if (-r "$data/offline") {
-               open IN, "$data/offline" or die;
-               while (<IN>) {
-                       print $_;
-               }
-               close IN;
-       } else {
-               print "Sorry, the cluster $mycall is currently off-line\n";
-       }
-       exit(0);
-}
-
-$conn->set_error(sub{cease(0)});
-
-
 unless ($DB::VERSION) {
        $SIG{'INT'} = \&sig_term;
        $SIG{'TERM'} = \&sig_term;
@@ -493,40 +521,17 @@ do_resize();
 
 $SIG{__DIE__} = \&sig_term;
 
-$conn->send_later("A$call|$connsort width=$cols");
-$conn->send_later("I$call|set/page $maxshist");
-#$conn->send_later("I$call|set/nobeep");
-
-#Msg->set_event_handler(\*STDIN, "read" => \&rec_stdin);
-
 $Text::Wrap::Columns = $cols;
 
 my $lastmin = 0;
-for (;;) {
-       my $t;
-       Msg->event_loop(1, 0.01);
-       $t = time;
-       if ($t > $lasttime) {
-               my ($min)= (gmtime($t))[1];
-               if ($min != $lastmin) {
-                       show_screen();
-                       $lastmin = $min;
-               }
-               $lasttime = $t;
-       }
-       my $ch = $bot->getch();
-       if (@time && tv_interval(\@time, [gettimeofday]) >= 1) {
-#              mydbg("Got Resize");
-#              do_resize();
-               next;
-       }
-       if (defined $ch) {
-               if ($ch ne '-1') {
-                       rec_stdin($ch);
-               }
-       }
-       $top->refresh() if $top->is_wintouched;
-       $bot->refresh();
-}
 
-exit(0);
+
+$conn = IntMsg->connect($clusteraddr, $clusterport, \&rec_socket);
+$conn->{on_connect} = \&on_connect;
+$conn->{on_disconnect} = \&on_disconnect;
+
+$idle = Mojo::IOLoop->recurring(0.100 => \&idle_loop);
+Mojo::IOLoop->start;
+
+
+cease(0);