X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=d1f2589b0aabb58d96af1d44a7b6f6b51d459bdf;hb=f27e9460a85b5ba3ec8b51d14808220023b70917;hp=839b5e453313e05db16f6f536ba26d3e4b1cdd31;hpb=2bbb601161d60a41289bd19b73fc35c24d5c6b71;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 839b5e45..d1f2589b 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -15,10 +15,8 @@ use IO::Select; use IO::Socket; use DXDebug; use Timer; -use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS); -use POSIX qw(F_GETFL F_SETFL O_NONBLOCK); -use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns); +use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported); %rd_callbacks = (); %wt_callbacks = (); @@ -29,6 +27,33 @@ $er_handles = IO::Select->new(); $now = time; +BEGIN { + # Checks if blocking is supported + eval { + require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL)) + }; + if ($@ || $main::is_win) { + print STDERR "POSIX Blocking *** NOT *** supported $@\n"; + $blocking_supported = 0; + } else { + $blocking_supported = 1; + print STDERR "POSIX Blocking enabled\n"; + } + + + # import as many of these errno values as are available + eval { + require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK)); + }; +} + +my $w = $^W; +$^W = 0; +my $eagain = eval {EAGAIN()}; +my $einprogress = eval {EINPROGRESS()}; +my $ewouldblock = eval {EWOULDBLOCK()}; +$^W = $w; + # #----------------------------------------------------------------- # Generalised initializer @@ -47,6 +72,7 @@ sub new lineend => "\r\n", csort => 'telnet', timeval => 60, + blocking => 0, }; $noconns++; @@ -71,6 +97,8 @@ sub set_rproc sub blocking { + return unless $blocking_supported; + my $flags = fcntl ($_[0], F_GETFL, 0); if ($_[1]) { $flags &= ~O_NONBLOCK; @@ -134,12 +162,12 @@ sub connect { $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef; blocking($sock, 0); + $conn->{blocking} = 0; + my $ip = gethostbyname($to_host); # my $r = $sock->connect($to_port, $ip); my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); - unless ($r) { - return undef unless $! == EINPROGRESS; - } + return undef unless $r || _err_will_block($!); $conn->{sock} = $sock; @@ -168,7 +196,7 @@ sub disconnect { $call ||= 'unallocated'; dbg('connll', "Connection $call disconnected"); - unless ($^O =~ /^MS/i) { + unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } @@ -216,7 +244,10 @@ sub _send { # return to the event loop only after every message, or if it # is likely to block in the middle of a message. - blocking($sock, $flush); + if ($conn->{blocking} != $flush) { + blocking($sock, $flush); + $conn->{blocking} = $flush; + } my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { @@ -264,8 +295,25 @@ sub _send { 1; # Success } +sub dup_sock +{ + my $conn = shift; + my $oldsock = $conn->{sock}; + my $rc = $rd_callbacks{$oldsock}; + my $wc = $wt_callbacks{$oldsock}; + my $ec = $er_callbacks{$oldsock}; + my $sock = $oldsock->new_from_fd($oldsock, "w+"); + if ($sock) { + set_event_handler($oldsock, read=>undef, write=>undef, error=>undef); + $conn->{sock} = $sock; + set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec); + $oldsock->close; + } +} + sub _err_will_block { - return ($_[0] == EAGAIN || $_[0] == EWOULDBLOCK || $_[0] == EINPROGRESS); + return 0 unless $blocking_supported; + return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress); } sub close_on_empty @@ -318,7 +366,10 @@ sub _rcv { # Complement to _send return unless defined($sock); my @lines; - blocking($sock, 0); + if ($conn->{blocking}) { + blocking($sock, 0); + $conn->{blocking} = 0; + } $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { @@ -344,22 +395,28 @@ FINISH: sub new_client { my $server_conn = shift; my $sock = $server_conn->{sock}->accept(); - my $conn = $server_conn->new($server_conn->{rproc}); - $conn->{sock} = $sock; - my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport()); - $conn->{sort} = 'Incoming'; - if ($eproc) { - $conn->{eproc} = $eproc; - set_event_handler ($sock, error => $eproc); + if ($sock) { + my $conn = $server_conn->new($server_conn->{rproc}); + $conn->{sock} = $sock; + blocking($sock, 0); + $conn->{blocking} = 0; + my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport()); + $conn->{sort} = 'Incoming'; + if ($eproc) { + $conn->{eproc} = $eproc; + set_event_handler ($sock, error => $eproc); + } + if ($rproc) { + $conn->{rproc} = $rproc; + my $callback = sub {$conn->_rcv}; + set_event_handler ($sock, read => $callback); + } else { # Login failed + &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; + $conn->disconnect(); + } + } else { + dbg('err', "Msg: error on accept ($!)"); } - if ($rproc) { - $conn->{rproc} = $rproc; - my $callback = sub {$conn->_rcv}; - set_event_handler ($sock, read => $callback); - } else { # Login failed - &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc}; - $conn->disconnect(); - } } sub close_server @@ -377,6 +434,7 @@ sub close_all_clients } } +# #---------------------------------------------------- # Event loop routines used by both client and server @@ -424,7 +482,7 @@ sub event_loop { # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); - ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); + ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); foreach $e (@$eset) { &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; @@ -444,6 +502,15 @@ sub event_loop { } } +sub sleep +{ + my ($pkg, $interval) = @_; + my $now = time; + while (time - $now < $interval) { + $pkg->event_loop(10, 0.01); + } +} + sub DESTROY { my $conn = shift;