fix EOF detection of incoming nodes
[spider.git] / perl / cluster.pl
index 654ac6db5994b2081a3b5bfbe698f036d4e5dd39..f100d4078f3b1440686a754aa512aee04bd18ff0 100755 (executable)
@@ -139,6 +139,8 @@ $maxconnect_node = 0;                       # Ditto but for nodes. In either case if a new incoming
                                                                # takes the no of references in the routing table above these numbers
                                                                # then the connection is refused. This only affects INCOMING connections.
 $idle_interval = 0.100;                        # the wait between invocations of the main idle loop processing.
+our $ending;                                                              # signal that we are ending;
+
 
 # send a message to call on conn and disconnect
 sub already_conn
@@ -152,13 +154,6 @@ sub already_conn
        $conn->disconnect;
 }
 
-sub error_handler
-{
-       my $dxchan = shift;
-       $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn};
-       $dxchan->disconnect(1);
-}
-
 # handle incoming messages
 sub new_channel
 {
@@ -177,7 +172,7 @@ sub new_channel
        my $dxchan = DXChannel::get($call);
        if ($dxchan) {
                if ($user && $user->is_node) {
-                       already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall));
+                       already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall));
                        return;
                }
                if ($bumpexisting) {
@@ -242,7 +237,8 @@ sub new_channel
        $conn->conns($call) if $conn->isa('IntMsg');
 
        # set callbacks
-       $conn->set_error(sub {error_handler($dxchan)});
+       $conn->set_error(sub {my $err = shift; error_handler($dxchan, $err)});
+       $conn->set_on_eof(sub {$dxchan->disconnect});
        $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);});
        $dxchan->rec($msg);
 }
@@ -253,11 +249,17 @@ sub login
        return \&new_channel;
 }
 
+our $ceasing;
+
 # cease running this program, close down all the connections nicely
 sub cease
 {
        my $dxchan;
 
+       cluck("ceasing") if $ceasing; 
+       
+       return if $ceasing++;
+       
        unless ($is_win) {
                $SIG{'TERM'} = 'IGNORE';
                $SIG{'INT'} = 'IGNORE';
@@ -272,15 +274,6 @@ sub cease
                dbg("Local::finish error $@") if $@;
        }
 
-       # disconnect nodes
-       foreach $dxchan (DXChannel::get_all_nodes) {
-           $dxchan->disconnect(2) unless $dxchan == $main::me;
-       }
-
-       # disconnect users
-       foreach $dxchan (DXChannel::get_all_users) {
-               $dxchan->disconnect;
-       }
 
        # disconnect AGW
        AGWMsg::finish();
@@ -309,8 +302,6 @@ sub cease
        $dbh->finish if $dbh;
 
        unlink $lockfn;
-#      $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
-       exit(0);
 }
 
 # the reaper of children
@@ -344,6 +335,8 @@ sub AGWrestart
        AGWMsg::init(\&new_channel);
 }
 
+our $io_disconnected;
+
 sub idle_loop
 {
        my $timenow = time;
@@ -395,6 +388,29 @@ sub idle_loop
                };
                dbg("Local::process error $@") if $@;
        }
+
+       while ($ending) {
+               my $dxchan;
+
+               dbg("DXSpider Ending $ending");
+
+               unless ($io_disconnected++) {
+
+                       # disconnect users
+                       foreach $dxchan (DXChannel::get_all_users) {
+                               $dxchan->disconnect;
+                       }
+
+                       # disconnect nodes
+                       foreach $dxchan (DXChannel::get_all_nodes) {
+                               next if $dxchan == $main::me;
+                               $dxchan->disconnect(2);
+                       }
+                       $main::me->disconnect;
+               }
+
+               Mojo::IOLoop->stop if --$ending <= 0;
+       }
 }
 
 
@@ -502,7 +518,7 @@ dbg("load badwords: " . (BadWords::load or "Ok"));
 
 # prime some signals
 unless ($DB::VERSION) {
-       $SIG{INT} = $SIG{TERM} = sub { Mojo::IOLoop->stop; };
+       $SIG{INT} = $SIG{TERM} = sub { $ending = 10; };
 }
 
 unless ($is_win) {
@@ -595,7 +611,7 @@ my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop);
 
 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
 
+dbg("After Mojo::IOLoop");
 cease(0);
 exit(0);
 
-