add dxver to these routines
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 # $Id$
9 #
10
11 package Msg;
12
13 use strict;
14
15 use DXUtil;
16
17 use vars qw($VERSION $BRANCH);
18 ($VERSION, $BRANCH) = dxver(q$Revision$);
19
20 use IO::Select;
21 use IO::Socket;
22 use DXDebug;
23 use Timer;
24
25 use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out);
26
27 %rd_callbacks = ();
28 %wt_callbacks = ();
29 %er_callbacks = ();
30 $rd_handles   = IO::Select->new();
31 $wt_handles   = IO::Select->new();
32 $er_handles   = IO::Select->new();
33 $total_in = $total_out = 0;
34
35 $now = time;
36
37 BEGIN {
38     # Checks if blocking is supported
39     eval {
40                 local $^W;
41         require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL))
42     };
43         if ($@ || $main::is_win) {
44                 $blocking_supported = IO::Socket->can('blocking') ? 2 : 0;
45         } else {
46                 $blocking_supported = IO::Socket->can('blocking') ? 2 : 1;
47         }
48
49
50         # import as many of these errno values as are available
51         eval {
52                 local $^W;
53                 require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK));
54         };
55
56         unless ($^O eq 'MSWin32') {
57                 if ($] >= 5.6) {
58                         eval {
59                                 local $^W;
60                                 require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY));
61                         };
62                 } else {
63                         dbg("IPPROTO_TCP and TCP_NODELAY manually defined");
64                         eval 'sub IPPROTO_TCP {     6 };';
65                         eval 'sub TCP_NODELAY {     1 };';
66                 }
67         }
68         # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
69         # defines EINPROGRESS as 10035.  We provide it here because some
70         # Win32 users report POSIX::EINPROGRESS is not vendor-supported.
71         if ($^O eq 'MSWin32') { 
72                 eval '*EINPROGRESS = sub { 10036 };';
73                 eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };';
74                 eval '*F_GETFL     = sub {     0 };';
75                 eval '*F_SETFL     = sub {     0 };';
76                 eval '*IPPROTO_TCP     = sub {     6 };';
77                 eval '*TCP_NODELAY     = sub {     1 };';
78                 $blocking_supported = 0;   # it appears that this DOESN'T work :-(
79         } 
80 }
81
82 my $w = $^W;
83 $^W = 0;
84 my $eagain = eval {EAGAIN()};
85 my $einprogress = eval {EINPROGRESS()};
86 my $ewouldblock = eval {EWOULDBLOCK()};
87 $^W = $w;
88 $cnum = 0;
89
90
91 #
92 #-----------------------------------------------------------------
93 # Generalised initializer
94
95 sub new
96 {
97     my ($pkg, $rproc) = @_;
98         my $obj = ref($pkg);
99         my $class = $obj || $pkg;
100
101     my $conn = {
102         rproc => $rproc,
103                 inqueue => [],
104                 outqueue => [],
105                 state => 0,
106                 lineend => "\r\n",
107                 csort => 'telnet',
108                 timeval => 60,
109                 blocking => 0,
110                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
111     };
112
113         $noconns++;
114         
115         dbg("Connection created ($noconns)") if isdbg('connll');
116         return bless $conn, $class;
117 }
118
119 sub set_error
120 {
121         my $conn = shift;
122         my $callback = shift;
123         $conn->{eproc} = $callback;
124         set_event_handler($conn->{sock}, error => $callback) if exists $conn->{sock};
125 }
126
127 sub set_rproc
128 {
129         my $conn = shift;
130         my $callback = shift;
131         $conn->{rproc} = $callback;
132 }
133
134 sub blocking
135 {
136         return unless $blocking_supported;
137
138         # Make the handle stop blocking, the Windows way.
139         if ($blocking_supported) { 
140                 $_[0]->blocking($_[1]);
141         } else {
142                 my $flags = fcntl ($_[0], F_GETFL, 0);
143                 if ($_[1]) {
144                         $flags &= ~O_NONBLOCK;
145                 } else {
146                         $flags |= O_NONBLOCK;
147                 }
148                 fcntl ($_[0], F_SETFL, $flags);
149         }
150 }
151
152 # save it
153 sub conns
154 {
155         my $pkg = shift;
156         my $call = shift;
157         my $ref;
158         
159         if (ref $pkg) {
160                 $call = $pkg->{call} unless $call;
161                 return undef unless $call;
162                 dbg("changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
163                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
164                 $pkg->{call} = $call;
165                 $ref = $conns{$call} = $pkg;
166                 dbg("Connection $pkg->{cnum} $call stored") if isdbg('connll');
167         } else {
168                 $ref = $conns{$call};
169         }
170         return $ref;
171 }
172
173 # this is only called by any dependent processes going away unexpectedly
174 sub pid_gone
175 {
176         my ($pkg, $pid) = @_;
177         
178         my @pid = grep {$_->{pid} == $pid} values %conns;
179         foreach my $p (@pid) {
180                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
181                 $p->disconnect;
182         }
183 }
184
185 #-----------------------------------------------------------------
186 # Send side routines
187 sub connect {
188     my ($pkg, $to_host, $to_port, $rproc) = @_;
189
190     # Create a connection end-point object
191     my $conn = $pkg;
192         unless (ref $pkg) {
193                 $conn = $pkg->new($rproc);
194         }
195         $conn->{peerhost} = $to_host;
196         $conn->{peerport} = $to_port;
197         $conn->{sort} = 'Outgoing';
198         
199     # Create a new internet socket
200     my $sock = IO::Socket::INET->new();
201     return undef unless $sock;
202         
203         my $proto = getprotobyname('tcp');
204         $sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
205         
206         blocking($sock, 0);
207         $conn->{blocking} = 0;
208
209         # does the host resolve?
210         my $ip = gethostbyname($to_host);
211         return undef unless $ip;
212         
213         my $r = connect($sock, pack_sockaddr_in($to_port, $ip));
214         return undef unless $r || _err_will_block($!);
215         
216         $conn->{sock} = $sock;
217     
218     if ($conn->{rproc}) {
219         my $callback = sub {$conn->_rcv};
220         set_event_handler ($sock, read => $callback);
221     }
222     return $conn;
223 }
224
225 sub start_program
226 {
227         my ($conn, $line, $sort) = @_;
228         my $pid;
229         
230         local $^F = 10000;              # make sure it ain't closed on exec
231         my ($a, $b) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
232         if ($a && $b) {
233                 $a->autoflush(1);
234                 $b->autoflush(1);
235                 $pid = fork;
236                 if (defined $pid) {
237                         if ($pid) {
238                                 close $b;
239                                 $conn->{sock} = $a;
240                                 $conn->{csort} = $sort;
241                                 $conn->{lineend} = "\cM" if $sort eq 'ax25';
242                                 $conn->{pid} = $pid;
243                                 if ($conn->{rproc}) {
244                                         my $callback = sub {$conn->_rcv};
245                                         Msg::set_event_handler ($a, read => $callback);
246                                 }
247                                 dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect');
248                         } else {
249                                 $^W = 0;
250                                 dbgclose();
251                                 STDIN->close;
252                                 STDOUT->close;
253                                 STDOUT->close;
254                                 *STDIN = IO::File->new_from_fd($b, 'r') or die;
255                                 *STDOUT = IO::File->new_from_fd($b, 'w') or die;
256                                 *STDERR = IO::File->new_from_fd($b, 'w') or die;
257                                 close $a;
258                                 unless ($main::is_win) {
259                                         #                                               $SIG{HUP} = 'IGNORE';
260                                         $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT';
261                                         alarm(0);
262                                 }
263                                 exec "$line" or dbg("exec '$line' failed $!");
264                         } 
265                 } else {
266                         dbg("cannot fork for $line");
267                 }
268         } else {
269                 dbg("no socket pair $! for $line");
270         }
271         return $pid;
272 }
273
274 sub disconnect 
275 {
276     my $conn = shift;
277         return if exists $conn->{disconnecting};
278
279         $conn->{disconnecting} = 1;
280     my $sock = delete $conn->{sock};
281         $conn->{state} = 'E';
282         $conn->{timeout}->del if $conn->{timeout};
283
284         # be careful to delete the correct one
285         my $call;
286         if ($call = $conn->{call}) {
287                 my $ref = $conns{$call};
288                 delete $conns{$call} if $ref && $ref == $conn;
289         }
290         $call ||= 'unallocated';
291         dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll');
292         
293         # get rid of any references
294         for (keys %$conn) {
295                 if (ref($conn->{$_})) {
296                         delete $conn->{$_};
297                 }
298         }
299
300         if (defined($sock)) {
301                 set_event_handler ($sock, read => undef, write => undef, error => undef);
302                 shutdown($sock, 3);
303                 close($sock);
304         }
305         
306         unless ($main::is_win) {
307                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
308         }
309 }
310
311 sub send_now {
312     my ($conn, $msg) = @_;
313     $conn->enqueue($msg);
314     $conn->_send (1); # 1 ==> flush
315 }
316
317 sub send_later {
318     my ($conn, $msg) = @_;
319     $conn->enqueue($msg);
320     my $sock = $conn->{sock};
321     return unless defined($sock);
322     set_event_handler ($sock, write => sub {$conn->_send(0)});
323 }
324
325 sub enqueue {
326     my $conn = shift;
327     push (@{$conn->{outqueue}}, defined $_[0] ? $_[0] : '');
328 }
329
330 sub _send {
331     my ($conn, $flush) = @_;
332     my $sock = $conn->{sock};
333     return unless defined($sock);
334     my $rq = $conn->{outqueue};
335
336     # If $flush is set, set the socket to blocking, and send all
337     # messages in the queue - return only if there's an error
338     # If $flush is 0 (deferred mode) make the socket non-blocking, and
339     # return to the event loop only after every message, or if it
340     # is likely to block in the middle of a message.
341
342 #       if ($conn->{blocking} != $flush) {
343 #               blocking($sock, $flush);
344 #               $conn->{blocking} = $flush;
345 #       }
346     my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0;
347
348     while (@$rq) {
349         my $msg            = $rq->[0];
350                 my $mlth           = length($msg);
351         my $bytes_to_write = $mlth - $offset;
352         my $bytes_written  = 0;
353                 confess("Negative Length! msg: '$msg' lth: $mlth offset: $offset") if $bytes_to_write < 0;
354         while ($bytes_to_write > 0) {
355             $bytes_written = syswrite ($sock, $msg,
356                                        $bytes_to_write, $offset);
357             if (!defined($bytes_written)) {
358                 if (_err_will_block($!)) {
359                     # Should happen only in deferred mode. Record how
360                     # much we have already sent.
361                     $conn->{send_offset} = $offset;
362                     # Event handler should already be set, so we will
363                     # be called back eventually, and will resume sending
364                     return 1;
365                 } else {    # Uh, oh
366                                         &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
367                                         $conn->disconnect;
368                     return 0; # fail. Message remains in queue ..
369                 }
370             } elsif (isdbg('raw')) {
371                                 my $call = $conn->{call} || 'none';
372                                 dbgdump('raw', "$call send $bytes_written: ", $msg);
373                         }
374                         $total_out      += $bytes_written;
375             $offset         += $bytes_written;
376             $bytes_to_write -= $bytes_written;
377         }
378         delete $conn->{send_offset};
379         $offset = 0;
380         shift @$rq;
381         #last unless $flush; # Go back to select and wait
382                             # for it to fire again.
383     }
384     # Call me back if queue has not been drained.
385     unless (@$rq) {
386         set_event_handler ($sock, write => undef);
387                 if (exists $conn->{close_on_empty}) {
388                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
389                         $conn->disconnect; 
390                 }
391     }
392     1;  # Success
393 }
394
395 sub dup_sock
396 {
397         my $conn = shift;
398         my $oldsock = $conn->{sock};
399         my $rc = $rd_callbacks{$oldsock};
400         my $wc = $wt_callbacks{$oldsock};
401         my $ec = $er_callbacks{$oldsock};
402         my $sock = $oldsock->new_from_fd($oldsock, "w+");
403         if ($sock) {
404                 set_event_handler($oldsock, read=>undef, write=>undef, error=>undef);
405                 $conn->{sock} = $sock;
406                 set_event_handler($sock, read=>$rc, write=>$wc, error=>$ec);
407                 $oldsock->close;
408         }
409 }
410
411 sub _err_will_block {
412         return 0 unless $blocking_supported;
413         return ($_[0] == $eagain || $_[0] == $ewouldblock || $_[0] == $einprogress);
414 }
415
416 sub close_on_empty
417 {
418         my $conn = shift;
419         $conn->{close_on_empty} = 1;
420 }
421
422 #-----------------------------------------------------------------
423 # Receive side routines
424
425 sub new_server {
426     @_ == 4 || die "Msg->new_server (myhost, myport, login_proc\n";
427     my ($pkg, $my_host, $my_port, $login_proc) = @_;
428         my $self = $pkg->new($login_proc);
429         
430     $self->{sock} = IO::Socket::INET->new (
431                                           LocalAddr => "$my_host:$my_port",
432 #                                          LocalPort => $my_port,
433                                           Listen    => SOMAXCONN,
434                                           Proto     => 'tcp',
435                                           Reuse => 1);
436     die "Could not create socket: $! \n" unless $self->{sock};
437     set_event_handler ($self->{sock}, read => sub { $self->new_client }  );
438         return $self;
439 }
440
441
442 sub nolinger
443 {
444         my $conn = shift;
445
446         unless ($main::is_win) {
447                 if (isdbg('sock')) {
448                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
449                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
450                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
451                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
452                 }
453                 
454                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1)} or dbg("setsockopt keepalive: $!");
455                 eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0))} or dbg("setsockopt linger: $!");
456                 eval {setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)} or eval {setsockopt($conn->{sock}, SOL_SOCKET, TCP_NODELAY, 1)} or dbg("setsockopt tcp_nodelay: $!");
457                 $conn->{sock}->autoflush(0);
458
459                 if (isdbg('sock')) {
460                         my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); 
461                         my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE);
462                         my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY);
463                         dbg("Linger is: $l $t, keepalive: $k, nagle: $n");
464                 }
465         } 
466 }
467
468 sub dequeue
469 {
470         my $conn = shift;
471
472         if ($conn->{msg} =~ /\n/) {
473                 my @lines = split /\r?\n/, $conn->{msg};
474                 if ($conn->{msg} =~ /\n$/) {
475                         delete $conn->{msg};
476                 } else {
477                         $conn->{msg} = pop @lines;
478                 }
479                 for (@lines) {
480                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
481                 }
482         }
483 }
484
485 sub _rcv {                     # Complement to _send
486     my $conn = shift; # $rcv_now complement of $flush
487     # Find out how much has already been received, if at all
488     my ($msg, $offset, $bytes_to_read, $bytes_read);
489     my $sock = $conn->{sock};
490     return unless defined($sock);
491
492         my @lines;
493         if ($conn->{blocking}) {
494                 blocking($sock, 0);
495                 $conn->{blocking} = 0;
496         }
497         $bytes_read = sysread ($sock, $msg, 1024, 0);
498         if (defined ($bytes_read)) {
499                 if ($bytes_read > 0) {
500                         $total_in += $bytes_read;
501                         if (isdbg('raw')) {
502                                 my $call = $conn->{call} || 'none';
503                                 dbgdump('raw', "$call read $bytes_read: ", $msg);
504                         }
505                         if ($conn->{echo}) {
506                                 my @ch = split //, $msg;
507                                 my $out;
508                                 for (@ch) {
509                                         if (/[\cH\x7f]/) {
510                                                 $out .= "\cH \cH";
511                                                 $conn->{msg} =~ s/.$//;
512                                         } else {
513                                                 $out .= $_;
514                                                 $conn->{msg} .= $_;
515                                         }
516                                 }
517                                 if (defined $out) {
518                                         set_event_handler ($sock, write => sub{$conn->_send(0)});
519                                         push @{$conn->{outqueue}}, $out;
520                                 }
521                         } else {
522                                 $conn->{msg} .= $msg;
523                         }
524                 } 
525         } else {
526                 if (_err_will_block($!)) {
527                         return ; 
528                 } else {
529                         $bytes_read = 0;
530                 }
531     }
532
533 FINISH:
534     if (defined $bytes_read && $bytes_read == 0) {
535                 &{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
536                 $conn->disconnect;
537     } else {
538                 unless ($conn->{disable_read}) {
539                         $conn->dequeue if exists $conn->{msg};
540                 }
541         }
542 }
543
544 sub new_client {
545         my $server_conn = shift;
546     my $sock = $server_conn->{sock}->accept();
547         if ($sock) {
548                 my $conn = $server_conn->new($server_conn->{rproc});
549                 $conn->{sock} = $sock;
550                 blocking($sock, 0);
551                 $conn->nolinger;
552                 $conn->{blocking} = 0;
553                 my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
554                 $conn->{sort} = 'Incoming';
555                 if ($eproc) {
556                         $conn->{eproc} = $eproc;
557                         set_event_handler ($sock, error => $eproc);
558                 }
559                 if ($rproc) {
560                         $conn->{rproc} = $rproc;
561                         my $callback = sub {$conn->_rcv};
562                         set_event_handler ($sock, read => $callback);
563                 } else {  # Login failed
564                         &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
565                         $conn->disconnect();
566                 }
567         } else {
568                 dbg("Msg: error on accept ($!)") if isdbg('err');
569         }
570 }
571
572 sub close_server
573 {
574         my $conn = shift;
575         set_event_handler ($conn->{sock}, read => undef, write => undef, error => undef );
576         $conn->{sock}->close;
577 }
578
579 # close all clients (this is for forking really)
580 sub close_all_clients
581 {
582         foreach my $conn (values %conns) {
583                 $conn->disconnect;
584         }
585 }
586
587 sub disable_read
588 {
589         my $conn = shift;
590         set_event_handler ($conn->{sock}, read => undef);
591         return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
592 }
593
594 #
595 #----------------------------------------------------
596 # Event loop routines used by both client and server
597
598 sub set_event_handler {
599     shift unless ref($_[0]); # shift if first arg is package name
600     my ($handle, %args) = @_;
601     my $callback;
602     if (exists $args{'write'}) {
603         $callback = $args{'write'};
604         if ($callback) {
605             $wt_callbacks{$handle} = $callback;
606             $wt_handles->add($handle);
607         } else {
608             delete $wt_callbacks{$handle};
609             $wt_handles->remove($handle);
610         }
611     }
612     if (exists $args{'read'}) {
613         $callback = $args{'read'};
614         if ($callback) {
615             $rd_callbacks{$handle} = $callback;
616             $rd_handles->add($handle);
617         } else {
618             delete $rd_callbacks{$handle};
619             $rd_handles->remove($handle);
620        }
621     }
622     if (exists $args{'error'}) {
623         $callback = $args{'error'};
624         if ($callback) {
625             $er_callbacks{$handle} = $callback;
626             $er_handles->add($handle);
627         } else {
628             delete $er_callbacks{$handle};
629             $er_handles->remove($handle);
630        }
631     }
632 }
633
634 sub event_loop {
635     my ($pkg, $loop_count, $timeout, $wronly) = @_; # event_loop(1) to process events once
636     my ($conn, $r, $w, $e, $rset, $wset, $eset);
637     while (1) {
638  
639        # Quit the loop if no handles left to process
640                 if ($wronly) {
641                         last unless $wt_handles->count();
642         
643                         ($rset, $wset, $eset) = IO::Select->select(undef, $wt_handles, undef, $timeout);
644                         
645                         foreach $w (@$wset) {
646                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
647                         }
648                 } else {
649                         
650                         last unless ($rd_handles->count() || $wt_handles->count());
651         
652                         ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
653                         
654                         foreach $e (@$eset) {
655                                 &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
656                         }
657                         foreach $r (@$rset) {
658                                 &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r};
659                         }
660                         foreach $w (@$wset) {
661                                 &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w};
662                         }
663                 }
664
665                 Timer::handler;
666                 
667         if (defined($loop_count)) {
668             last unless --$loop_count;
669         }
670     }
671 }
672
673 sub sleep
674 {
675         my ($pkg, $interval) = @_;
676         my $now = time;
677         while (time - $now < $interval) {
678                 $pkg->event_loop(10, 0.01);
679         }
680 }
681
682 sub DESTROY
683 {
684         my $conn = shift;
685         my $call = $conn->{call} || 'unallocated';
686         my $host = $conn->{peerhost} || '';
687         my $port = $conn->{peerport} || '';
688         dbg("Connection $conn->{cnum} $call [$host $port] being destroyed") if isdbg('connll');
689         $noconns--;
690 }
691
692 1;
693
694 __END__
695