fix sh/dx <call> with sql spot logging
[spider.git] / perl / cluster.pl
index a3e915a614ebb45e41686a7ef7162f1d4aff6738..1e10eb518aae676ddbe0d8b85df3825951e52455 100755 (executable)
@@ -10,7 +10,7 @@
 #
 #
 
-require 5.004;
+require 5.10.1;
 
 # make sure that modules are searched in the order local then perl
 BEGIN {
@@ -123,7 +123,7 @@ use vars qw(@inqueue $systime $starttime $lockfn @outstanding_connects
                        $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr
                        $clusterport $mycall $decease $is_win $routeroot $me $reqreg $bumpexisting
                        $allowdxby $dbh $dsn $dbuser $dbpass $do_xml $systime_days $systime_daystart
-                       $can_encode $maxconnect_user $maxconnect_node $idle_interval
+                       $can_encode $maxconnect_user $maxconnect_node $idle_interval $log_flush_interval
                   );
 
 @inqueue = ();                                 # the main input queue, an array of hashes
@@ -138,7 +138,11 @@ $maxconnect_user = 3;                      # the maximum no of concurrent connections a user can ha
 $maxconnect_node = 0;                  # Ditto but for nodes. In either case if a new incoming connection
                                                                # takes the no of references in the routing table above these numbers
                                                                # then the connection is refused. This only affects INCOMING connections.
-$idle_interval = 0.100;                        # the wait between invocations of the main idle loop processing.
+$idle_interval = 0.500;                # the wait between invocations of the main idle loop processing.
+$log_flush_interval = 2;               # interval to wait between log flushes
+
+our $ending;                                                              # signal that we are ending;
+
 
 # send a message to call on conn and disconnect
 sub already_conn
@@ -152,13 +156,6 @@ sub already_conn
        $conn->disconnect;
 }
 
-sub error_handler
-{
-       my $dxchan = shift;
-       $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn};
-       $dxchan->disconnect(1);
-}
-
 # handle incoming messages
 sub new_channel
 {
@@ -177,7 +174,7 @@ sub new_channel
        my $dxchan = DXChannel::get($call);
        if ($dxchan) {
                if ($user && $user->is_node) {
-                       already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall));
+                       already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall));
                        return;
                }
                if ($bumpexisting) {
@@ -242,7 +239,8 @@ sub new_channel
        $conn->conns($call) if $conn->isa('IntMsg');
 
        # set callbacks
-       $conn->set_error(sub {error_handler($dxchan)});
+       $conn->set_error(sub {my $err = shift; LogDbg('DXCommand', "Comms error '$err' received for call $dxchan->{call}"); $dxchan->disconnect(1);});
+       $conn->set_on_eof(sub {$dxchan->disconnect});
        $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);});
        $dxchan->rec($msg);
 }
@@ -253,11 +251,17 @@ sub login
        return \&new_channel;
 }
 
+our $ceasing;
+
 # cease running this program, close down all the connections nicely
 sub cease
 {
        my $dxchan;
 
+       cluck("ceasing") if $ceasing; 
+       
+       return if $ceasing++;
+       
        unless ($is_win) {
                $SIG{'TERM'} = 'IGNORE';
                $SIG{'INT'} = 'IGNORE';
@@ -272,15 +276,6 @@ sub cease
                dbg("Local::finish error $@") if $@;
        }
 
-       # disconnect nodes
-       foreach $dxchan (DXChannel::get_all_nodes) {
-           $dxchan->disconnect(2) unless $dxchan == $main::me;
-       }
-
-       # disconnect users
-       foreach $dxchan (DXChannel::get_all_users) {
-               $dxchan->disconnect;
-       }
 
        # disconnect AGW
        AGWMsg::finish();
@@ -301,7 +296,7 @@ sub cease
                $l->close_server;
        }
 
-       LogDbg('cluster', "DXSpider V$version, build $subversion.$build (git: $gitversion) ended");
+       LogDbg('cluster', "DXSpider V$version, build $build (git: $gitversion) ended");
        dbg("bye bye everyone - bye bye");
        dbgclose();
        Logclose();
@@ -309,8 +304,6 @@ sub cease
        $dbh->finish if $dbh;
 
        unlink $lockfn;
-#      $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
-       exit(0);
 }
 
 # the reaper of children
@@ -344,11 +337,14 @@ sub AGWrestart
        AGWMsg::init(\&new_channel);
 }
 
