X-Git-Url: http://gb7djk.dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2Fcluster.pl;h=1e10eb518aae676ddbe0d8b85df3825951e52455;hb=36e0c1ffda9295a4090eab75360f1b59d964ada3;hp=39c65c02d4d151a64efde0674e291725fe84236c;hpb=b0d9bed295647635da9cd1ceeb5e4592bd87094b;p=spider.git diff --git a/perl/cluster.pl b/perl/cluster.pl index 39c65c02..1e10eb51 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -10,7 +10,7 @@ # # -require 5.004; +require 5.10.1; # make sure that modules are searched in the order local then perl BEGIN { @@ -123,7 +123,7 @@ use vars qw(@inqueue $systime $starttime $lockfn @outstanding_connects $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr $clusterport $mycall $decease $is_win $routeroot $me $reqreg $bumpexisting $allowdxby $dbh $dsn $dbuser $dbpass $do_xml $systime_days $systime_daystart - $can_encode $maxconnect_user $maxconnect_node $idle_interval + $can_encode $maxconnect_user $maxconnect_node $idle_interval $log_flush_interval ); @inqueue = (); # the main input queue, an array of hashes @@ -138,7 +138,11 @@ $maxconnect_user = 3; # the maximum no of concurrent connections a user can ha $maxconnect_node = 0; # Ditto but for nodes. In either case if a new incoming connection # takes the no of references in the routing table above these numbers # then the connection is refused. This only affects INCOMING connections. -$idle_interval = 0.100; # the wait between invocations of the main idle loop processing. +$idle_interval = 0.500; # the wait between invocations of the main idle loop processing. +$log_flush_interval = 2; # interval to wait between log flushes + +our $ending; # signal that we are ending; + # send a message to call on conn and disconnect sub already_conn @@ -152,13 +156,6 @@ sub already_conn $conn->disconnect; } -sub error_handler -{ - my $dxchan = shift; - $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn}; - $dxchan->disconnect(1); -} - # handle incoming messages sub new_channel { @@ -177,7 +174,7 @@ sub new_channel my $dxchan = DXChannel::get($call); if ($dxchan) { if ($user && $user->is_node) { - already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall)); + already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall)); return; } if ($bumpexisting) { @@ -242,7 +239,8 @@ sub new_channel $conn->conns($call) if $conn->isa('IntMsg'); # set callbacks - $conn->set_error(sub {error_handler($dxchan)}); + $conn->set_error(sub {my $err = shift; LogDbg('DXCommand', "Comms error '$err' received for call $dxchan->{call}"); $dxchan->disconnect(1);}); + $conn->set_on_eof(sub {$dxchan->disconnect}); $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);}); $dxchan->rec($msg); } @@ -253,11 +251,17 @@ sub login return \&new_channel; } +our $ceasing; + # cease running this program, close down all the connections nicely sub cease { my $dxchan; + cluck("ceasing") if $ceasing; + + return if $ceasing++; + unless ($is_win) { $SIG{'TERM'} = 'IGNORE'; $SIG{'INT'} = 'IGNORE'; @@ -272,15 +276,6 @@ sub cease dbg("Local::finish error $@") if $@; } - # disconnect nodes - foreach $dxchan (DXChannel::get_all_nodes) { - $dxchan->disconnect(2) unless $dxchan == $main::me; - } - - # disconnect users - foreach $dxchan (DXChannel::get_all_users) { - $dxchan->disconnect; - } # disconnect AGW AGWMsg::finish(); @@ -309,8 +304,6 @@ sub cease $dbh->finish if $dbh; unlink $lockfn; -# $SIG{__WARN__} = $SIG{__DIE__} = sub {my $a = shift; cluck($a); }; - exit(0); } # the reaper of children @@ -344,11 +337,14 @@ sub AGWrestart AGWMsg::init(\&new_channel); } +our $io_disconnected; + sub idle_loop { my $timenow = time; - DXChannel::process(); + BPQMsg::process(); +# DXChannel::process(); # $DB::trace = 0; @@ -371,23 +367,16 @@ sub idle_loop DXDb::process(); DXUser::process(); DXDupe::process(); - $systime_days = $days; - $systime_daystart = $days * 86400; + DXCron::process(); # do cron jobs + IsoTime::update($systime); + DXProt::process(); # process ongoing ak1a pcxx stuff + DXConnect::process(); + DXUser::process(); + AGWMsg::process(); + + Timer::handler(); + DXLog::flushall(); } - IsoTime::update($systime); - DXCron::process(); # do cron jobs - DXCommandmode::process(); # process ongoing command mode stuff - DXXml::process(); - DXProt::process(); # process ongoing ak1a pcxx stuff - DXConnect::process(); - DXMsg::process(); - DXDb::process(); - DXUser::process(); - DXDupe::process(); - AGWMsg::process(); - BPQMsg::process(); - - Timer::handler(); if (defined &Local::process) { eval { @@ -395,6 +384,29 @@ sub idle_loop }; dbg("Local::process error $@") if $@; } + + while ($ending) { + my $dxchan; + + dbg("DXSpider Ending $ending"); + + unless ($io_disconnected++) { + + # disconnect users + foreach $dxchan (DXChannel::get_all_users) { + $dxchan->disconnect; + } + + # disconnect nodes + foreach $dxchan (DXChannel::get_all_nodes) { + next if $dxchan == $main::me; + $dxchan->disconnect(2); + } + $main::me->disconnect; + } + + Mojo::IOLoop->stop if --$ending <= 0; + } } @@ -426,7 +438,7 @@ if (DXSql::init($dsn)) { $dbh = $dbh->connect($dsn, $dbuser, $dbpass) if $dbh; } -# try to load Encode +# try to load Encode and Git { local $^W = 0; my $w = $SIG{__DIE__}; @@ -436,6 +448,22 @@ if (DXSql::init($dsn)) { import Encode; $can_encode = 1; } + eval { require Git; }; + unless ($@) { + import Git; + + # determine the real version number + my $repo = Git->repository(Directory => "$root/.git"); + if ($repo) { + my $desc = $repo->command_oneline(['describe', '--long'], STDERR => 0); + if ($desc) { + my ($v, $s, $b, $g) = $desc =~ /^([\d.]+)(?:\.(\d+))?-(\d+)-g([0-9a-f]+)/; + $version = $v; + $build = $b || 0; + $gitversion = "$g\[r]"; + } + } + } $SIG{__DIE__} = $w; } @@ -502,7 +530,7 @@ dbg("load badwords: " . (BadWords::load or "Ok")); # prime some signals unless ($DB::VERSION) { - $SIG{INT} = $SIG{TERM} = sub { Mojo::IOLoop->stop; }; + $SIG{INT} = $SIG{TERM} = sub { $ending = 10; }; } unless ($is_win) { @@ -592,10 +620,11 @@ $script->run($main::me) if $script; #open(DB::OUT, "|tee /tmp/aa"); my $main_loop = Mojo::IOLoop->recurring($idle_interval => \&idle_loop); +my $log_flush_loop = Mojo::IOLoop->recurring($log_flush_interval => \&DXLog::flushall); Mojo::IOLoop->start unless Mojo::IOLoop->is_running; +dbg("After Mojo::IOLoop"); cease(0); exit(0); -