X-Git-Url: http://dxcluster.org/gitweb/gitweb.cgi?a=blobdiff_plain;ds=sidebyside;f=perl%2FMsg.pm;h=6702f152bc6811334a8cf1f6661791f0d0a2c7b7;hb=f7d2c39f20734d48a5306ac585f1bbee2fc2fbe7;hp=f5704a81e46eb237403f695383e7bd3307770dbf;hpb=0bd9d2811cc42417676a1b11b121681c2377d70a;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index f5704a81..6702f152 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -15,12 +15,15 @@ use IO::Select; use IO::Socket; #use DXDebug; -use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles); +use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles $now @timerchain); %rd_callbacks = (); %wt_callbacks = (); $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); +$now = time; +@timerchain = (); + my $blocking_supported = 0; BEGIN { @@ -31,38 +34,66 @@ BEGIN { $blocking_supported = 1 unless $@; } +# +#----------------------------------------------------------------- +# Generalised initializer + +sub new +{ + my ($pkg, $rproc) = @_; + my $obj = ref($pkg); + my $class = $obj || $pkg; + + my $conn = { + rproc => $rproc, + inqueue => [], + outqueue => [], + state => 0, + lineend => "\r\n", + csort => 'telnet', + timeval => 60, + }; + + return bless $conn, $class; +} + #----------------------------------------------------------------- # Send side routines sub connect { - my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_; - + my ($pkg, $to_host, $to_port, $rproc) = @_; + + # Create a connection end-point object + my $conn = $pkg; + unless (ref $pkg) { + $conn = $pkg->new($rproc); + } + # Create a new internet socket - my $sock = IO::Socket::INET->new ( PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp', - Reuse => 1); + Reuse => 1, + Timeout => $conn->{timeval} / 2); return undef unless $sock; - # Create a connection end-point object - my $conn = { - sock => $sock, - rcvd_notification_proc => $rcvd_notification_proc, - }; + $conn->{sock} = $sock; - if ($rcvd_notification_proc) { + if ($conn->{rproc}) { my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } - return bless $conn, $pkg; + return $conn; } sub disconnect { my $conn = shift; my $sock = delete $conn->{sock}; - return unless defined($sock); + $conn->{state} = 'E'; + delete $conn->{cmd}; + $conn->{timeout}->del_timer if $conn->{timeout}; + return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); shutdown($sock, 3); close($sock); @@ -70,31 +101,28 @@ sub disconnect { sub send_now { my ($conn, $msg) = @_; - _enqueue ($conn, $msg); + $conn->enqueue($msg); $conn->_send (1); # 1 ==> flush } sub send_later { my ($conn, $msg) = @_; - _enqueue($conn, $msg); + $conn->enqueue($msg); my $sock = $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } -sub _enqueue { - my ($conn, $msg) = @_; - # prepend length (encoded as network long) - my $len = length($msg); - $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg; - push (@{$conn->{queue}}, $msg . "\n"); +sub enqueue { + my $conn = shift; + push (@{$conn->{outqueue}}, $_[0]); } sub _send { my ($conn, $flush) = @_; my $sock = $conn->{sock}; return unless defined($sock); - my ($rq) = $conn->{queue}; + my $rq = $conn->{outqueue}; # If $flush is set, set the socket to blocking, and send all # messages in the queue - return only if there's an error @@ -179,21 +207,31 @@ sub handle_send_err { #----------------------------------------------------------------- # Receive side routines -my ($g_login_proc,$g_pkg); -my $main_socket = 0; sub new_server { - @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n"; + @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n"; my ($pkg, $my_host, $my_port, $login_proc) = @_; - - $main_socket = IO::Socket::INET->new ( + my $self = $pkg->new($login_proc); + + $self->{sock} = IO::Socket::INET->new ( LocalAddr => $my_host, LocalPort => $my_port, Listen => 5, Proto => 'tcp', Reuse => 1); - die "Could not create socket: $! \n" unless $main_socket; - set_event_handler ($main_socket, "read" => \&_new_client); - $g_login_proc = $login_proc; $g_pkg = $pkg; + die "Could not create socket: $! \n" unless $self->{sock}; + set_event_handler ($self->{sock}, "read" => sub { $self->new_client } ); + return $self; +} + +sub dequeue +{ + my $conn = shift; + my $msg; + + while ($msg = shift @{$conn->{inqueue}}){ + &{$conn->{rproc}}($conn, $msg, $!); + $! = 0; + } } sub _rcv { # Complement to _send @@ -209,13 +247,19 @@ sub _rcv { # Complement to _send if (defined ($bytes_read)) { if ($bytes_read > 0) { if ($msg =~ /\n/) { - @lines = split /\n/, $msg; - $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg}; + @lines = split /\r?\n/, $msg; + if (@lines) { + $lines[0] = $conn->{msg} . $lines[0] if exists $conn->{msg}; + } else { + $lines[0] = $conn->{msg} if exists $conn->{msg}; + push @lines, '' unless @lines; + } if ($msg =~ /\n$/) { delete $conn->{msg}; } else { $conn->{msg} = pop @lines; } + push @{$conn->{inqueue}}, @lines if @lines; } else { $conn->{msg} .= $msg; } @@ -231,28 +275,21 @@ sub _rcv { # Complement to _send FINISH: if (defined $bytes_read && $bytes_read == 0) { # $conn->disconnect(); - &{$conn->{rcvd_notification_proc}}($conn, undef, $!); - @lines = (); - } - - while (@lines){ - $msg = shift @lines; - $msg =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg; - &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); - $! = 0; + &{$conn->{rproc}}($conn, undef, $!); + delete $conn->{inqueue}; + } else { + $conn->dequeue; } } -sub _new_client { - my $sock = $main_socket->accept(); - my $conn = bless { - 'sock' => $sock, - 'state' => 'connected' - }, $g_pkg; - my $rcvd_notification_proc = - &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport()); - if ($rcvd_notification_proc) { - $conn->{rcvd_notification_proc} = $rcvd_notification_proc; +sub new_client { + my $server_conn = shift; + my $sock = $server_conn->{sock}->accept(); + my $conn = $server_conn->new($server_conn->{rproc}); + $conn->{sock} = $sock; + my $rproc = &{$server_conn->{rproc}} ($conn, $sock->peerhost(), $sock->peerport()); + if ($rproc) { + $conn->{rproc} = $rproc; my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed @@ -262,9 +299,9 @@ sub _new_client { sub close_server { - set_event_handler ($main_socket, "read" => undef); - $main_socket->close; - $main_socket = 0; + my $conn = shift; + set_event_handler ($conn->{sock}, "read" => undef); + $conn->{sock}->close; } #---------------------------------------------------- @@ -296,20 +333,53 @@ sub set_event_handler { } } +sub new_timer +{ + my ($pkg, $time, $proc, $recur) = @_; + my $obj = ref($pkg); + my $class = $obj || $pkg; + my $self = bless { t=>$time + time, proc=>$proc }, $class; + $self->{interval} = $time if $recur; + push @timerchain, $self; + return $self; +} + +sub del_timer +{ + my $self = shift; + @timerchain = grep {$_ != $self} @timerchain; +} + sub event_loop { my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once my ($conn, $r, $w, $rset, $wset); while (1) { - # Quit the loop if no handles left to process + + # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); - ($rset, $wset) = + + ($rset, $wset) = IO::Select->select ($rd_handles, $wt_handles, undef, $timeout); + $now = time; + foreach $r (@$rset) { &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r}; } foreach $w (@$wset) { &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; } + + # handle things on the timer chain + for (@timerchain) { + if ($now >= $_->{t}) { + &{$_->{proc}}(); + $_->{t} = $now + $_->{interval} if exists $_->{interval}; + } + } + + # remove dead timers + @timerchain = grep { $_->{t} > $now } @timerchain; + if (defined($loop_count)) { last unless --$loop_count; }