X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=6702f152bc6811334a8cf1f6661791f0d0a2c7b7;hb=f7d2c39f20734d48a5306ac585f1bbee2fc2fbe7;hp=9df7640ecbbfbfecd5a80493469a5abacb1700f5;hpb=ed8842a3d38de2c171329e51612d2dc520ffcc99;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 9df7640e..6702f152 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -10,20 +10,20 @@ package Msg; -require Exporter; -@ISA = qw(Exporter); - use strict; use IO::Select; use IO::Socket; -use Carp; +#use DXDebug; -use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles); +use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain); %rd_callbacks = (); %wt_callbacks = (); $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); +$now = time; +@timerchain = (); + my $blocking_supported = 0; BEGIN { @@ -34,38 +34,66 @@ BEGIN { $blocking_supported = 1 unless $@; } +# +#----------------------------------------------------------------- +# Generalised initializer + +sub new +{ + my ($pkg, $rproc) = @_; + my $obj = ref($pkg); + my $class = $obj || $pkg; + + my $conn = { + rproc => $rproc, + inqueue => [], + outqueue => [], + state => 0, + lineend => "\r\n", + csort => 'telnet', + timeval => 60, + }; + + return bless $conn, $class; +} + #----------------------------------------------------------------- # Send side routines sub connect { - my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_; - + my ($pkg, $to_host, $to_port, $rproc) = @_; + + # Create a connection end-point object + my $conn = $pkg; + unless (ref $pkg) { + $conn = $pkg->new($rproc); + } + # Create a new internet socket - my $sock = IO::Socket::INET->new ( PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp', - Reuse => 1); + Reuse => 1, + Timeout => $conn->{timeval} / 2); return undef unless $sock; - # Create a connection end-point object - my $conn = { - sock => $sock, - rcvd_notification_proc => $rcvd_notification_proc, - }; + $conn->{sock} = $sock; - if ($rcvd_notification_proc) { - my $callback = sub {_rcv($conn, 0)}; + if ($conn->{rproc}) { + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } - return bless $conn, $pkg; + return $conn; } sub disconnect { my $conn = shift; my $sock = delete $conn->{sock}; - return unless defined($sock); + $conn->{state} = 'E'; + delete $conn->{cmd}; + $conn->{timeout}->del_timer if $conn->{timeout}; + return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); shutdown($sock, 3); close($sock); @@ -73,31 +101,28 @@ sub disconnect { sub send_now { my ($conn, $msg) = @_; - _enqueue ($conn, $msg); + $conn->enqueue($msg); $conn->_send (1); # 1 ==> flush } sub send_later { my ($conn, $msg) = @_; - _enqueue($conn, $msg); + $conn->enqueue($msg); my $sock = $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } -sub _enqueue { - my ($conn, $msg) = @_; - # prepend length (encoded as network long) - my $len = length($msg); - $msg = pack ('N', $len) . $msg; - push (@{$conn->{queue}}, $msg); +sub enqueue { + my $conn = shift; + push (@{$conn->{outqueue}}, $_[0]); } sub _send { my ($conn, $flush) = @_; my $sock = $conn->{sock}; return unless defined($sock); - my ($rq) = $conn->{queue}; + my $rq = $conn->{outqueue}; # If $flush is set, set the socket to blocking, and send all # messages in the queue - return only if there's an error @@ -110,9 +135,11 @@ sub _send { while (@$rq) { my $msg = $rq->[0]; - my $bytes_to_write = length($msg) - $offset; + my $mlth = length($msg); + my $bytes_to_write = $mlth - $offset; my $bytes_written = 0; - while ($bytes_to_write) { + confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0; + while ($bytes_to_write > 0) { $bytes_written = syswrite ($sock, $msg, $bytes_to_write, $offset); if (!defined($bytes_written)) { @@ -124,7 +151,9 @@ sub _send { # be called back eventually, and will resume sending return 1; } else { # Uh, oh + delete $conn->{send_offset}; $conn->handle_send_err($!); + $conn->disconnect; return 0; # fail. Message remains in queue .. } } @@ -178,101 +207,90 @@ sub handle_send_err { #----------------------------------------------------------------- # Receive side routines -my ($g_login_proc,$g_pkg); -my $main_socket = 0; sub new_server { - @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n"; + @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n"; my ($pkg, $my_host, $my_port, $login_proc) = @_; - - $main_socket = IO::Socket::INET->new ( + my $self = $pkg->new($login_proc); + + $self->{sock} = IO::Socket::INET->new ( LocalAddr => $my_host, LocalPort => $my_port, Listen => 5, Proto => 'tcp', Reuse => 1); - die "Could not create socket: $! \n" unless $main_socket; - set_event_handler ($main_socket, "read" => \&_new_client); - $g_login_proc = $login_proc; $g_pkg = $pkg; + die "Could not create socket: $! \n" unless $self->{sock}; + set_event_handler ($self->{sock}, "read" => sub { $self->new_client } ); + return $self; } -sub rcv_now { - my ($conn) = @_; - my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now - return wantarray ? ($msg, $err) : $msg; +sub dequeue +{ + my $conn = shift; + my $msg; + + while ($msg = shift @{$conn->{inqueue}}){ + &{$conn->{rproc}}($conn, $msg, $!); + $! = 0; + } } sub _rcv { # Complement to _send - my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush + my $conn = shift; # $rcv_now complement of $flush # Find out how much has already been received, if at all my ($msg, $offset, $bytes_to_read, $bytes_read); my $sock = $conn->{sock}; return unless defined($sock); - if (exists $conn->{msg}) { - $msg = $conn->{msg}; - $offset = length($msg) - 1; # sysread appends to it. - $bytes_to_read = $conn->{bytes_to_read}; - delete $conn->{'msg'}; # have made a copy - } else { - # The typical case ... - $msg = ""; # Otherwise -w complains - $offset = 0 ; - $bytes_to_read = 0 ; # Will get set soon - } - # We want to read the message length in blocking mode. Quite - # unlikely that we'll get blocked too long reading 4 bytes - if (!$bytes_to_read) { # Get new length - my $buf; - $conn->set_blocking(); - $bytes_read = sysread($sock, $buf, 4); - if ($! || ($bytes_read != 4)) { - goto FINISH; - } - $bytes_to_read = unpack ('N', $buf); - } - $conn->set_non_blocking() unless $rcv_now; - while ($bytes_to_read) { - $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset); - if (defined ($bytes_read)) { - if ($bytes_read == 0) { - last; - } - $bytes_to_read -= $bytes_read; - $offset += $bytes_read; - } else { - if (_err_will_block($!)) { - # Should come here only in non-blocking mode - $conn->{msg} = $msg; - $conn->{bytes_to_read} = $bytes_to_read; - return ; # .. _rcv will be called later - # when socket is readable again - } else { - last; - } - } - } - FINISH: - if (length($msg) == 0) { - $conn->disconnect(); + my @lines; + $conn->set_non_blocking(); + $bytes_read = sysread ($sock, $msg, 1024, 0); + if (defined ($bytes_read)) { + if ($bytes_read > 0) { + if ($msg =~ /\n/) { + @lines = split /\r?\n/, $msg; + if (@lines) { + $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg}; + } else { + $lines[0] = $conn->{msg} if exists $conn->{msg}; + push @lines, '' unless @lines; + } + if ($msg =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + push @{$conn->{inqueue}}, @lines if @lines; + } else { + $conn->{msg} .= $msg; + } + } + } else { + if (_err_will_block($!)) { + return ; + } else { + $bytes_read = 0; + } } - if ($rcv_now) { - return ($msg, $!); + +FINISH: + if (defined $bytes_read && $bytes_read == 0) { +# $conn->disconnect(); + &{$conn->{rproc}}($conn, undef, $!); + delete $conn->{inqueue}; } else { - &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); - } + $conn->dequeue; + } } -sub _new_client { - my $sock = $main_socket->accept(); - my $conn = bless { - 'sock' => $sock, - 'state' => 'connected' - }, $g_pkg; - my $rcvd_notification_proc = - &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport()); - if ($rcvd_notification_proc) { - $conn->{rcvd_notification_proc} = $rcvd_notification_proc; - my $callback = sub {_rcv($conn,0)}; +sub new_client { + my $server_conn = shift; + my $sock = $server_conn->{sock}->accept(); + my $conn = $server_conn->new($server_conn->{rproc}); + $conn->{sock} = $sock; + my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport()); + if ($rproc) { + $conn->{rproc} = $rproc; + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed $conn->disconnect(); @@ -281,9 +299,9 @@ sub _new_client { sub close_server { - set_event_handler ($main_socket, "read" => undef); - $main_socket->close; - $main_socket = 0; + my $conn = shift; + set_event_handler ($conn->{sock}, "read" => undef); + $conn->{sock}->close; } #---------------------------------------------------- @@ -315,20 +333,53 @@ sub set_event_handler { } } +sub new_timer +{ + my ($pkg, $time, $proc, $recur) = @_; + my $obj = ref($pkg); + my $class = $obj || $pkg; + my $self = bless { t=>$time + time, proc=>$proc }, $class; + $self->{interval} = $time if $recur; + push @timerchain, $self; + return $self; +} + +sub del_timer +{ + my $self = shift; + @timerchain = grep {$_ != $self} @timerchain; +} + sub event_loop { my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once my ($conn, $r, $w, $rset, $wset); while (1) { - # Quit the loop if no handles left to process + + # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); - ($rset, $wset) = + + ($rset, $wset) = IO::Select->select ($rd_handles, $wt_handles, undef, $timeout); + $now = time; + foreach $r (@$rset) { &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r}; } foreach $w (@$wset) { &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; } + + # handle things on the timer chain + for (@timerchain) { + if ($now >= $_->{t}) { + &{$_->{proc}}(); + $_->{t} = $now + $_->{interval} if exists $_->{interval}; + } + } + + # remove dead timers + @timerchain = grep { $_->{t} > $now } @timerchain; + if (defined($loop_count)) { last unless --$loop_count; }