X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=ec07d61da6878851f6b8f11f19aba403e5729cbd;hb=0bbd79b480f385485ce6b0be25f9dee47e955eb1;hp=c730773aff702f9c1cbb4d79cad00932dd2f5036;hpb=2f1b948ea733e0ece1909a31987dc8f03044e851;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index c730773a..ec07d61d 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -15,8 +15,6 @@ use IO::Select; use IO::Socket; use DXDebug; use Timer; -use Errno qw(EWOULDBLOCK EAGAIN EINPROGRESS); -use POSIX qw(F_GETFL F_SETFL O_NONBLOCK); use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns); @@ -28,6 +26,27 @@ $wt_handles = IO::Select->new(); $er_handles = IO::Select->new(); $now = time; +my $blocking_supported = 0; + +BEGIN { + # Checks if blocking is supported + eval { + require POSIX; POSIX->import(qw (F_SETFL F_GETFL O_NONBLOCK)); + }; + $blocking_supported = 1 unless $@; + + # import as many of these errno values as are available + eval { + require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK)); + }; +} + +my $w = $^W; +$^W = 0; +my $eagain = eval {EAGAIN()}; +my $einprogress = eval {EINPROGRESS()}; +my $ewouldblock = eval {EWOULDBLOCK()}; +$^W = $w; # #----------------------------------------------------------------- @@ -71,6 +90,8 @@ sub set_rproc sub blocking { + return unless $blocking_supported; + my $flags = fcntl ($_[0], F_GETFL, 0); if ($_[1]) { $flags &= ~O_NONBLOCK; @@ -135,10 +156,9 @@ sub connect { blocking($sock, 0); my $ip = gethostbyname($to_host); - my $r = $sock->connect($to_port, $ip); - unless ($r) { - return undef unless $! == EINPROGRESS; - } +# my $r = $sock->connect($to_port, $ip); + my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); + return undef unless $r || _err_will_block($!); $conn->{sock} = $sock; @@ -167,8 +187,6 @@ sub disconnect { $call ||= 'unallocated'; dbg('connll', "Connection $call disconnected"); - set_event_handler ($sock, read => undef, write => undef, error => undef); - unless ($^O =~ /^MS/i) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } @@ -181,6 +199,7 @@ sub disconnect { } return unless defined($sock); + set_event_handler ($sock, read => undef, write => undef, error => undef); shutdown($sock, 3); close($sock); } @@ -264,8 +283,25 @@ sub _send { 1; # Success } +sub dup_sock +{ + my $conn = shift; + my $oldsock = $conn->{sock}; + my $rc = $rd_callbacks{$oldsock}; + my $wc = $wt_callbacks{$oldsock}; + my $ec = $er_callbacks{$oldsock}; + my $sock = $oldsock->new_from_fd($oldsock, "w+"); + if ($sock) { + set_event_handler($oldsock, read=>undef, write=>undef, error=>undef); + $conn->{sock} = $sock; + set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec); + $oldsock->close; + } +} + sub _err_will_block { - return ($_[0] == EAGAIN || $_[0] == EWOULDBLOCK || $_[0] == EINPROGRESS); + return 0 unless $blocking_supported; + return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress); } sub close_on_empty @@ -425,7 +461,6 @@ sub event_loop { last unless ($rd_handles->count() || $wt_handles->count()); ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); - $now = time; foreach $e (@$eset) { &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};