X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=00569928165bb2a630862b4d1fd9603ee05f6d8a;hb=7046b8ba37863c3040cee17e46d100675e720eaf;hp=5aef961ee4f4162593228d0c1abd71a41dd52fef;hpb=60bd15a823797c01182ebfb8b6b3a5bf10065b42;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 5aef961e..00569928 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -14,7 +14,7 @@ use strict; use vars qw($VERSION $BRANCH); $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ ); -$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0; +$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ || (0,0)); $main::build += $VERSION; $main::branch += $BRANCH; @@ -23,7 +23,7 @@ use IO::Socket; use DXDebug; use Timer; -use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum); +use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out); %rd_callbacks = (); %wt_callbacks = (); @@ -31,12 +31,14 @@ use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $e $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); $er_handles = IO::Select->new(); +$total_in = $total_out = 0; $now = time; BEGIN { # Checks if blocking is supported eval { + local $^W; require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL)) }; if ($@ || $main::is_win) { @@ -50,12 +52,14 @@ BEGIN { # import as many of these errno values as are available eval { + local $^W; require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK)); }; unless ($^O eq 'MSWin32') { if ($] >= 5.6) { eval { + local $^W; require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY)); }; } else { @@ -72,7 +76,9 @@ BEGIN { eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };'; eval '*F_GETFL = sub { 0 };'; eval '*F_SETFL = sub { 0 };'; - $blocking_supported = 1; + eval '*IPPROTO_TCP = sub { 6 };'; + eval '*TCP_NODELAY = sub { 1 };'; + $blocking_supported = 0; # it appears that this DOESN'T work :-( } } @@ -207,8 +213,10 @@ sub connect { blocking($sock, 0); $conn->{blocking} = 0; + # does the host resolve? my $ip = gethostbyname($to_host); -# my $r = $sock->connect($to_port, $ip); + return undef unless $ip; + my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); return undef unless $r || _err_will_block($!); @@ -221,7 +229,57 @@ sub connect { return $conn; } -sub disconnect { +sub start_program +{ + my ($conn, $line, $sort) = @_; + my $pid; + + local $^F = 10000; # make sure it ain't closed on exec + my ($a, $b) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC); + if ($a && $b) { + $a->autoflush(1); + $b->autoflush(1); + $pid = fork; + if (defined $pid) { + if ($pid) { + close $b; + $conn->{sock} = $a; + $conn->{csort} = $sort; + $conn->{lineend} = "\cM" if $sort eq 'ax25'; + $conn->{pid} = $pid; + if ($conn->{rproc}) { + my $callback = sub {$conn->_rcv}; + Msg::set_event_handler ($a, read => $callback); + } + dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect'); + } else { + $^W = 0; + dbgclose(); + STDIN->close; + STDOUT->close; + STDOUT->close; + *STDIN = IO::File->new_from_fd($b, 'r') or die; + *STDOUT = IO::File->new_from_fd($b, 'w') or die; + *STDERR = IO::File->new_from_fd($b, 'w') or die; + close $a; + unless ($main::is_win) { + # $SIG{HUP} = 'IGNORE'; + $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT'; + alarm(0); + } + exec "$line" or dbg("exec '$line' failed $!"); + } + } else { + dbg("cannot fork for $line"); + } + } else { + dbg("no socket pair $! for $line"); + } + return $pid; +} + +sub disconnect +{ my $conn = shift; return if exists $conn->{disconnecting}; @@ -255,7 +313,6 @@ sub disconnect { unless ($main::is_win) { kill 'TERM', $conn->{pid} if exists $conn->{pid}; } - } sub send_now { @@ -321,6 +378,7 @@ sub _send { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call send $bytes_written: ", $msg); } + $total_out += $bytes_written; $offset += $bytes_written; $bytes_to_write -= $bytes_written; } @@ -392,26 +450,26 @@ sub nolinger { my $conn = shift; - if (isdbg('sock')) { - my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); - my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); - my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); - dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); - } - - setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0)) or confess "setsockopt linger: $!"; - setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1) or confess "setsockopt keepalive: $!"; unless ($main::is_win) { - setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1) or confess "setsockopt: $!"; + if (isdbg('sock')) { + my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); + my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); + my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); + dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); + } + + eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1)} or dbg("setsockopt keepalive: $!"); + eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0))} or dbg("setsockopt linger: $!"); + eval {setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)} or eval {setsockopt($conn->{sock}, SOL_SOCKET, TCP_NODELAY, 1)} or dbg("setsockopt tcp_nodelay: $!"); + $conn->{sock}->autoflush(0); + + if (isdbg('sock')) { + my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); + my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); + my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); + dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); + } } - $conn->{sock}->autoflush(0); - - if (isdbg('sock')) { - my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); - my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); - my $n = unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); - dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); - } } sub dequeue @@ -446,6 +504,7 @@ sub _rcv { # Complement to _send $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { + $total_in += $bytes_read; if (isdbg('raw')) { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call read $bytes_read: ", $msg); @@ -580,24 +639,35 @@ sub set_event_handler { } sub event_loop { - my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once + my ($pkg, $loop_count, $timeout, $wronly) = @_; # event_loop(1) to process events once my ($conn, $r, $w, $e, $rset, $wset, $eset); while (1) { # Quit the loop if no handles left to process - last unless ($rd_handles->count() || $wt_handles->count()); + if ($wronly) { + last unless $wt_handles->count(); - ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); - - foreach $e (@$eset) { - &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; - } - foreach $r (@$rset) { - &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r}; - } - foreach $w (@$wset) { - &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; - } + ($rset, $wset, $eset) = IO::Select->select(undef, $wt_handles, undef, $timeout); + + foreach $w (@$wset) { + &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; + } + } else { + + last unless ($rd_handles->count() || $wt_handles->count()); + + ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); + + foreach $e (@$eset) { + &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; + } + foreach $r (@$rset) { + &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r}; + } + foreach $w (@$wset) { + &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; + } + } Timer::handler;