X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=0af6791156f92b8c8e9698707fca33202ff3ae18;hb=2c3a20bdcef84e620b0c3c2d306a71ebe17956b0;hp=20e000334ca5d72d103d51e63f17694ea643bc88;hpb=0017002e2dc438d49fcc090dc99b6d22f7037aa7;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 20e00033..0af67911 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -10,15 +10,12 @@ package Msg; -require Exporter; -@ISA = qw(Exporter); - use strict; use IO::Select; use IO::Socket; -use Carp; +#use DXDebug; -use vars qw (%rd_callbacks %wt_callbacks $rd_handles $wt_handles); +use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles); %rd_callbacks = (); %wt_callbacks = (); @@ -56,7 +53,7 @@ sub connect { }; if ($rcvd_notification_proc) { - my $callback = sub {_rcv($conn, 0)}; + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } return bless $conn, $pkg; @@ -67,7 +64,8 @@ sub disconnect { my $sock = delete $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); - close($sock); + shutdown($sock, 3); + close($sock); } sub send_now { @@ -88,8 +86,8 @@ sub _enqueue { my ($conn, $msg) = @_; # prepend length (encoded as network long) my $len = length($msg); - $msg = pack ('N', $len) . $msg; - push (@{$conn->{queue}}, $msg); + $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg; + push (@{$conn->{queue}}, $msg . "\n"); } sub _send { @@ -109,9 +107,11 @@ sub _send { while (@$rq) { my $msg = $rq->[0]; - my $bytes_to_write = length($msg) - $offset; + my $mlth = length($msg); + my $bytes_to_write = $mlth - $offset; my $bytes_written = 0; - while ($bytes_to_write) { + confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0; + while ($bytes_to_write > 0) { $bytes_written = syswrite ($sock, $msg, $bytes_to_write, $offset); if (!defined($bytes_written)) { @@ -123,7 +123,9 @@ sub _send { # be called back eventually, and will resume sending return 1; } else { # Uh, oh + delete $conn->{send_offset}; $conn->handle_send_err($!); + $conn->disconnect; return 0; # fail. Message remains in queue .. } } @@ -165,6 +167,7 @@ sub set_blocking { fcntl ($_[0], F_SETFL(), $flags); } } + sub handle_send_err { # For more meaningful handling of send errors, subclass Msg and # rebless $conn. @@ -193,71 +196,51 @@ sub new_server { $g_login_proc = $login_proc; $g_pkg = $pkg; } -sub rcv_now { - my ($conn) = @_; - my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now - return wantarray ? ($msg, $err) : $msg; -} - sub _rcv { # Complement to _send - my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush + my $conn = shift; # $rcv_now complement of $flush # Find out how much has already been received, if at all my ($msg, $offset, $bytes_to_read, $bytes_read); my $sock = $conn->{sock}; return unless defined($sock); - if (exists $conn->{msg}) { - $msg = $conn->{msg}; - $offset = length($msg) - 1; # sysread appends to it. - $bytes_to_read = $conn->{bytes_to_read}; - delete $conn->{'msg'}; # have made a copy - } else { - # The typical case ... - $msg = ""; # Otherwise -w complains - $offset = 0 ; - $bytes_to_read = 0 ; # Will get set soon - } - # We want to read the message length in blocking mode. Quite - # unlikely that we'll get blocked too long reading 4 bytes - if (!$bytes_to_read) { # Get new length - my $buf; - $conn->set_blocking(); - $bytes_read = sysread($sock, $buf, 4); - if ($! || ($bytes_read != 4)) { - goto FINISH; - } - $bytes_to_read = unpack ('N', $buf); - } - $conn->set_non_blocking() unless $rcv_now; - while ($bytes_to_read) { - $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset); - if (defined ($bytes_read)) { - if ($bytes_read == 0) { - last; - } - $bytes_to_read -= $bytes_read; - $offset += $bytes_read; - } else { - if (_err_will_block($!)) { - # Should come here only in non-blocking mode - $conn->{msg} = $msg; - $conn->{bytes_to_read} = $bytes_to_read; - return ; # .. _rcv will be called later - # when socket is readable again - } else { - last; - } - } - } - FINISH: - if (length($msg) == 0) { - $conn->disconnect(); - } - if ($rcv_now) { - return ($msg, $!); - } else { - &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); + my @lines; + $conn->set_non_blocking(); + $bytes_read = sysread ($sock, $msg, 1024, 0); + if (defined ($bytes_read)) { + if ($bytes_read > 0) { + if ($msg =~ /\n/) { + @lines = split /\n/, $msg; + $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg}; + if ($msg =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + } else { + $conn->{msg} .= $msg; + } + } + } else { + if (_err_will_block($!)) { + return ; + } else { + $bytes_read = 0; + } } + +FINISH: + if (defined $bytes_read && $bytes_read == 0) { +# $conn->disconnect(); + &{$conn->{rcvd_notification_proc}}($conn, undef, $!); + @lines = (); + } + + while (@lines){ + $msg = shift @lines; + $msg =~ s/%([2-9A-F][0-9A-F])/chr(hex($1))/eg; + &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); + $! = 0; + } } sub _new_client { @@ -270,13 +253,20 @@ sub _new_client { &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport()); if ($rcvd_notification_proc) { $conn->{rcvd_notification_proc} = $rcvd_notification_proc; - my $callback = sub {_rcv($conn,0)}; + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed $conn->disconnect(); } } +sub close_server +{ + set_event_handler ($main_socket, "read" => undef); + $main_socket->close; + $main_socket = 0; +} + #---------------------------------------------------- # Event loop routines used by both client and server