]> dxcluster.org Git - spider.git/blob - perl/Msg.pm
commit again
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14 use IO::Select;
15 use IO::Socket;
16 #use DXDebug;
17
18 use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles);
19
20 %rd_callbacks = ();
21 %wt_callbacks = ();
22 $rd_handles   = IO::Select->new();
23 $wt_handles   = IO::Select->new();
24 my $blocking_supported = 0;
25
26 BEGIN {
27     # Checks if blocking is supported
28     eval {
29         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
30     };
31     $blocking_supported = 1 unless $@;
32 }
33
34 #-----------------------------------------------------------------
35 # Send side routines
36 sub connect {
37     my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_;
38     
39     # Create a new internet socket
40     
41     my $sock = IO::Socket::INET->new (
42                                       PeerAddr => $to_host,
43                                       PeerPort => $to_port,
44                                       Proto    => 'tcp',
45                                       Reuse    => 1);
46
47     return undef unless $sock;
48
49     # Create a connection end-point object
50     my $conn = {
51         sock                   => $sock,
52         rcvd_notification_proc => $rcvd_notification_proc,
53     };
54     
55     if ($rcvd_notification_proc) {
56         my $callback = sub {_rcv($conn)};
57         set_event_handler ($sock, "read" => $callback);
58     }
59     return bless $conn, $pkg;
60 }
61
62 sub disconnect {
63     my $conn = shift;
64     my $sock = delete $conn->{sock};
65     return unless defined($sock);
66     set_event_handler ($sock, "read" => undef, "write" => undef);
67     shutdown($sock, 3);
68         close($sock);
69 }
70
71 sub send_now {
72     my ($conn, $msg) = @_;
73     _enqueue ($conn, $msg);
74     $conn->_send (1); # 1 ==> flush
75 }
76
77 sub send_later {
78     my ($conn, $msg) = @_;
79     _enqueue($conn, $msg);
80     my $sock = $conn->{sock};
81     return unless defined($sock);
82     set_event_handler ($sock, "write" => sub {$conn->_send(0)});
83 }
84
85 sub _enqueue {
86     my ($conn, $msg) = @_;
87     # prepend length (encoded as network long)
88     my $len = length($msg);
89         $msg =~ s/([\%\x00-\x1f\x7f-\xff])/sprintf("%%%02X", ord($1))/eg; 
90     push (@{$conn->{queue}}, $msg . "\n");
91 }
92
93 sub _send {
94     my ($conn, $flush) = @_;
95     my $sock = $conn->{sock};
96     return unless defined($sock);
97     my ($rq) = $conn->{queue};
98
99     # If $flush is set, set the socket to blocking, and send all
100     # messages in the queue - return only if there's an error
101     # If $flush is 0 (deferred mode) make the socket non-blocking, and
102     # return to the event loop only after every message, or if it
103     # is likely to block in the middle of a message.
104
105     $flush ? $conn->set_blocking() : $conn->set_non_blocking();
106     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
107
108     while (@$rq) {
109         my $msg            = $rq->[0];
110                 my $mlth           = length($msg);
111         my $bytes_to_write = $mlth - $offset;
112         my $bytes_written  = 0;
113                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
114         while ($bytes_to_write > 0) {
115             $bytes_written = syswrite ($sock, $msg,
116                                        $bytes_to_write, $offset);
117             if (!defined($bytes_written)) {
118                 if (_err_will_block($!)) {
119                     # Should happen only in deferred mode. Record how
120                     # much we have already sent.
121                     $conn->{send_offset} = $offset;
122                     # Event handler should already be set, so we will
123                     # be called back eventually, and will resume sending
124                     return 1;
125                 } else {    # Uh, oh
126                                         delete $conn->{send_offset};
127                     $conn->handle_send_err($!);
128                                         $conn->disconnect;
129                     return 0; # fail. Message remains in queue ..
130                 }
131             }
132             $offset         += $bytes_written;
133             $bytes_to_write -= $bytes_written;
134         }
135         delete $conn->{send_offset};
136         $offset = 0;
137         shift @$rq;
138         last unless $flush; # Go back to select and wait
139                             # for it to fire again.
140     }
141     # Call me back if queue has not been drained.
142     if (@$rq) {
143         set_event_handler ($sock, "write" => sub {$conn->_send(0)});
144     } else {
145         set_event_handler ($sock, "write" => undef);
146     }
147     1;  # Success
148 }
149
150 sub _err_will_block {
151     if ($blocking_supported) {
152         return ($_[0] == EAGAIN());
153     }
154     return 0;
155 }
156 sub set_non_blocking {                        # $conn->set_blocking
157     if ($blocking_supported) {
158         # preserve other fcntl flags
159         my $flags = fcntl ($_[0], F_GETFL(), 0);
160         fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK());
161     }
162 }
163 sub set_blocking {
164     if ($blocking_supported) {
165         my $flags = fcntl ($_[0], F_GETFL(), 0);
166         $flags  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
167         fcntl ($_[0], F_SETFL(), $flags);
168     }
169 }
170
171 sub handle_send_err {
172    # For more meaningful handling of send errors, subclass Msg and
173    # rebless $conn.  
174    my ($conn, $err_msg) = @_;
175    warn "Error while sending: $err_msg \n";
176    set_event_handler ($conn->{sock}, "write" => undef);
177 }
178
179 #-----------------------------------------------------------------
180 # Receive side routines
181
182 my ($g_login_proc,$g_pkg);
183 my $main_socket = 0;
184 sub new_server {
185     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
186     my ($pkg, $my_host, $my_port, $login_proc) = @_;
187     
188     $main_socket = IO::Socket::INET->new (
189                                           LocalAddr => $my_host,
190                                           LocalPort => $my_port,
191                                           Listen    => 5,
192                                           Proto     => 'tcp',
193                                           Reuse     => 1);
194     die "Could not create socket: $! \n" unless $main_socket;
195     set_event_handler ($main_socket, "read" => \&_new_client);
196     $g_login_proc = $login_proc; $g_pkg = $pkg;
197 }
198
199 sub _rcv {                     # Complement to _send
200     my $conn = shift; # $rcv_now complement of $flush
201     # Find out how much has already been received, if at all
202     my ($msg, $offset, $bytes_to_read, $bytes_read);
203     my $sock = $conn->{sock};
204     return unless defined($sock);
205
206         my @lines;
207     $conn->set_non_blocking();
208         $bytes_read = sysread ($sock, $msg, 1024, 0);
209         if (defined ($bytes_read)) {
210                 if ($bytes_read > 0) {
211                         if ($msg =~ /\n/) {
212                                 @lines = split /\n/, $msg;
213                                 $lines[0] = $conn->{msg} . $lines[0] if $conn->{msg};
214                                 if ($msg =~ /\n$/) {
215                                         delete $conn->{msg};
216                                 } else {
217                                         $conn->{msg} = pop @lines;
218                                 }
219                         } else {
220                                 $conn->{msg} .= $msg;
221                         }
222                 } 
223         } else {
224                 if (_err_will_block($!)) {
225                         return ; 
226                 } else {
227                         $bytes_read = 0;
228                 }
229     }
230
231 FINISH:
232     if (defined $bytes_read && $bytes_read == 0) {
233 #               $conn->disconnect();
234                 &{$conn->{rcvd_notification_proc}}($conn, undef, $!);
235                 @lines = ();
236     } 
237
238         while (@lines){
239                 $msg = shift @lines;
240                 $msg =~ s/%([2-9A-F][0-9A-F])/chr(hex($1))/eg;
241                 &{$conn->{rcvd_notification_proc}}($conn, $msg, $!);
242                 $! = 0;
243         }
244 }
245
246 sub _new_client {
247     my $sock = $main_socket->accept();
248     my $conn = bless {
249         'sock' =>  $sock,
250         'state' => 'connected'
251     }, $g_pkg;
252     my $rcvd_notification_proc =
253         &$g_login_proc ($conn, $sock->peerhost(), $sock->peerport());
254     if ($rcvd_notification_proc) {
255         $conn->{rcvd_notification_proc} = $rcvd_notification_proc;
256         my $callback = sub {_rcv($conn)};
257         set_event_handler ($sock, "read" => $callback);
258     } else {  # Login failed
259         $conn->disconnect();
260     }
261 }
262
263 sub close_server
264 {
265         set_event_handler ($main_socket, "read" => undef);
266         $main_socket->close;
267         $main_socket = 0;
268 }
269
270 #----------------------------------------------------
271 # Event loop routines used by both client and server
272
273 sub set_event_handler {
274     shift unless ref($_[0]); # shift if first arg is package name
275     my ($handle, %args) = @_;
276     my $callback;
277     if (exists $args{'write'}) {
278         $callback = $args{'write'};
279         if ($callback) {
280             $wt_callbacks{$handle} = $callback;
281             $wt_handles->add($handle);
282         } else {
283             delete $wt_callbacks{$handle};
284             $wt_handles->remove($handle);
285         }
286     }
287     if (exists $args{'read'}) {
288         $callback = $args{'read'};
289         if ($callback) {
290             $rd_callbacks{$handle} = $callback;
291             $rd_handles->add($handle);
292         } else {
293             delete $rd_callbacks{$handle};
294             $rd_handles->remove($handle);
295        }
296     }
297 }
298
299 sub event_loop {
300     my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once
301     my ($conn, $r, $w, $rset, $wset);
302     while (1) {
303         # Quit the loop if no handles left to process
304         last unless ($rd_handles->count() || $wt_handles->count());
305         ($rset, $wset) =
306             IO::Select->select ($rd_handles, $wt_handles, undef, $timeout);
307         foreach $r (@$rset) {
308             &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r};
309         }
310         foreach $w (@$wset) {
311             &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
312         }
313         if (defined($loop_count)) {
314             last unless --$loop_count;
315         }
316     }
317 }
318
319 1;
320
321 __END__
322