use DXDebug;
use Timer;
-use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout);
+use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout $disc_waittime);
$total_in = $total_out = 0;
$cnum = 0;
$connect_timeout = 5;
+$disc_waittime = 1.5;
+
+our %delqueue;
#
#-----------------------------------------------------------------
{
my $conn = shift;
my $callback = shift;
- $conn->{sock}->on(error => sub {$callback->($conn, $_[1]);});
+ $conn->{sock}->on(error => sub {$callback->($_[1]);});
}
sub set_on_eof
{
my $conn = shift;
my $callback = shift;
- $conn->{sock}->on(close => sub {$callback->($conn);});
+ $conn->{sock}->on(close => sub {$callback->()});
}
sub set_rproc
sub peerhost
{
my $conn = shift;
- $conn->{peerhost} ||= 'ax25' if $conn->ax25;
- $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
- $conn->{peerhost} ||= 'UNKNOWN';
+ unless ($conn->{peerhost}) {
+ $conn->{peerhost} ||= 'ax25' if $conn->ax25;
+ $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
+ $conn->{peerhost} ||= 'UNKNOWN';
+ }
return $conn->{peerhost};
}
undef $conn->{sock};
my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
$sock->on(read => sub {$conn->_rcv($_[1]);} );
- $sock->on(error => sub {$conn->disconnect;});
- $sock->on(close => sub {$conn->disconnect;});
+ $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;});
+ $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;});
$sock->timeout(0);
$sock->start;
- dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
+ $conn->{peerhost} = eval { $handle->peerhost; };
+ dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg ('connect');
if ($conn->{on_connect}) {
- &{$conn->{on_connect}}($conn);
+ &{$conn->{on_connect}}($conn, $handle);
}
}
}
sub connect {
- my ($pkg, $to_host, $to_port, $rproc, %args) = @_;
+ my ($pkg, $to_host, $to_port, %args) = @_;
my $timeout = delete $args{timeout} || $connect_timeout;
# Create a connection end-point object
my $conn = $pkg;
unless (ref $pkg) {
+ my $rproc = delete $args{rproc};
$conn = $pkg->new($rproc);
}
$conn->{peerhost} = $to_host;
my $sock;
$conn->{sock} = $sock = Mojo::IOLoop::Client->new;
- $sock->on(connect => sub {$conn->_on_connect($_[1])} );
- $sock->on(error => sub {$conn->disconnect});
- $sock->on(close => sub {$conn->disconnect});
+ $sock->on(connect => sub {
+ $conn->_on_connect($_[1])
+ } );
+ $sock->on(error => sub {
+ &{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc};
+ delete $conn->{sock};
+ $conn->disconnect
+ });
+ $sock->on(close => sub {
+ delete $conn->{sock};
+ $conn->disconnect}
+ );
# copy any args like on_connect, on_disconnect etc
while (my ($k, $v) = each %args) {
return $pid;
}
-sub disconnect
+sub disconnect
{
- my $conn = shift;
- return if exists $conn->{disconnecting};
-
- $conn->{disconnecting} = 1;
- my $sock = delete $conn->{sock};
- $conn->{state} = 'E';
- $conn->{timeout}->del if $conn->{timeout};
+ my $conn = shift;
+ my $count = $conn->{disconnecting}++;
+ my $dbg = isdbg('connll');
+ my ($pkg, $fn, $line) = caller if $dbg;
+
+ if ($count >= 2) {
+ dbgtrace((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line FORCING CLOSE ") if $dbg;
+ _close_it($conn);
+ return;
+ }
+ dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ") if $dbg;
+ return if $count;
+ # remove this conn from the active queue
# be careful to delete the correct one
my $call;
if ($call = $conn->{call}) {
delete $conns{$call} if $ref && $ref == $conn;
}
$call ||= 'unallocated';
- dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll');
+
+ $delqueue{$conn} = $conn; # save this connection until everything is finished
+ my $sock = $conn->{sock};
+ if ($sock) {
+ if ($sock->{buffer}) {
+ my $lth = length $sock->{buffer};
+ Mojo::IOLoop->timer($disc_waittime, sub {
+ dbg("Buffer contained $lth characters, coordinated for $disc_waittime secs, now disconnecting $call") if $dbg;
+ _close_it($conn);
+ });
+ } else {
+ dbg("Buffer empty, just close $call") if $dbg;
+ _close_it($conn);
+ }
+ } else {
+ dbg((ref $conn) . " socket missing on $conn->{call}") if $dbg;
+ _close_it($conn);
+ }
+}
+
+sub _close_it
+{
+ my $conn = shift;
+ my $sock = delete $conn->{sock};
+ $conn->{state} = 'E';
+ $conn->{timeout}->del if $conn->{timeout};
+
+ my $call = $conn->{call};
+
+ if (isdbg('connll')) {
+ my ($pkg, $fn, $line) = caller;
+ dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
+ }
+
+
+ dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll');
if ($conn->{on_disconnect}) {
&{$conn->{on_disconnect}}($conn);
}
+ if ($sock) {
+ dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll');
+ $sock->close_gracefully;
+ }
+
# get rid of any references
for (keys %$conn) {
if (ref($conn->{$_})) {
}
}
- if (defined($sock)) {
- $sock->close_gracefully;
- }
+ delete $delqueue{$conn}; # finally remove the $conn
unless ($main::is_win) {
kill 'TERM', $conn->{pid} if exists $conn->{pid};
my $conn = shift;
my $rq = $conn->{outqueue};
my $sock = $conn->{sock};
+ return unless defined $sock;
+ return if $conn->{disconnecting};
+
while (@$rq) {
my $data = shift @$rq;
my $lth = length $data;
}
if (defined $sock) {
$sock->write($data);
- $total_out = $lth;
+ $total_out += $lth;
} else {
dbg("_send_stuff $call ending data ignored: $data");
}
my $msg = shift;
my $sock = $conn->{sock};
return unless defined($sock);
+ return if $conn->{disconnecting};
+
+ $total_in += length $msg;
my @lines;
if (isdbg('raw')) {
sub new_client {
my $server_conn = shift;
- my $client = shift;
+ my $handle = shift;
my $conn = $server_conn->new($server_conn->{rproc});
- my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($client);
+ my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
$sock->on(read => sub {$conn->_rcv($_[1])});
$sock->timeout(0);
$sock->start;
- dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll');
-
- my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $client->peerhost, $conn->{peerport} = $client->peerport);
+ $conn->{peerhost} = $handle->peerhost || 'unknown';
+ $conn->{peerhost} =~ s|^::ffff:||; # chop off leading pseudo IPV6 stuff on dual stack listeners
+ $conn->{peerport} = $handle->peerport || 0;
+ dbg((ref $conn) . " accept $conn->{cnum} from $conn->{peerhost}:$conn->{peerport}") if isdbg('conn') || isdbg('connect');
+ my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost}, $conn->{peerport});
$conn->{sort} = 'Incoming';
if ($eproc) {
$conn->{eproc} = $eproc;
sub DESTROY
{
my $conn = shift;
+ my $call = $conn->{call} || 'unallocated';
+
+ if (isdbg('connll')) {
+ my ($pkg, $fn, $line) = caller;
+ dbgtrace((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line ");
+ }
+
my $call = $conn->{call} || 'unallocated';
my $host = $conn->{peerhost} || '';
my $port = $conn->{peerport} || '';
+ my $sock = $conn->{sock};
+
+ if ($sock) {
+ $sock->close_gracefully;
+ }
+
$noconns--;
dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');
}