X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=1c86c70ddd170a3a55b093dafd82087b0a2f5dd0;hb=abc381a196bb683f51f875de04d1ec35008e72ab;hp=d62cb744034377f54592a3a7233571f9e88f842d;hpb=06a6935d583a684869129350fed170d467ad8acc;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index d62cb744..1c86c70d 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -20,14 +20,14 @@ use Mojo::IOLoop::Stream; use DXDebug; use Timer; -use vars qw($now %conns $noconns $cnum $total_in $total_out); +use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout); $total_in = $total_out = 0; $now = time; $cnum = 0; - +$connect_timeout = 5; # #----------------------------------------------------------------- @@ -61,7 +61,7 @@ sub set_error { my $conn = shift; my $callback = shift; - $conn->{sock}->on(error => sub {my ($stream, $err) = @_; $callback->($conn, $err);}); + $conn->{sock}->on(error => sub {$callback->($conn, $_[1]);}); } sub set_on_eof @@ -128,12 +128,40 @@ sub peerhost #----------------------------------------------------------------- # Send side routines -sub connect { - my ($pkg, $to_host, $to_port, $rproc) = @_; +sub _on_connect +{ + my $conn = shift; + my $handle = shift; + undef $conn->{sock}; + my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle); + $sock->on(read => sub {$conn->_rcv($_[1]);} ); + $sock->on(error => sub {$conn->disconnect;}); + $sock->on(close => sub {$conn->disconnect;}); + $sock->timeout(0); + $sock->start; + $conn->{peerhost} = eval { $handle->peerhost; }; + dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll'); + if ($conn->{on_connect}) { + &{$conn->{on_connect}}($conn, $handle); + } +} + +sub is_connected +{ + my $conn = shift; + my $sock = $conn->{sock}; + return ref $sock && $sock->isa('Mojo::IOLoop::Stream'); +} + +sub connect { + my ($pkg, $to_host, $to_port, %args) = @_; + my $timeout = delete $args{timeout} || $connect_timeout; + # Create a connection end-point object my $conn = $pkg; unless (ref $pkg) { + my $rproc = delete $args{rproc}; $conn = $pkg->new($rproc); } $conn->{peerhost} = $to_host; @@ -144,17 +172,17 @@ sub connect { my $sock; $conn->{sock} = $sock = Mojo::IOLoop::Client->new; - $sock->on(connect => sub { dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll');}, - error => {$conn->disconnect}, - close => {$conn->disconnect}); + $sock->on(connect => sub {$conn->_on_connect($_[1])} ); + $sock->on(error => sub {&{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; $conn->disconnect}); + $sock->on(close => sub {$conn->disconnect}); + + # copy any args like on_connect, on_disconnect etc + while (my ($k, $v) = each %args) { + $conn->{$k} = $v; + } - $sock->connect(address => $to_host, port => $to_port); + $sock->connect(address => $to_host, port => $to_port, timeout => $timeout); - dbg((ref $conn) . " connected $conn->{cnum} to $to_host:$to_port") if isdbg('connll'); - - if ($conn->{rproc}) { - $sock->on(read => sub {my ($stream, $msg) = @_; $conn->_rcv($msg);} ); - } return $conn; } @@ -226,6 +254,10 @@ sub disconnect $call ||= 'unallocated'; dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll'); + if ($conn->{on_disconnect}) { + &{$conn->{on_disconnect}}($conn); + } + # get rid of any references for (keys %$conn) { if (ref($conn->{$_})) { @@ -234,7 +266,7 @@ sub disconnect } if (defined($sock)) { - $sock->remove; + $sock->close_gracefully; } unless ($main::is_win) { @@ -256,7 +288,7 @@ sub _send_stuff dbgdump('raw', "$call send $lth: ", $lth); } } - if (defined $sock && !$sock->destroyed) { + if (defined $sock) { $sock->write($data); $total_out = $lth; } else { @@ -275,6 +307,13 @@ sub send_later { goto &send_now; } +sub send_raw +{ + my ($conn, $msg) = @_; + push @{$conn->{outqueue}}, $msg; + _send_stuff($conn); +} + sub enqueue { my $conn = shift; push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : ''; @@ -300,9 +339,10 @@ sub new_server my ($pkg, $my_host, $my_port, $login_proc) = @_; my $conn = $pkg->new($login_proc); - $conn->{sock} = Mojo::IOLoop::Server->new; - $conn->{sock}->on(accept=>sub{$conn->new_client()}); - $conn->{sock}->listen(address=>$my_host, port=>$my_port); + my $sock = $conn->{sock} = Mojo::IOLoop::Server->new; + $sock->on(accept=>sub{$conn->new_client($_[1]);}); + $sock->listen(address=>$my_host, port=>$my_port); + $sock->start; die "Could not create socket: $! \n" unless $conn->{sock}; return $conn; @@ -335,60 +375,37 @@ sub dequeue sub _rcv { # Complement to _send my $conn = shift; # $rcv_now complement of $flush - # Find out how much has already been received, if at all - my ($msg, $offset, $bytes_to_read, $bytes_read); + my $msg = shift; my $sock = $conn->{sock}; return unless defined($sock); my @lines; -# if ($conn->{blocking}) { -# blocking($sock, 0); -# $conn->{blocking} = 0; -# } - $bytes_read = sysread ($sock, $msg, 1024, 0); - if (defined ($bytes_read)) { - if ($bytes_read > 0) { - $total_in += $bytes_read; - if (isdbg('raw')) { - my $call = $conn->{call} || 'none'; - dbgdump('raw', "$call read $bytes_read: ", $msg); - } - if ($conn->{echo}) { - my @ch = split //, $msg; - my $out; - for (@ch) { - if (/[\cH\x7f]/) { - $out .= "\cH \cH"; - $conn->{msg} =~ s/.$//; - } else { - $out .= $_; - $conn->{msg} .= $_; - } - } - if (defined $out) { - set_event_handler ($sock, write => sub{$conn->_send(0)}); - push @{$conn->{outqueue}}, $out; + if (isdbg('raw')) { + my $call = $conn->{call} || 'none'; + my $lth = length $msg; + dbgdump('raw', "$call read $lth: ", $msg); + } + if ($conn->{echo}) { + my @ch = split //, $msg; + my $out; + for (@ch) { + if (/[\cH\x7f]/) { + $out .= "\cH \cH"; + $conn->{msg} =~ s/.$//; + } else { + $out .= $_; + $conn->{msg} .= $_; } - } else { - $conn->{msg} .= $msg; } - } + if (defined $out) { + $conn->send_raw($out); + } } else { - if (_err_will_block($!)) { - return ; - } else { - $bytes_read = 0; - } - } + $conn->{msg} .= $msg; + } -FINISH: - if (defined $bytes_read && $bytes_read == 0) { - &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc}; - $conn->disconnect; - } else { - unless ($conn->{disable_read}) { - $conn->dequeue if exists $conn->{msg}; - } + unless ($conn->{disable_read}) { + $conn->dequeue if exists $conn->{msg}; } } @@ -399,6 +416,8 @@ sub new_client { my $conn = $server_conn->new($server_conn->{rproc}); my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client); $sock->on(read => sub {$conn->_rcv($_[1])}); + $sock->timeout(0); + $sock->start; dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll'); my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport); @@ -412,6 +431,7 @@ sub new_client { &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect(); } + return $conn; } sub close_server