X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=e167269363d6f674ac24a016ce2fbabea51a7c0b;hb=e9a4c82122720cd66da440e4ff6f88aaded8e146;hp=449f1790ca8dcb1b0156e06c31d402784264320a;hpb=586cbb347e7639f5575b48572e75140501a109c0;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 449f1790..e1672693 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -15,8 +15,6 @@ use IO::Select; use IO::Socket; use DXDebug; use Timer; -use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS); -use POSIX qw(F_GETFL F_SETFL O_NONBLOCK); use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns); @@ -28,6 +26,27 @@ $wt_handles = IO::Select->new(); $er_handles = IO::Select->new(); $now = time; +my $blocking_supported = 0; + +BEGIN { + # Checks if blocking is supported + eval { + require POSIX; POSIX->import(qw (F_SETFL F_GETFL O_NONBLOCK)); + }; + $blocking_supported = 1 unless $@; + + # import as many of these errno values as are available + eval { + require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK)); + }; +} + +my $w = $^W; +$^W = 0; +my $eagain = eval {EAGAIN()}; +my $einprogress = eval {EINPROGRESS()}; +my $ewouldblock = eval {EWOULDBLOCK()}; +$^W = $w; # #----------------------------------------------------------------- @@ -47,6 +66,7 @@ sub new lineend => "\r\n", csort => 'telnet', timeval => 60, + blocking => 0, }; $noconns++; @@ -62,8 +82,17 @@ sub set_error set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock}; } +sub set_rproc +{ + my $conn = shift; + my $callback = shift; + $conn->{rproc} = $callback; +} + sub blocking { + return unless $blocking_supported; + my $flags = fcntl ($_[0], F_GETFL, 0); if ($_[1]) { $flags &= ~O_NONBLOCK; @@ -126,18 +155,21 @@ sub connect { my $proto = getprotobyname('tcp'); $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef; - blocking($sock, 0); - my $ip = gethostbyname($to_host); - my $r = $sock->connect($to_port, $ip); - unless ($r) { - return undef unless $! == EINPROGRESS; + if ($conn->{blocking}) { + blocking($sock, 0); + $conn->{blocking} = 0; } + + my $ip = gethostbyname($to_host); +# my $r = $sock->connect($to_port, $ip); + my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); + return undef unless $r || _err_will_block($!); $conn->{sock} = $sock; if ($conn->{rproc}) { - my $callback = sub {_rcv($conn)}; - set_event_handler ($sock, "read" => $callback); + my $callback = sub {$conn->_rcv}; + set_event_handler ($sock, read => $callback); } return $conn; } @@ -149,9 +181,6 @@ sub disconnect { $conn->{disconnecting} = 1; my $sock = delete $conn->{sock}; $conn->{state} = 'E'; - delete $conn->{cmd}; - delete $conn->{eproc}; - delete $conn->{rproc}; $conn->{timeout}->del if $conn->{timeout}; # be careful to delete the correct one @@ -163,11 +192,19 @@ sub disconnect { $call ||= 'unallocated'; dbg('connll', "Connection $call disconnected"); - set_event_handler ($sock, read => undef, write => undef, error => undef); unless ($^O =~ /^MS/i) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } + + # get rid of any references + for (keys %$conn) { + if (ref($conn->{$_})) { + delete $conn->{$_}; + } + } + return unless defined($sock); + set_event_handler ($sock, read => undef, write => undef, error => undef); shutdown($sock, 3); close($sock); } @@ -183,7 +220,7 @@ sub send_later { $conn->enqueue($msg); my $sock = $conn->{sock}; return unless defined($sock); - set_event_handler ($sock, "write" => sub {$conn->_send(0)}); + set_event_handler ($sock, write => sub {$conn->_send(0)}); } sub enqueue { @@ -203,7 +240,10 @@ sub _send { # return to the event loop only after every message, or if it # is likely to block in the middle of a message. - blocking($sock, $flush); + if ($conn->{blocking} != $flush) { + blocking($sock, $flush); + $conn->{blocking} = $flush; + } my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { @@ -240,9 +280,9 @@ sub _send { } # Call me back if queue has not been drained. if (@$rq) { - set_event_handler ($sock, "write" => sub {$conn->_send(0)}); + set_event_handler ($sock, write => sub {$conn->_send(0)}); } else { - set_event_handler ($sock, "write" => undef); + set_event_handler ($sock, write => undef); if (exists $conn->{close_on_empty}) { &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect; @@ -251,8 +291,25 @@ sub _send { 1; # Success } +sub dup_sock +{ + my $conn = shift; + my $oldsock = $conn->{sock}; + my $rc = $rd_callbacks{$oldsock}; + my $wc = $wt_callbacks{$oldsock}; + my $ec = $er_callbacks{$oldsock}; + my $sock = $oldsock->new_from_fd($oldsock, "w+"); + if ($sock) { + set_event_handler($oldsock, read=>undef, write=>undef, error=>undef); + $conn->{sock} = $sock; + set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec); + $oldsock->close; + } +} + sub _err_will_block { - return ($_[0] == EAGAIN || $_[0] == EWOULDBLOCK || $_[0] == EINPROGRESS); + return 0 unless $blocking_supported; + return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress); } sub close_on_empty @@ -276,7 +333,7 @@ sub new_server { Proto => 'tcp', Reuse => 1); die "Could not create socket: $! \n" unless $self->{sock}; - set_event_handler ($self->{sock}, "read" => sub { $self->new_client } ); + set_event_handler ($self->{sock}, read => sub { $self->new_client } ); return $self; } @@ -305,7 +362,10 @@ sub _rcv { # Complement to _send return unless defined($sock); my @lines; - blocking($sock, 0); + if ($conn->{blocking}) { + blocking($sock, 0); + $conn->{blocking} = 0; + } $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { @@ -321,8 +381,8 @@ sub _rcv { # Complement to _send FINISH: if (defined $bytes_read && $bytes_read == 0) { - &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; - $conn->disconnect(); + &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc}; + $conn->disconnect; } else { $conn->dequeue if exists $conn->{msg}; } @@ -337,12 +397,12 @@ sub new_client { $conn->{sort} = 'Incoming'; if ($eproc) { $conn->{eproc} = $eproc; - set_event_handler ($sock, "error" => $eproc); + set_event_handler ($sock, error => $eproc); } if ($rproc) { $conn->{rproc} = $rproc; - my $callback = sub {_rcv($conn)}; - set_event_handler ($sock, "read" => $callback); + my $callback = sub {$conn->_rcv}; + set_event_handler ($sock, read => $callback); } else { # Login failed &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; $conn->disconnect(); @@ -352,7 +412,7 @@ sub new_client { sub close_server { my $conn = shift; - set_event_handler ($conn->{sock}, "read" => undef); + set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef ); $conn->{sock}->close; } @@ -364,6 +424,7 @@ sub close_all_clients } } +# #---------------------------------------------------- # Event loop routines used by both client and server @@ -412,7 +473,6 @@ sub event_loop { last unless ($rd_handles->count() || $wt_handles->count()); ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); - $now = time; foreach $e (@$eset) { &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; @@ -432,6 +492,15 @@ sub event_loop { } } +sub sleep +{ + my ($pkg, $interval) = @_; + my $now = time; + while (time - $now < $interval) { + $pkg->event_loop(10, 0.01); + } +} + sub DESTROY { my $conn = shift;