X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=7d5b407213ec6bad7c6b743174d9fe84afca88e1;hb=4a8336156c820698be4933a3e36513750709926d;hp=02795812e40bc5da3445953b0e56e382ff46ef27;hpb=6616898c4373267522bcacca20e58157bd5a538b;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 02795812..7d5b4072 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -13,9 +13,9 @@ package Msg; use strict; use IO::Select; use IO::Socket; -#use DXDebug; +use Carp; -use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain); +use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain %conns); %rd_callbacks = (); %wt_callbacks = (); @@ -47,33 +47,71 @@ sub new my $conn = { rproc => $rproc, inqueue => [], + outqueue => [], state => 0, lineend => "\r\n", csort => 'telnet', + timeval => 60, }; return bless $conn, $class; } +# save it +sub conns +{ + my $pkg = shift; + my $call = shift; + my $ref; + + if (ref $pkg) { + $call = $pkg->{call} unless $call; + return undef unless $call; + confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call}; + $pkg->{call} = $call; + $ref = $conns{$call} = $pkg; + } else { + $ref = $conns{$call}; + } + return $ref; +} + +# this is only called by any dependent processes going away unexpectedly +sub pid_gone +{ + my ($pkg, $pid) = @_; + + my @pid = grep {$_->{pid} == $pid} values %conns; + for (@pid) { + if ($_->{rproc}) { + &{$_->{rproc}}($_, undef, "$pid has gorn"); + } else { + $_->disconnect; + } + } +} + #----------------------------------------------------------------- # Send side routines sub connect { my ($pkg, $to_host, $to_port, $rproc) = @_; + # Create a connection end-point object + my $conn = $pkg; + unless (ref $pkg) { + $conn = $pkg->new($rproc); + } + # Create a new internet socket my $sock = IO::Socket::INET->new ( PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp', - Reuse => 1); + Reuse => 1, + Timeout => $conn->{timeval} / 2); return undef unless $sock; - # Create a connection end-point object - my $conn = $pkg; - unless (ref $pkg) { - $conn = $pkg->new($rproc); - } $conn->{sock} = $sock; if ($conn->{rproc}) { @@ -89,8 +127,18 @@ sub disconnect { $conn->{state} = 'E'; delete $conn->{cmd}; $conn->{timeout}->del_timer if $conn->{timeout}; - return unless defined($sock); + + # be careful to delete the correct one + if (my $call = $conn->{call}) { + my $ref = $conns{$call}; + delete $conns{$call} if $ref && $ref == $conn; + } + set_event_handler ($sock, "read" => undef, "write" => undef); + unless ($^O =~ /^MS/i) { + kill 'TERM', $conn->{pid} if exists $conn->{pid}; + } + return unless defined($sock); shutdown($sock, 3); close($sock); } @@ -118,7 +166,7 @@ sub _send { my ($conn, $flush) = @_; my $sock = $conn->{sock}; return unless defined($sock); - my ($rq) = $conn->{outqueue}; + my $rq = $conn->{outqueue}; # If $flush is set, set the socket to blocking, and send all # messages in the queue - return only if there's an error @@ -244,8 +292,12 @@ sub _rcv { # Complement to _send if ($bytes_read > 0) { if ($msg =~ /\n/) { @lines = split /\r?\n/, $msg; - $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg}; - push @lines, ' ' unless @lines; + if (@lines) { + $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg}; + } else { + $lines[0] = $conn->{msg} if exists $conn->{msg}; + push @lines, '' unless @lines; + } if ($msg =~ /\n$/) { delete $conn->{msg}; } else {