use strict;
use IO::Select;
use IO::Socket;
-#use DXDebug;
+use Carp;
-use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain);
+use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain %conns);
%rd_callbacks = ();
%wt_callbacks = ();
my $conn = {
rproc => $rproc,
inqueue => [],
+ outqueue => [],
state => 0,
lineend => "\r\n",
csort => 'telnet',
+ timeval => 60,
};
return bless $conn, $class;
}
+# save it
+sub conns
+{
+ my $pkg = shift;
+ my $call = shift;
+ my $ref;
+
+ if (ref $pkg) {
+ $call = $pkg->{call} unless $call;
+ return undef unless $call;
+ confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
+ $pkg->{call} = $call;
+ $ref = $conns{$call} = $pkg;
+ } else {
+ $ref = $conns{$call};
+ }
+ return $ref;
+}
+
+# this is only called by any dependent processes going away unexpectedly
+sub pid_gone
+{
+ my ($pkg, $pid) = @_;
+
+ my @pid = grep {$_->{pid} == $pid} values %conns;
+ for (@pid) {
+ if ($_->{rproc}) {
+ &{$_->{rproc}}($_, undef, "$pid has gorn");
+ } else {
+ $_->disconnect;
+ }
+ }
+}
+
#-----------------------------------------------------------------
# Send side routines
sub connect {
my ($pkg, $to_host, $to_port, $rproc) = @_;
+ # Create a connection end-point object
+ my $conn = $pkg;
+ unless (ref $pkg) {
+ $conn = $pkg->new($rproc);
+ }
+
# Create a new internet socket
my $sock = IO::Socket::INET->new (
PeerAddr => $to_host,
PeerPort => $to_port,
Proto => 'tcp',
- Reuse => 1);
+ Reuse => 1,
+ Timeout => $conn->{timeval} / 2);
return undef unless $sock;
- # Create a connection end-point object
- my $conn = $pkg;
- unless (ref $pkg) {
- $conn = $pkg->new($rproc);
- }
$conn->{sock} = $sock;
if ($conn->{rproc}) {
$conn->{state} = 'E';
delete $conn->{cmd};
$conn->{timeout}->del_timer if $conn->{timeout};
- return unless defined($sock);
+
+ # be careful to delete the correct one
+ if (my $call = $conn->{call}) {
+ my $ref = $conns{$call};
+ delete $conns{$call} if $ref && $ref == $conn;
+ }
+
set_event_handler ($sock, "read" => undef, "write" => undef);
+ unless ($^O =~ /^MS/i) {
+ kill 'TERM', $conn->{pid} if exists $conn->{pid};
+ }
+ return unless defined($sock);
shutdown($sock, 3);
close($sock);
}
my ($conn, $flush) = @_;
my $sock = $conn->{sock};
return unless defined($sock);
- my ($rq) = $conn->{outqueue};
+ my $rq = $conn->{outqueue};
# If $flush is set, set the socket to blocking, and send all
# messages in the queue - return only if there's an error
if ($bytes_read > 0) {
if ($msg =~ /\n/) {
@lines = split /\r?\n/, $msg;
- push @lines, ' ' unless @lines;
-
- $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
+ if (@lines) {
+ $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg};
+ } else {
+ $lines[0] = $conn->{msg} if exists $conn->{msg};
+ push @lines, '' unless @lines;
+ }
if ($msg =~ /\n$/) {
delete $conn->{msg};
} else {