X-Git-Url: http://gb7djk.dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FMsg.pm;h=00569928165bb2a630862b4d1fd9603ee05f6d8a;hb=60d6599887f29ec966d075f413c2c73b9e913212;hp=cdb72aa0e920e41433c8ab724d58b41c1719b8b1;hpb=f935566e70cd40345937910094a5fa3394a58dc1;p=spider.git diff --git a/perl/Msg.pm b/perl/Msg.pm index cdb72aa0..00569928 100644 --- a/perl/Msg.pm +++ b/perl/Msg.pm @@ -14,7 +14,7 @@ use strict; use vars qw($VERSION $BRANCH); $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ ); -$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0; +$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ || (0,0)); $main::build += $VERSION; $main::branch += $BRANCH; @@ -23,7 +23,7 @@ use IO::Socket; use DXDebug; use Timer; -use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum); +use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns $blocking_supported $cnum $total_in $total_out); %rd_callbacks = (); %wt_callbacks = (); @@ -31,12 +31,14 @@ use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $e $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); $er_handles = IO::Select->new(); +$total_in = $total_out = 0; $now = time; BEGIN { # Checks if blocking is supported eval { + local $^W; require POSIX; POSIX->import(qw(O_NONBLOCK F_SETFL F_GETFL)) }; if ($@ || $main::is_win) { @@ -50,8 +52,34 @@ BEGIN { # import as many of these errno values as are available eval { + local $^W; require Errno; Errno->import(qw(EAGAIN EINPROGRESS EWOULDBLOCK)); }; + + unless ($^O eq 'MSWin32') { + if ($] >= 5.6) { + eval { + local $^W; + require Socket; Socket->import(qw(IPPROTO_TCP TCP_NODELAY)); + }; + } else { + dbg("IPPROTO_TCP and TCP_NODELAY manually defined"); + eval 'sub IPPROTO_TCP { 6 };'; + eval 'sub TCP_NODELAY { 1 };'; + } + } + # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp + # defines EINPROGRESS as 10035. We provide it here because some + # Win32 users report POSIX::EINPROGRESS is not vendor-supported. + if ($^O eq 'MSWin32') { + eval '*EINPROGRESS = sub { 10036 };'; + eval '*EWOULDBLOCK = *EAGAIN = sub { 10035 };'; + eval '*F_GETFL = sub { 0 };'; + eval '*F_SETFL = sub { 0 };'; + eval '*IPPROTO_TCP = sub { 6 };'; + eval '*TCP_NODELAY = sub { 1 };'; + $blocking_supported = 0; # it appears that this DOESN'T work :-( + } } my $w = $^W; @@ -109,14 +137,23 @@ sub set_rproc sub blocking { return unless $blocking_supported; - - my $flags = fcntl ($_[0], F_GETFL, 0); - if ($_[1]) { - $flags &= ~O_NONBLOCK; + + # Make the handle stop blocking, the Windows way. + if ($main::is_win) { + # 126 is FIONBIO (some docs say 0x7F << 16) + ioctl( $_[0], + 0x80000000 | (4 << 16) | (ord('f') << 8) | 126, + "$_[1]" + ); } else { - $flags |= O_NONBLOCK; + my $flags = fcntl ($_[0], F_GETFL, 0); + if ($_[1]) { + $flags &= ~O_NONBLOCK; + } else { + $flags |= O_NONBLOCK; + } + fcntl ($_[0], F_SETFL, $flags); } - fcntl ($_[0], F_SETFL, $flags); } # save it @@ -176,8 +213,10 @@ sub connect { blocking($sock, 0); $conn->{blocking} = 0; + # does the host resolve? my $ip = gethostbyname($to_host); -# my $r = $sock->connect($to_port, $ip); + return undef unless $ip; + my $r = connect($sock, pack_sockaddr_in($to_port, $ip)); return undef unless $r || _err_will_block($!); @@ -190,7 +229,57 @@ sub connect { return $conn; } -sub disconnect { +sub start_program +{ + my ($conn, $line, $sort) = @_; + my $pid; + + local $^F = 10000; # make sure it ain't closed on exec + my ($a, $b) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC); + if ($a && $b) { + $a->autoflush(1); + $b->autoflush(1); + $pid = fork; + if (defined $pid) { + if ($pid) { + close $b; + $conn->{sock} = $a; + $conn->{csort} = $sort; + $conn->{lineend} = "\cM" if $sort eq 'ax25'; + $conn->{pid} = $pid; + if ($conn->{rproc}) { + my $callback = sub {$conn->_rcv}; + Msg::set_event_handler ($a, read => $callback); + } + dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect'); + } else { + $^W = 0; + dbgclose(); + STDIN->close; + STDOUT->close; + STDOUT->close; + *STDIN = IO::File->new_from_fd($b, 'r') or die; + *STDOUT = IO::File->new_from_fd($b, 'w') or die; + *STDERR = IO::File->new_from_fd($b, 'w') or die; + close $a; + unless ($main::is_win) { + # $SIG{HUP} = 'IGNORE'; + $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT'; + alarm(0); + } + exec "$line" or dbg("exec '$line' failed $!"); + } + } else { + dbg("cannot fork for $line"); + } + } else { + dbg("no socket pair $! for $line"); + } + return $pid; +} + +sub disconnect +{ my $conn = shift; return if exists $conn->{disconnecting}; @@ -208,10 +297,6 @@ sub disconnect { $call ||= 'unallocated'; dbg("Connection $conn->{cnum} $call disconnected") if isdbg('connll'); - unless ($main::is_win) { - kill 'TERM', $conn->{pid} if exists $conn->{pid}; - } - # get rid of any references for (keys %$conn) { if (ref($conn->{$_})) { @@ -219,10 +304,15 @@ sub disconnect { } } - return unless defined($sock); - set_event_handler ($sock, read => undef, write => undef, error => undef); - shutdown($sock, 3); - close($sock); + if (defined($sock)) { + set_event_handler ($sock, read => undef, write => undef, error => undef); + shutdown($sock, 3); + close($sock); + } + + unless ($main::is_win) { + kill 'TERM', $conn->{pid} if exists $conn->{pid}; + } } sub send_now { @@ -288,6 +378,7 @@ sub _send { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call send $bytes_written: ", $msg); } + $total_out += $bytes_written; $offset += $bytes_written; $bytes_to_write -= $bytes_written; } @@ -354,6 +445,33 @@ sub new_server { return $self; } + +sub nolinger +{ + my $conn = shift; + + unless ($main::is_win) { + if (isdbg('sock')) { + my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); + my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); + my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); + dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); + } + + eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE, 1)} or dbg("setsockopt keepalive: $!"); + eval {setsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER, pack("ll", 0, 0))} or dbg("setsockopt linger: $!"); + eval {setsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY, 1)} or eval {setsockopt($conn->{sock}, SOL_SOCKET, TCP_NODELAY, 1)} or dbg("setsockopt tcp_nodelay: $!"); + $conn->{sock}->autoflush(0); + + if (isdbg('sock')) { + my ($l, $t) = unpack "ll", getsockopt($conn->{sock}, SOL_SOCKET, SO_LINGER); + my $k = unpack 'l', getsockopt($conn->{sock}, SOL_SOCKET, SO_KEEPALIVE); + my $n = $main::is_win ? 0 : unpack "l", getsockopt($conn->{sock}, IPPROTO_TCP, TCP_NODELAY); + dbg("Linger is: $l $t, keepalive: $k, nagle: $n"); + } + } +} + sub dequeue { my $conn = shift; @@ -386,6 +504,7 @@ sub _rcv { # Complement to _send $bytes_read = sysread ($sock, $msg, 1024, 0); if (defined ($bytes_read)) { if ($bytes_read > 0) { + $total_in += $bytes_read; if (isdbg('raw')) { my $call = $conn->{call} || 'none'; dbgdump('raw', "$call read $bytes_read: ", $msg); @@ -436,6 +555,7 @@ sub new_client { my $conn = $server_conn->new($server_conn->{rproc}); $conn->{sock} = $sock; blocking($sock, 0); + $conn->nolinger; $conn->{blocking} = 0; my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport()); $conn->{sort} = 'Incoming'; @@ -519,24 +639,35 @@ sub set_event_handler { } sub event_loop { - my ($pkg, $loop_count, $timeout) = @_; # event_loop(1) to process events once + my ($pkg, $loop_count, $timeout, $wronly) = @_; # event_loop(1) to process events once my ($conn, $r, $w, $e, $rset, $wset, $eset); while (1) { # Quit the loop if no handles left to process - last unless ($rd_handles->count() || $wt_handles->count()); + if ($wronly) { + last unless $wt_handles->count(); - ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); - - foreach $e (@$eset) { - &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; - } - foreach $r (@$rset) { - &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r}; - } - foreach $w (@$wset) { - &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; - } + ($rset, $wset, $eset) = IO::Select->select(undef, $wt_handles, undef, $timeout); + + foreach $w (@$wset) { + &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; + } + } else { + + last unless ($rd_handles->count() || $wt_handles->count()); + + ($rset, $wset, $eset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout); + + foreach $e (@$eset) { + &{$er_callbacks{$e}}($e) if exists $er_callbacks{$e}; + } + foreach $r (@$rset) { + &{$rd_callbacks{$r}}($r) if exists $rd_callbacks{$r}; + } + foreach $w (@$wset) { + &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; + } + } Timer::handler;