use DXDebug;
use Timer;
-use vars qw($now %conns $noconns $cnum $total_in $total_out);
+use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout);
$total_in = $total_out = 0;
$now = time;
$cnum = 0;
-
+$connect_timeout = 5;
#
#-----------------------------------------------------------------
{
my $conn = shift;
my $callback = shift;
- $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);});
+ $conn->{sock}->on(error => sub {$callback->($conn, $_[1]);});
}
sub set_on_eof
#-----------------------------------------------------------------
# Send side routines
-sub connect {
- my ($pkg, $to_host, $to_port, $rproc) = @_;
+sub _on_connect
+{
+ my $conn = shift;
+ my $handle = shift;
+ undef $conn->{sock};
+ my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+ $sock->on(read => sub {$conn->_rcv($_[1]);} );
+ $sock->on(error => sub {$conn->disconnect;});
+ $sock->on(close => sub {$conn->disconnect;});
+ $sock->timeout(0);
+ $sock->start;
+ dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
+ if ($conn->{on_connect}) {
+ &{$conn->{on_connect}}($conn);
+ }
+}
+
+sub is_connected
+{
+ my $conn = shift;
+ my $sock = $conn->{sock};
+ return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
+}
+
+sub connect {
+ my ($pkg, $to_host, $to_port, $rproc, %args) = @_;
+ my $timeout = delete $args{timeout} || $connect_timeout;
+
# Create a connection end-point object
my $conn = $pkg;
unless (ref $pkg) {
my $sock;
$conn->{sock} = $sock = Mojo::IOLoop::Client->new;
- $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');},
- error => {$conn->disconnect},
- close => {$conn->disconnect});
+ $sock->on(connect => sub {$conn->_on_connect($_[1])} );
+ $sock->on(error => sub {$conn->disconnect});
+ $sock->on(close => sub {$conn->disconnect});
+
+ # copy any args like on_connect, on_disconnect etc
+ while (my ($k, $v) = each %args) {
+ $conn->{$k} = $v;
+ }
- $sock->connect(address => $to_host, port => $to_port);
+ $sock->connect(address => $to_host, port => $to_port, timeout => $timeout);
- dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');
-
- if ($conn->{rproc}) {
- $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} );
- }
return $conn;
}
$call ||= 'unallocated';
dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll');
+ if ($conn->{on_disconnect}) {
+ &{$conn->{on_disconnect}}($conn);
+ }
+
# get rid of any references
for (keys %$conn) {
if (ref($conn->{$_})) {
}
if (defined($sock)) {
- $sock->remove;
+ $sock->close_gracefully;
}
unless ($main::is_win) {
dbgdump('raw', "$call send $lth: ", $lth);
}
}
- if (defined $sock && !$sock->destroyed) {
+ if (defined $sock) {
$sock->write($data);
$total_out = $lth;
} else {
goto &send_now;
}
+sub send_raw
+{
+ my ($conn, $msg) = @_;
+ push @{$conn->{outqueue}}, $msg;
+ _send_stuff($conn);
+}
+
sub enqueue {
my $conn = shift;
push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : '';
my ($pkg, $my_host, $my_port, $login_proc) = @_;
my $conn = $pkg->new($login_proc);
- $conn->{sock} = Mojo::IOLoop::Server->new;
- $conn->{sock}->on(accept=>sub{$conn->new_client()});
- $conn->{sock}->listen(address=>$my_host, port=>$my_port);
+ my $sock = $conn->{sock} = Mojo::IOLoop::Server->new;
+ $sock->on(accept=>sub{$conn->new_client($_[1]);});
+ $sock->listen(address=>$my_host, port=>$my_port);
+ $sock->start;
die "Could not create socket: $! \n" unless $conn->{sock};
return $conn;
sub _rcv { # Complement to _send
my $conn = shift; # $rcv_now complement of $flush
- # Find out how much has already been received, if at all
- my ($msg, $offset, $bytes_to_read, $bytes_read);
+ my $msg = shift;
my $sock = $conn->{sock};
return unless defined($sock);
my @lines;
-# if ($conn->{blocking}) {
-# blocking($sock, 0);
-# $conn->{blocking} = 0;
-# }
- $bytes_read = sysread ($sock, $msg, 1024, 0);
- if (defined ($bytes_read)) {
- if ($bytes_read > 0) {
- $total_in += $bytes_read;
- if (isdbg('raw')) {
- my $call = $conn->{call} || 'none';
- dbgdump('raw', "$call read $bytes_read: ", $msg);
- }
- if ($conn->{echo}) {
- my @ch = split //, $msg;
- my $out;
- for (@ch) {
- if (/[\cH\x7f]/) {
- $out .= "\cH \cH";
- $conn->{msg} =~ s/.$//;
- } else {
- $out .= $_;
- $conn->{msg} .= $_;
- }
- }
- if (defined $out) {
- set_event_handler ($sock, write => sub{$conn->_send(0)});
- push @{$conn->{outqueue}}, $out;
+ if (isdbg('raw')) {
+ my $call = $conn->{call} || 'none';
+ my $lth = length $msg;
+ dbgdump('raw', "$call read $lth: ", $msg);
+ }
+ if ($conn->{echo}) {
+ my @ch = split //, $msg;
+ my $out;
+ for (@ch) {
+ if (/[\cH\x7f]/) {
+ $out .= "\cH \cH";
+ $conn->{msg} =~ s/.$//;
+ } else {
+ $out .= $_;
+ $conn->{msg} .= $_;
}
- } else {
- $conn->{msg} .= $msg;
}
- }
+ if (defined $out) {
+ $conn->send_raw($out);
+ }
} else {
- if (_err_will_block($!)) {
- return ;
- } else {
- $bytes_read = 0;
- }
- }
+ $conn->{msg} .= $msg;
+ }
-FINISH:
- if (defined $bytes_read && $bytes_read == 0) {
- &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
- $conn->disconnect;
- } else {
- unless ($conn->{disable_read}) {
- $conn->dequeue if exists $conn->{msg};
- }
+ unless ($conn->{disable_read}) {
+ $conn->dequeue if exists $conn->{msg};
}
}
my $conn = $server_conn->new($server_conn->{rproc});
my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client);
$sock->on(read => sub {$conn->_rcv($_[1])});
+ $sock->timeout(0);
+ $sock->start;
dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll');
my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport);
&{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
$conn->disconnect();
}
+ return $conn;
}
sub close_server
$is_win = ($^O =~ /^MS/ || $^O =~ /^OS-2/) ? 1 : 0; # is it Windows?
}
+use Mojo::IOLoop;
+
use Msg;
use IntMsg;
use DXVars;
$spos = $pos = $lth = 0;
$inbuf = "";
@time = ();
+$lastmin = 0;
+$idle = 0;
+
#$SIG{WINCH} = sub {@time = gettimeofday};
$bot->refresh();
}
+sub idle_loop
+{
+ my $t;
+
+ $t = time;
+ if ($t > $lasttime) {
+ my ($min)= (gmtime($t))[1];
+ if ($min != $lastmin) {
+ show_screen();
+ $lastmin = $min;
+ }
+ $lasttime = $t;
+ }
+ my $ch = $bot->getch();
+ if (@time && tv_interval(\@time, [gettimeofday]) >= 1) {
+ next;
+ }
+ if (defined $ch) {
+ if ($ch ne '-1') {
+ rec_stdin($ch);
+ }
+ }
+ $top->refresh() if $top->is_wintouched;
+ $bot->refresh();
+}
+
+sub on_connect
+{
+ my $conn = shift;
+ $conn->send_later("A$call|$connsort width=$cols");
+ $conn->send_later("I$call|set/page $maxshist");
+ #$conn->send_later("I$call|set/nobeep");
+}
+
+sub on_disconnect
+{
+ $conn = shift;
+ Mojo::IOLoop->remove($idle);
+ Mojo::IOLoop->stop;
+}
#
# deal with args
dbginit();
-$conn = IntMsg->connect("$clusteraddr", $clusterport, \&rec_socket);
-if (! $conn) {
- if (-r "$data/offline") {
- open IN, "$data/offline" or die;
- while (<IN>) {
- print $_;
- }
- close IN;
- } else {
- print "Sorry, the cluster $mycall is currently off-line\n";
- }
- exit(0);
-}
-
-$conn->set_error(sub{cease(0)});
-
-
unless ($DB::VERSION) {
$SIG{'INT'} = \&sig_term;
$SIG{'TERM'} = \&sig_term;
$SIG{__DIE__} = \&sig_term;
-$conn->send_later("A$call|$connsort width=$cols");
-$conn->send_later("I$call|set/page $maxshist");
-#$conn->send_later("I$call|set/nobeep");
-
-#Msg->set_event_handler(\*STDIN, "read" => \&rec_stdin);
-
$Text::Wrap::Columns = $cols;
my $lastmin = 0;
-for (;;) {
- my $t;
- Msg->event_loop(1, 0.01);
- $t = time;
- if ($t > $lasttime) {
- my ($min)= (gmtime($t))[1];
- if ($min != $lastmin) {
- show_screen();
- $lastmin = $min;
- }
- $lasttime = $t;
- }
- my $ch = $bot->getch();
- if (@time && tv_interval(\@time, [gettimeofday]) >= 1) {
-# mydbg("Got Resize");
-# do_resize();
- next;
- }
- if (defined $ch) {
- if ($ch ne '-1') {
- rec_stdin($ch);
- }
- }
- $top->refresh() if $top->is_wintouched;
- $bot->refresh();
-}
-exit(0);
+
+$conn = IntMsg->connect($clusteraddr, $clusterport, \&rec_socket);
+$conn->{on_connect} = \&on_connect;
+$conn->{on_disconnect} = \&on_disconnect;
+
+$idle = Mojo::IOLoop->recurring(0.100 => \&idle_loop);
+Mojo::IOLoop->start;
+
+
+cease(0);