+our $io_disconnected;
+
 sub idle_loop
 {
        my $timenow = time;
 
-       DXChannel::process();
+       BPQMsg::process();
+#      DXChannel::process();
 
        #      $DB::trace = 0;
 
@@ -371,23 +367,16 @@ sub idle_loop
                DXDb::process();
                DXUser::process();
                DXDupe::process();
-               $systime_days = $days;
-               $systime_daystart = $days * 86400;
+               DXCron::process();                      # do cron jobs
+               IsoTime::update($systime);
+               DXProt::process();                      # process ongoing ak1a pcxx stuff
+               DXConnect::process();
+               DXUser::process();
+               AGWMsg::process();
+               
+               Timer::handler();
+               DXLog::flushall();
        }
-       IsoTime::update($systime);
-       DXCron::process();                      # do cron jobs
-       DXCommandmode::process();       # process ongoing command mode stuff
-       DXXml::process();
-       DXProt::process();                      # process ongoing ak1a pcxx stuff
-       DXConnect::process();
-       DXMsg::process();
-       DXDb::process();
-       DXUser::process();
-       DXDupe::process();
-       AGWMsg::process();
-       BPQMsg::process();
-
-       Timer::handler();
 
        if (defined &Local::process) {
                eval {
@@ -395,6 +384,29 @@ sub idle_loop
                };
                dbg("Local::process error $@") if $@;
        }
+
+       while ($ending) {
+               my $dxchan;
+
+               dbg("DXSpider Ending $ending");
+
+               unless ($io_disconnected++) {
+
+                       # disconnect users
+                       foreach $dxchan (DXChannel::get_all_users) {
+                               $dxchan->disconnect;
+                       }
+
+                       # disconnect nodes
+                       foreach $dxchan (DXChannel::get_all_nodes) {
+                               next if $dxchan == $main::me;
+                               $dxchan->disconnect(2);
+                       }
+                       $main::me->disconnect;
+               }
+
+               Mojo::IOLoop->stop if --$ending <= 0;
+       }
 }
 
 
@@ -426,7 +438,7 @@ if (DXSql::init($dsn)) {
        $dbh = $dbh->connect($dsn, $dbuser, $dbpass) if $dbh;
 }
 
-# try to load Encode
+# try to load Encode and Git
 {
        local $^W = 0;
        my $w = $SIG{__DIE__};
@@ -436,6 +448,22 @@ if (DXSql::init($dsn)) {
                import Encode;
                $can_encode = 1;
        }
+       eval { require Git; };
+       unless ($@) {
+               import Git;
+               
+               # determine the real version number
+               my $repo = Git->repository(Directory => "$root/.git");
+               if ($repo) {
+                       my $desc = $repo->command_oneline(['describe', '--long'], STDERR => 0);
+                       if ($desc) {
+                               my ($v, $s, $b, $g) = $desc =~ /^([\d.]+)(?:\.(\d+))?-(\d+)-g([0-9a-f]+)/;
+                               $version = $v;
+                               $build = $b || 0;
+                               $gitversion = "$g\[r]";
+                       }
+               }
+       }
        $SIG{__DIE__} = $w;
 }
 
@@ -445,7 +473,7 @@ DXXml::init();
 # banner
 my ($year) = (gmtime)[5];
 $year += 1900;
-LogDbg('cluster', "DXSpider V$version, build $subversion.$build (git: $gitversion) started");
+LogDbg('cluster', "DXSpider V$version, build $build (git: $gitversion) started");
 dbg("Copyright (c) 1998-$year Dirk Koopman G1TLH");
 
 # load Prefixes
@@ -502,7 +530,7 @@ dbg("load badwords: " . (BadWords::load or "Ok"));
 
 # prime some signals
 unless ($DB::VERSION) {
-       $SIG{INT} = $SIG{TERM} = sub { Mojo::IOLoop->stop; };
+       $SIG{INT} = $SIG{TERM} = sub { $ending = 10; };
 }
 
 unless ($is_win) {
@@ -591,9 +619,12 @@ $script->run($main::me) if $script;
 
 #open(DB::OUT, "|tee /tmp/aa");
 
-Mojo::IOLoop->start;
+my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop);
+my $log_flush_loop = Mojo::IOLoop->recurring($log_flush_interval => \&DXLog::flushall);
+
+Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
 
+dbg("After Mojo::IOLoop");
 cease(0);
 exit(0);
 
-