new style disconnect
[spider.git] / perl / Msg.pm
index ad09c85da0f784d98b12634f01ad1a0e421144d5..e3385d9166585436dad106bd94e8b94aa6b0c411 100644 (file)
@@ -20,7 +20,7 @@ use Mojo::IOLoop::Stream;
 use DXDebug;
 use Timer;
 
-use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout);
+use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout $disc_waittime);
 
 $total_in = $total_out = 0;
 
@@ -28,6 +28,9 @@ $now = time;
 
 $cnum = 0;
 $connect_timeout = 5;
+$disc_waittime = 3;
+
+our %delqueue;
 
 #
 #-----------------------------------------------------------------
@@ -237,16 +240,61 @@ sub start_program
        return $pid;
 }
 
-sub disconnect 
+sub disconnect
 {
-    my $conn = shift;
-       return if exists $conn->{disconnecting};
+       my $conn = shift;
+       my $count = $conn->{disconnecting}++;
+       if (isdbg('connll')) {
+               my ($pkg, $fn, $line) = caller;
+               dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
+       }
+       return if $count;
+
+       
+       my $sock = $conn->{sock};
+       if ($sock) {
+
+               # remove me from the active list
+               my $call;
+               if ($call = $conn->{call}) {
+                       my $ref = $conns{$call};
+                       delete $conns{$call} if $ref && $ref == $conn;
+               }
+               $conn->{delay} = Mojo::IOLoop->delay (
+#                               Mojo::IOLoop->delay (
+                                                                                         sub {
+                                                                                                 my $delay = shift;
+                                                                                                 dbg("before drain $call");
+                                                                                                 $sock->on(drain => $delay->begin);
+                                                                                                 1;
+                                                                                         },
+                                                                                         sub {
+                                                                                                 my $delay = shift;
+                                                                                                 _close_it($conn);
+                                                                                                 1;
+                                                                                         }
+                                                                                        );
+               $conn->{delay}->wait;
+               
+               $delqueue{$conn} = $conn; # save this connection until everything is finished
+       } else {
+               dbg((ref $conn) . " socket missing on $conn->{call}") if isdbg('connll');
+               _close_it($conn);
+       }
+}
 
-       $conn->{disconnecting} = 1;
+sub _close_it
+{
+    my $conn = shift;
     my $sock = delete $conn->{sock};
        $conn->{state} = 'E';
        $conn->{timeout}->del if $conn->{timeout};
 
+       if (isdbg('connll')) {
+               my ($pkg, $fn, $line) = caller;
+               dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
+       }
+
        # be careful to delete the correct one
        my $call;
        if ($call = $conn->{call}) {
@@ -254,12 +302,18 @@ sub disconnect
                delete $conns{$call} if $ref && $ref == $conn;
        }
        $call ||= 'unallocated';
-       dbg((ref $conn) . " Connection $conn->{cnum} $call disconnected") if isdbg('connll');
+
+       dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll');
        
        if ($conn->{on_disconnect}) {
                &{$conn->{on_disconnect}}($conn);
        }
 
+       if ($sock) {
+               dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll');
+               $sock->close_gracefully;
+       }
+       
        # get rid of any references
        for (keys %$conn) {
                if (ref($conn->{$_})) {
@@ -267,8 +321,7 @@ sub disconnect
                }
        }
 
-       $sock->close_gracefully if defined $sock && $sock->can('close_gracefully');
-       undef $sock;
+       delete $delqueue{$conn};        # finally remove the $conn
        
        unless ($main::is_win) {
                kill 'TERM', $conn->{pid} if exists $conn->{pid};
@@ -490,12 +543,22 @@ sub sleep
 sub DESTROY
 {
        my $conn = shift;
+       my $call = $conn->{call} || 'unallocated';
+
+       if (isdbg('connll')) {
+               my ($pkg, $fn, $line) = caller;
+               dbg((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line ");
+               
+       }
+
        my $call = $conn->{call} || 'unallocated';
        my $host = $conn->{peerhost} || '';
        my $port = $conn->{peerport} || '';
        my $sock = $conn->{sock};
 
-       $sock->close_gracefully if defined $sock && $sock->can('close_gracefully');
+       if ($sock) {
+               $sock->close_gracefully;
+       }
        
        $noconns--;
        dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');