#
# I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
#
-# $Id$
+#
#
package Msg;
use strict;
-use IO::Select;
-use IO::Socket;
-#use DXDebug;
-
-use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
-
-%rd_callbacks = ();
-%wt_callbacks = ();
-$rd_handles = IO::Select->new();
-$wt_handles = IO::Select->new();
-my $blocking_supported = 0;
-
-BEGIN {
- # Checks if blocking is supported
- eval {
- require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
+
+use DXUtil;
+
+use Mojo::IOLoop;
+use Mojo::IOLoop::Stream;
+
+use DXDebug;
+use DXTimer;
+
+use vars qw($now %conns $noconns $cnum $total_in $total_out $total_lines_in $total_lines_out $connect_timeout $disc_waittime);
+
+$total_in = $total_out = 0;
+$total_lines_in = $total_lines_out = 0;
+
+$now = time;
+
+$cnum = 0;
+$connect_timeout = 5;
+$disc_waittime = 1.5;
+
+our %delqueue;
+
+#
+#-----------------------------------------------------------------
+# Generalised initializer
+
+sub new
+{
+ my ($pkg, $rproc) = @_;
+ my $obj = ref($pkg);
+ my $class = $obj || $pkg;
+
+ my $conn = {
+ rproc => $rproc,
+ inqueue => [],
+ outqueue => [],
+ state => 0,
+ lineend => "\r\n",
+ csort => 'telnet',
+ timeval => 60,
+ blocking => 0,
+ cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
+ linesin => 0,
+ linesout => 0,
+ datain => 0,
+ dataout => 0,
};
- $blocking_supported = 1 unless $@;
+
+ $noconns++;
+
+ dbg("$class Connection created (total $noconns)") if isdbg('connll');
+ return bless $conn, $class;
+}
+
+sub set_error
+{
+ my $conn = shift;
+ my $callback = shift;
+ $conn->{sock}->on(error => sub {$callback->($_[1]);});
+}
+
+sub set_on_eof
+{
+ my $conn = shift;
+ my $callback = shift;
+ $conn->{sock}->on(close => sub {$callback->()});
+}
+
+sub set_rproc
+{
+ my $conn = shift;
+ my $callback = shift;
+ $conn->{rproc} = $callback;
+}
+
+# save it
+sub conns
+{
+ my $pkg = shift;
+ my $call = shift;
+ my $ref;
+
+ if (ref $pkg) {
+ $call = $pkg->{call} unless $call;
+ return undef unless $call;
+ dbg((ref $pkg) . " changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
+ delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call;
+ $pkg->{call} = $call;
+ $ref = $conns{$call} = $pkg;
+ dbg((ref $pkg) . " Connection $pkg->{cnum} $call stored") if isdbg('connll');
+ } else {
+ $ref = $conns{$call};
+ }
+ return $ref;
+}
+
+# this is only called by any dependent processes going away unexpectedly
+sub pid_gone
+{
+ my ($pkg, $pid) = @_;
+
+ my @pid = grep {$_->{pid} == $pid} values %conns;
+ foreach my $p (@pid) {
+ &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
+ $p->disconnect;
+ }
+}
+
+sub ax25
+{
+ my $conn = shift;
+ return $conn->{csort} eq 'ax25';
+}
+
+sub peerhost
+{
+ my $conn = shift;
+ unless ($conn->{peerhost}) {
+ $conn->{peerhost} ||= 'ax25' if $conn->ax25;
+ $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
+ $conn->{peerhost} ||= 'UNKNOWN';
+ }
+ return $conn->{peerhost};
}
#-----------------------------------------------------------------
# Send side routines
-sub connect {
- my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
-
- # Create a new internet socket
-
- my $sock = IO::Socket::INET->new (
- PeerAddr => $to_host,
- PeerPort => $to_port,
- Proto => 'tcp',
- Reuse => 1);
-
- return undef unless $sock;
+sub _on_connect
+{
+ my $conn = shift;
+ my $handle = shift;
+ undef $conn->{sock};
+ my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+ $sock->on(read => sub {$conn->_rcv($_[1]);} );
+ $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;});
+ $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;});
+ $sock->timeout(0);
+ $sock->start;
+ $conn->{peerhost} = eval { $handle->peerhost; };
+ dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg ('connect');
+ if ($conn->{on_connect}) {
+ &{$conn->{on_connect}}($conn, $handle);
+ }
+}
+
+sub is_connected
+{
+ my $conn = shift;
+ my $sock = $conn->{sock};
+ return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
+}
+
+sub connect {
+ my ($pkg, $to_host, $to_port, %args) = @_;
+ my $timeout = delete $args{timeout} || $connect_timeout;
+
# Create a connection end-point object
- my $conn = {
- sock => $sock,
- rcvd_notification_proc => $rcvd_notification_proc,
- };
-
- if ($rcvd_notification_proc) {
- my $callback = sub {_rcv($conn)};
- set_event_handler ($sock, "read" => $callback);
- }
- return bless $conn, $pkg;
+ my $conn = $pkg;
+ unless (ref $pkg) {
+ my $rproc = delete $args{rproc};
+ $conn = $pkg->new($rproc);
+ }
+ $conn->{peerhost} = $to_host;
+ $conn->{peerport} = $to_port;
+ $conn->{sort} = 'Outgoing';
+
+ dbg((ref $conn) . " connecting $conn->{cnum} to $to_host:$to_port") if isdbg('connll');
+
+ my $sock;
+ $conn->{sock} = $sock = Mojo::IOLoop::Client->new;
+ $sock->on(connect => sub {
+ $conn->_on_connect($_[1])
+ } );
+ $sock->on(error => sub {
+ &{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc};
+ delete $conn->{sock};
+ $conn->disconnect
+ });
+ $sock->on(close => sub {
+ delete $conn->{sock};
+ $conn->disconnect}
+ );
+
+ # copy any args like on_connect, on_disconnect etc
+ while (my ($k, $v) = each %args) {
+ $conn->{$k} = $v;
+ }
+
+ $sock->connect(address => $to_host, port => $to_port, timeout => $timeout);
+
+ return $conn;
}
-sub disconnect {
+sub start_program
+{
+ my ($conn, $line, $sort) = @_;
+ my $pid;
+
+# local $^F = 10000; # make sure it ain't closed on exec
+# my ($a, $b) = $io_socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
+# if ($a && $b) {
+# $a->autoflush(1);
+# $b->autoflush(1);
+# $pid = fork;
+# if (defined $pid) {
+# if ($pid) {
+# close $b;
+# $conn->{sock} = $a;
+# $conn->{csort} = $sort;
+# $conn->{lineend} = "\cM" if $sort eq 'ax25';
+# $conn->{pid} = $pid;
+# if ($conn->{rproc}) {
+# my $callback = sub {$conn->_rcv};
+# Msg::set_event_handler ($a, read => $callback);
+# }
+# dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect');
+# } else {
+# $^W = 0;
+# dbgclose();
+# STDIN->close;
+# STDOUT->close;
+# STDOUT->close;
+# *STDIN = IO::File->new_from_fd($b, 'r') or die;
+# *STDOUT = IO::File->new_from_fd($b, 'w') or die;
+# *STDERR = IO::File->new_from_fd($b, 'w') or die;
+# close $a;
+# unless ($main::is_win) {
+# # $SIG{HUP} = 'IGNORE';
+# $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT';
+# alarm(0);
+# }
+# exec "$line" or dbg("exec '$line' failed $!");
+# }
+# } else {
+# dbg("cannot fork for $line");
+# }
+# } else {
+# dbg("no socket pair $! for $line");
+# }
+ return $pid;
+}
+
+sub disconnect
+{
+ my $conn = shift;
+ my $count = $conn->{disconnecting}++;
+ my $dbg = isdbg('connll');
+ my ($pkg, $fn, $line) = caller if $dbg;
+
+ if ($count >= 2) {
+ dbgtrace((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line FORCING CLOSE ") if $dbg;
+ _close_it($conn);
+ return;
+ }
+ dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ") if $dbg;
+ return if $count;
+
+ # remove this conn from the active queue
+ # be careful to delete the correct one
+ my $call;
+ if ($call = $conn->{call}) {
+ my $ref = $conns{$call};
+ delete $conns{$call} if $ref && $ref == $conn;
+ }
+ $call ||= 'unallocated';
+
+ $delqueue{$conn} = $conn; # save this connection until everything is finished
+ my $sock = $conn->{sock};
+ if ($sock) {
+ if ($sock->{buffer}) {
+ my $lth = length $sock->{buffer};
+ Mojo::IOLoop->timer($disc_waittime, sub {
+ dbg("Buffer contained $lth characters, coordinated for $disc_waittime secs, now disconnecting $call") if $dbg;
+ _close_it($conn);
+ });
+ } else {
+ dbg("Buffer empty, just close $call") if $dbg;
+ _close_it($conn);
+ }
+ }
+ else {
+ dbg((ref $conn) . " socket missing on $conn->{call}") if $dbg;
+ _close_it($conn);
+ }
+}
+
+sub _close_it
+{
my $conn = shift;
my $sock = delete $conn->{sock};
- return unless defined($sock);
- set_event_handler ($sock, "read" => undef, "write" => undef);
- shutdown($sock, 3);
- close($sock);
+ $conn->{state} = 'E';
+ $conn->{timeout}->del if $conn->{timeout};
+
+ my $call = $conn->{call};
+
+ if (isdbg('connll')) {
+ my ($pkg, $fn, $line) = caller;
+ dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
+ }
+
+
+ dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll');
+
+ if ($conn->{on_disconnect}) {
+ &{$conn->{on_disconnect}}($conn);
+ }
+
+ if ($sock) {
+ dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll');
+ $sock->close_gracefully if $sock->can('close_gracefully');
+ }
+
+ # get rid of any references
+ for (keys %$conn) {
+ if (ref($conn->{$_})) {
+ delete $conn->{$_};
+ }
+ }
+
+ delete $delqueue{$conn}; # finally remove the $conn
+
+ unless ($main::is_win) {
+ kill 'TERM', $conn->{pid} if exists $conn->{pid};
+ }
+}
+
+sub _send_stuff
+{
+ my $conn = shift;
+ my $rq = $conn->{outqueue};
+ my $sock = $conn->{sock};
+ return unless defined $sock;
+ return if $conn->{disconnecting};
+
+ while (@$rq) {
+ my $data = shift @$rq;
+ my $lth = length $data;
+ my $call = $conn->{call} || 'none';
+ if (isdbg('raw')) {
+ dbgdump('raw', "$call send $lth:", $data);
+ }
+ if (defined $sock) {
+ $sock->write($data);
+ $total_out += $lth;
+ $conn->{dataout} += $lth;
+ ++$conn->{linesout};
+ ++$total_lines_out;
+ } else {
+ dbg("_send_stuff $call ending data ignored: $data");
+ }
+ }
}
sub send_now {
my ($conn, $msg) = @_;
- _enqueue ($conn, $msg);
- $conn->_send (1); # 1 ==> flush
+ $conn->enqueue($msg);
+ _send_stuff($conn);
}
sub send_later {
- my ($conn, $msg) = @_;
- _enqueue($conn, $msg);
- my $sock = $conn->{sock};
- return unless defined($sock);
- set_event_handler ($sock, "write" => sub {$conn->_send(0)});
+ goto &send_now;
}
-sub _enqueue {
+sub send_raw
+{
my ($conn, $msg) = @_;
- # prepend length (encoded as network long)
- my $len = length($msg);
- $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg;
- push (@{$conn->{queue}}, $msg . "\n");
+ push @{$conn->{outqueue}}, $msg;
+ _send_stuff($conn);
}
-sub _send {
- my ($conn, $flush) = @_;
- my $sock = $conn->{sock};
- return unless defined($sock);
- my ($rq) = $conn->{queue};
-
- # If $flush is set, set the socket to blocking, and send all
- # messages in the queue - return only if there's an error
- # If $flush is 0 (deferred mode) make the socket non-blocking, and
- # return to the event loop only after every message, or if it
- # is likely to block in the middle of a message.
-
- $flush ? $conn->set_blocking() : $conn->set_non_blocking();
- my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
-
- while (@$rq) {
- my $msg = $rq->[0];
- my $mlth = length($msg);
- my $bytes_to_write = $mlth - $offset;
- my $bytes_written = 0;
- confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
- while ($bytes_to_write > 0) {
- $bytes_written = syswrite ($sock, $msg,
- $bytes_to_write, $offset);
- if (!defined($bytes_written)) {
- if (_err_will_block($!)) {
- # Should happen only in deferred mode. Record how
- # much we have already sent.
- $conn->{send_offset} = $offset;
- # Event handler should already be set, so we will
- # be called back eventually, and will resume sending
- return 1;
- } else { # Uh, oh
- delete $conn->{send_offset};
- $conn->handle_send_err($!);
- $conn->disconnect;
- return 0; # fail. Message remains in queue ..
- }
- }
- $offset += $bytes_written;
- $bytes_to_write -= $bytes_written;
- }
- delete $conn->{send_offset};
- $offset = 0;
- shift @$rq;
- last unless $flush; # Go back to select and wait
- # for it to fire again.
- }
- # Call me back if queue has not been drained.
- if (@$rq) {
- set_event_handler ($sock, "write" => sub {$conn->_send(0)});
- } else {
- set_event_handler ($sock, "write" => undef);
- }
- 1; # Success
-}
-
-sub _err_will_block {
- if ($blocking_supported) {
- return ($_[0] == EAGAIN());
- }
- return 0;
-}
-sub set_non_blocking { # $conn->set_blocking
- if ($blocking_supported) {
- # preserve other fcntl flags
- my $flags = fcntl ($_[0], F_GETFL(), 0);
- fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
- }
-}
-sub set_blocking {
- if ($blocking_supported) {
- my $flags = fcntl ($_[0], F_GETFL(), 0);
- $flags &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
- fcntl ($_[0], F_SETFL(), $flags);
- }
-}
-
-sub handle_send_err {
- # For more meaningful handling of send errors, subclass Msg and
- # rebless $conn.
- my ($conn, $err_msg) = @_;
- warn "Error while sending: $err_msg \n";
- set_event_handler ($conn->{sock}, "write" => undef);
+sub enqueue {
+ my $conn = shift;
+ push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : '';
+}
+
+sub _err_will_block
+{
+ return 0;
+}
+
+sub close_on_empty
+{
+ my $conn = shift;
+ $conn->{sock}->on(drain => sub {$conn->disconnect;});
}
#-----------------------------------------------------------------
# Receive side routines
-my ($g_login_proc,$g_pkg);
-my $main_socket = 0;
-sub new_server {
- @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
- my ($pkg, $my_host, $my_port, $login_proc) = @_;
-
- $main_socket = IO::Socket::INET->new (
- LocalAddr => $my_host,
- LocalPort => $my_port,
- Listen => 5,
- Proto => 'tcp',
- Reuse => 1);
- die "Could not create socket: $! \n" unless $main_socket;
- set_event_handler ($main_socket, "read" => \&_new_client);
- $g_login_proc = $login_proc; $g_pkg = $pkg;
+sub new_server
+{
+# @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
+ my ($pkg, $my_host, $my_port, $login_proc) = @_;
+ my $conn = $pkg->new($login_proc);
+
+ my $sock = $conn->{sock} = Mojo::IOLoop::Server->new;
+ $sock->on(accept=>sub{$conn->new_client($_[1]);});
+ $sock->listen(address=>$my_host, port=>$my_port);
+ $sock->start;
+
+ die "Could not create socket: $! \n" unless $conn->{sock};
+ return $conn;
+}
+
+
+sub nolinger
+{
+ my $conn = shift;
+}
+
+sub dequeue
+{
+ my $conn = shift;
+ return if $conn->{disconnecting};
+
+ if ($conn->{msg} =~ /\cJ/) {
+ my @lines = split /\cM?\cJ/, $conn->{msg};
+ if ($conn->{msg} =~ /\cM?\cJ$/) {
+ delete $conn->{msg};
+ } else {
+ $conn->{msg} = pop @lines;
+ }
+ $conn->{linesin} += @lines;
+ $total_lines_in += @lines;
+ for (@lines) {
+ last if $conn->{disconnecting};
+ &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
+ }
+ }
}
sub _rcv { # Complement to _send
my $conn = shift; # $rcv_now complement of $flush
- # Find out how much has already been received, if at all
- my ($msg, $offset, $bytes_to_read, $bytes_read);
+ my $msg = shift;
my $sock = $conn->{sock};
return unless defined($sock);
+ return if $conn->{disconnecting};
- my @lines;
- $conn->set_non_blocking();
- $bytes_read = sysread ($sock, $msg, 1024, 0);
- if (defined ($bytes_read)) {
- if ($bytes_read > 0) {
- if ($msg =~ /\n/) {
- @lines = split /\n/, $msg;
- $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg};
- if ($msg =~ /\n$/) {
- delete $conn->{msg};
+ $total_in += length $msg;
+ $conn->{datain} += length $msg;
+
+ if (isdbg('raw')) {
+ my $call = $conn->{call} || 'none';
+ my $lth = length $msg;
+ dbgdump('raw', "$call read $lth: ", $msg);
+ }
+ if ($conn->{echo}) {
+ my @ch = split //, $msg;
+ my $out;
+ for (@ch) {
+ if (/[\cH\x7f]/) {
+ $out .= "\cH \cH";
+ $conn->{msg} =~ s/.$//;
} else {
- $conn->{msg} = pop @lines;
+ $out .= $_;
+ $conn->{msg} .= $_;
}
- } else {
- $conn->{msg} .= $msg;
}
- }
+ if (defined $out) {
+ $conn->send_raw($out);
+ }
} else {
- if (_err_will_block($!)) {
- return ;
- } else {
- $bytes_read = 0;
- }
- }
-
-FINISH:
- if (defined $bytes_read && $bytes_read == 0) {
-# $conn->disconnect();
- &{$conn->{rcvd_notification_proc}}($conn, undef, $!);
- @lines = ();
- }
-
- while (@lines){
- $msg = shift @lines;
- $msg =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
- &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
- $! = 0;
+ $conn->{msg} .= $msg;
+ }
+
+ unless ($conn->{disable_read}) {
+ $conn->dequeue if exists $conn->{msg};
}
}
-sub _new_client {
- my $sock = $main_socket->accept();
- my $conn = bless {
- 'sock' => $sock,
- 'state' => 'connected'
- }, $g_pkg;
- my $rcvd_notification_proc =
- &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
- if ($rcvd_notification_proc) {
- $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
- my $callback = sub {_rcv($conn)};
- set_event_handler ($sock, "read" => $callback);
- } else { # Login failed
- $conn->disconnect();
- }
+sub new_client {
+ my $server_conn = shift;
+ my $handle = shift;
+
+ my $conn = $server_conn->new($server_conn->{rproc});
+ my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+ $sock->on(read => sub {$conn->_rcv($_[1])});
+ $sock->timeout(0);
+ $sock->start;
+ $conn->{peerhost} = $handle->peerhost || 'unknown';
+ $conn->{peerhost} =~ s|^::ffff:||; # chop off leading pseudo IPV6 stuff on dual stack listeners
+ $conn->{peerport} = $handle->peerport || 0;
+ dbg((ref $conn) . " accept $conn->{cnum} from $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg('connect');
+ my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost}, $conn->{peerport});
+ $conn->{sort} = 'Incoming';
+ if ($eproc) {
+ $conn->{eproc} = $eproc;
+ }
+ if ($rproc) {
+ $conn->{rproc} = $rproc;
+ } else { # Login failed
+ &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
+ $conn->disconnect();
+ }
+ return $conn;
}
sub close_server
{
- set_event_handler ($main_socket, "read" => undef);
- $main_socket->close;
- $main_socket = 0;
+ my $conn = shift;
+ delete $conn->{sock};
+}
+
+# close all clients (this is for forking really)
+sub close_all_clients
+{
+ foreach my $conn (values %conns) {
+ $conn->disconnect;
+ }
+}
+
+sub disable_read
+{
+ my $conn = shift;
+ return defined $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
}
+
+#
#----------------------------------------------------
# Event loop routines used by both client and server
sub set_event_handler {
- shift unless ref($_[0]); # shift if first arg is package name
- my ($handle, %args) = @_;
- my $callback;
- if (exists $args{'write'}) {
- $callback = $args{'write'};
- if ($callback) {
- $wt_callbacks{$handle} = $callback;
- $wt_handles->add($handle);
- } else {
- delete $wt_callbacks{$handle};
- $wt_handles->remove($handle);
- }
- }
- if (exists $args{'read'}) {
- $callback = $args{'read'};
- if ($callback) {
- $rd_callbacks{$handle} = $callback;
- $rd_handles->add($handle);
- } else {
- delete $rd_callbacks{$handle};
- $rd_handles->remove($handle);
- }
- }
-}
-
-sub event_loop {
- my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
- my ($conn, $r, $w, $rset, $wset);
- while (1) {
- # Quit the loop if no handles left to process
- last unless ($rd_handles->count() || $wt_handles->count());
- ($rset, $wset) =
- IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
- foreach $r (@$rset) {
- &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
- }
- foreach $w (@$wset) {
- &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
- }
- if (defined($loop_count)) {
- last unless --$loop_count;
- }
- }
+ my $sock = shift;
+ my %args = @_;
+ my ($pkg, $fn, $line) = caller;
+ my $s;
+ foreach (my ($k,$v) = each %args) {
+ $s .= "$k => $v, ";
+ }
+ $s =~ s/[\s,]$//;
+ dbg("Msg::set_event_handler called from ${pkg}::${fn} line $line doing $s");
+}
+
+sub sleep
+{
+ my ($pkg, $interval) = @_;
+ my $now = time;
+ while (time - $now < $interval) {
+ sleep 1;
+ }
+}
+
+sub DESTROY
+{
+ my $conn = shift;
+ my $call = $conn->{call} || 'unallocated';
+
+ if (isdbg('connll')) {
+ my ($pkg, $fn, $line) = caller;
+ dbgtrace((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line ");
+ }
+
+ my $call = $conn->{call} || 'unallocated';
+ my $host = $conn->{peerhost} || '';
+ my $port = $conn->{peerport} || '';
+ my $sock = $conn->{sock};
+
+ if ($sock) {
+ $sock->close_gracefully if $sock->can('close_gracefully');
+ delete $conn->{sock};
+ }
+
+ $noconns--;
+ dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');
}
1;