X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;ds=sidebyside;f=perl%2FMsg.pm;h=b626e27c67958eb9822f12491298d349094c31c4;hb=7fc04d3328f15fdeca0e86f8771c4b0b2d431df6;hp=20e000334ca5d72d103d51e63f17694ea643bc88;hpb=0017002e2dc438d49fcc090dc99b6d22f7037aa7;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index 20e00033..b626e27c 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -56,7 +56,7 @@ sub connect { }; if ($rcvd_notification_proc) { - my $callback = sub {_rcv($conn, 0)}; + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } return bless $conn, $pkg; @@ -67,7 +67,8 @@ sub disconnect { my $sock = delete $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); - close($sock); + shutdown($sock, 3); + close($sock); } sub send_now { @@ -88,8 +89,8 @@ sub _enqueue { my ($conn, $msg) = @_; # prepend length (encoded as network long) my $len = length($msg); - $msg = pack ('N', $len) . $msg; - push (@{$conn->{queue}}, $msg); + $msg =~ s/(\x00-\x2f\x7e-\xff%])/sprintf("%%%02X", ord($1))/eg; + push (@{$conn->{queue}}, $msg . "\n"); } sub _send { @@ -109,9 +110,11 @@ sub _send { while (@$rq) { my $msg = $rq->[0]; - my $bytes_to_write = length($msg) - $offset; + my $mlth = length($msg); + my $bytes_to_write = $mlth - $offset; my $bytes_written = 0; - while ($bytes_to_write) { + confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0; + while ($bytes_to_write > 0) { $bytes_written = syswrite ($sock, $msg, $bytes_to_write, $offset); if (!defined($bytes_written)) { @@ -123,7 +126,9 @@ sub _send { # be called back eventually, and will resume sending return 1; } else { # Uh, oh + delete $conn->{send_offset}; $conn->handle_send_err($!); + $conn->disconnect; return 0; # fail. Message remains in queue .. } } @@ -165,6 +170,7 @@ sub set_blocking { fcntl ($_[0], F_SETFL(), $flags); } } + sub handle_send_err { # For more meaningful handling of send errors, subclass Msg and # rebless $conn. @@ -193,71 +199,51 @@ sub new_server { $g_login_proc = $login_proc; $g_pkg = $pkg; } -sub rcv_now { - my ($conn) = @_; - my ($msg, $err) = _rcv ($conn, 1); # 1 ==> rcv now - return wantarray ? ($msg, $err) : $msg; -} - sub _rcv { # Complement to _send - my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush + my $conn = shift; # $rcv_now complement of $flush # Find out how much has already been received, if at all my ($msg, $offset, $bytes_to_read, $bytes_read); my $sock = $conn->{sock}; return unless defined($sock); - if (exists $conn->{msg}) { - $msg = $conn->{msg}; - $offset = length($msg) - 1; # sysread appends to it. - $bytes_to_read = $conn->{bytes_to_read}; - delete $conn->{'msg'}; # have made a copy - } else { - # The typical case ... - $msg = ""; # Otherwise -w complains - $offset = 0 ; - $bytes_to_read = 0 ; # Will get set soon - } - # We want to read the message length in blocking mode. Quite - # unlikely that we'll get blocked too long reading 4 bytes - if (!$bytes_to_read) { # Get new length - my $buf; - $conn->set_blocking(); - $bytes_read = sysread($sock, $buf, 4); - if ($! || ($bytes_read != 4)) { - goto FINISH; - } - $bytes_to_read = unpack ('N', $buf); - } - $conn->set_non_blocking() unless $rcv_now; - while ($bytes_to_read) { - $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset); - if (defined ($bytes_read)) { - if ($bytes_read == 0) { - last; - } - $bytes_to_read -= $bytes_read; - $offset += $bytes_read; - } else { - if (_err_will_block($!)) { - # Should come here only in non-blocking mode - $conn->{msg} = $msg; - $conn->{bytes_to_read} = $bytes_to_read; - return ; # .. _rcv will be called later - # when socket is readable again - } else { - last; - } - } - } - FINISH: - if (length($msg) == 0) { - $conn->disconnect(); - } - if ($rcv_now) { - return ($msg, $!); - } else { - &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); + my @lines; + $conn->set_non_blocking(); + $bytes_read = sysread ($sock, $msg, 1024, 0); + if (defined ($bytes_read)) { + if ($bytes_read > 0) { + if ($msg =~ /\n/) { + @lines = split /\n/, $msg; + $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg}; + if ($msg =~ /\n$/) { + delete $conn->{msg}; + } else { + $conn->{msg} = pop @lines; + } + } else { + $conn->{msg} .= $msg; + } + } + } else { + if (_err_will_block($!)) { + return ; + } else { + $bytes_read = 0; + } } + +FINISH: + if (defined $bytes_read && $bytes_read == 0) { +# $conn->disconnect(); + &{$conn->{rcvd_notification_proc}}($conn, undef, $!); + @lines = (); + } + + while (@lines){ + $msg = shift @lines; + $msg =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg; + &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); + $! = 0; + } } sub _new_client { @@ -270,13 +256,20 @@ sub _new_client { &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport()); if ($rcvd_notification_proc) { $conn->{rcvd_notification_proc} = $rcvd_notification_proc; - my $callback = sub {_rcv($conn,0)}; + my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed $conn->disconnect(); } } +sub close_server +{ + set_event_handler ($main_socket, "read" => undef); + $main_socket->close; + $main_socket = 0; +} + #---------------------------------------------------- # Event loop routines used by both client and server