speed up inqueue processing
[spider.git] / perl / cluster.pl
index 230cd88e6ec7c35006a524f61e311d5f509fe720..aa95e7ac680df8c1f16ca30992312c6c69083da3 100755 (executable)
@@ -61,7 +61,6 @@ use DXProtVars;
 use DXProtout;
 use DXProt;
 use DXMsg;
-use DXCluster;
 use DXCron;
 use DXConnect;
 use DXBearing;
@@ -82,6 +81,9 @@ use BBS;
 use WCY;
 use BadWords;
 use Timer;
+use Route;
+use Route::Node;
+use Route::User;
 
 use Data::Dumper;
 use IO::File;
@@ -95,23 +97,31 @@ package main;
 use strict;
 use vars qw(@inqueue $systime $version $starttime $lockfn @outstanding_connects 
                        $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr 
-                       $clusterport $mycall $decease $build $is_win
+                       $clusterport $mycall $decease $is_win $routeroot 
                   );
 
 @inqueue = ();                                 # the main input queue, an array of hashes
 $systime = 0;                                  # the time now (in seconds)
-$version = "1.47";                             # the version no of the software
+$version = "1.48";                             # the version no of the software
 $starttime = 0;                 # the starting time of the cluster   
 #@outstanding_connects = ();     # list of outstanding connects
 @listeners = ();                               # list of listeners
 
+use vars qw($VERSION $BRANCH $build $branch);
+$VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
+$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0;
+$main::build += 15;                            # add an offset to make it bigger than last system
+$main::build += $VERSION;
+$main::branch += $BRANCH;
+
       
 # send a message to call on conn and disconnect
 sub already_conn
 {
        my ($conn, $call, $mess) = @_;
-       
-       dbg('chan', "-> D $call $mess\n"); 
+
+       $conn->disable_read(1);
+       dbg("-> D $call $mess\n") if isdbg('chan'); 
        $conn->send_now("D$call|$mess");
        sleep(2);
        $conn->disconnect;
@@ -141,24 +151,9 @@ sub new_channel
                return;
        }
        
-       # is there one already connected elsewhere in the cluster?
        if ($user) {
-               if (($user->is_node || $call eq $myalias) && !DXCluster->get_exact($call)) {
-                       ;
-               } else {
-                       if (my $ref = DXCluster->get_exact($call)) {
-                               my $mess = DXM::msg($lang, 'concluster', $call, $ref->mynode->call);
-                               already_conn($conn, $call, $mess);
-                               return;
-                       }
-               }
                $user->{lang} = $main::lang if !$user->{lang}; # to autoupdate old systems
        } else {
-               if (my $ref = DXCluster->get_exact($call)) {
-                       my $mess = DXM::msg($lang, 'concluster', $call, $ref->mynode->call);
-                       already_conn($conn, $call, $mess);
-                       return;
-               }
                $user = DXUser->new($call);
        }
        
@@ -217,7 +212,7 @@ sub cease
        eval {
                Local::finish();   # end local processing
        };
-       dbg('local', "Local::finish error $@") if $@;
+       dbg("Local::finish error $@") if $@;
 
        # disconnect nodes
        foreach $dxchan (DXChannel->get_all_nodes) {
@@ -246,7 +241,7 @@ sub cease
                $l->close_server;
        }
 
-       dbg('chan', "DXSpider version $version, build $build ended");
+       dbg("DXSpider version $version, build $build ended") if isdbg('chan');
        Log('cluster', "DXSpider V$version, build $build ended");
        dbgclose();
        Logclose();
@@ -260,44 +255,48 @@ sub reap
 {
        my $cpid;
        while (($cpid = waitpid(-1, WNOHANG)) > 0) {
-               dbg('reap', "cpid: $cpid");
+               dbg("cpid: $cpid") if isdbg('reap');
 #              Msg->pid_gone($cpid);
                $zombies-- if $zombies > 0;
        }
-       dbg('reap', "cpid: $cpid");
+       dbg("cpid: $cpid") if isdbg('reap');
 }
 
 # this is where the input queue is dealt with and things are dispatched off to other parts of
 # the cluster
 sub process_inqueue
 {
-       my $self = shift @inqueue;
-       return if !$self;
+       while (@inqueue) {
+               my $self = shift @inqueue;
+               return if !$self;
        
-       my $data = $self->{data};
-       my $dxchan = $self->{dxchan};
-       my $error;
-       my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data);
-       return unless defined $sort;
+               my $data = $self->{data};
+               my $dxchan = $self->{dxchan};
+               my $error;
+               my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data);
+               return unless defined $sort;
        
-       # do the really sexy console interface bit! (Who is going to do the TK interface then?)
-       dbg('chan', "<- $sort $call $line\n") unless $sort eq 'D';
-
-       # handle A records
-       my $user = $dxchan->user;
-       if ($sort eq 'A' || $sort eq 'O') {
-               $dxchan->start($line, $sort);  
-       } elsif ($sort eq 'I') {
-               die "\$user not defined for $call" if !defined $user;
-               # normal input
-               $dxchan->normal($line);
-               $dxchan->disconnect if ($dxchan->{state} eq 'bye');
-       } elsif ($sort eq 'Z') {
-               $dxchan->disconnect;
-       } elsif ($sort eq 'D') {
-               ;                       # ignored (an echo)
-       } else {
-               print STDERR atime, " Unknown command letter ($sort) received from $call\n";
+               # do the really sexy console interface bit! (Who is going to do the TK interface then?)
+               dbg("<- $sort $call $line\n") if $sort ne 'D' && isdbg('chan');
+
+               # handle A records
+               my $user = $dxchan->user;
+               if ($sort eq 'A' || $sort eq 'O') {
+                       $dxchan->start($line, $sort);  
+               } elsif ($sort eq 'I') {
+                       die "\$user not defined for $call" if !defined $user;
+                       # normal input
+                       $dxchan->normal($line);
+                       $dxchan->disconnect if ($dxchan->{state} eq 'bye');
+               } elsif ($sort eq 'Z') {
+                       $dxchan->disconnect;
+               } elsif ($sort eq 'D') {
+                       ;                                       # ignored (an echo)
+               } elsif ($sort eq 'G') {
+                       $dxchan->enhanced($line);
+               } else {
+                       print STDERR atime, " Unknown command letter ($sort) received from $call\n";
+               }
        }
 }
 
@@ -327,66 +326,50 @@ $starttime = $systime = time;
 $lang = 'en' unless $lang;
 
 # open the debug file, set various FHs to be unbuffered
-dbginit();
+dbginit(\&DXCommandmode::broadcast_debug);
 foreach (@debug) {
        dbgadd($_);
 }
 STDOUT->autoflush(1);
 
 # calculate build number
-$build = $main::version;
-
-my @fn;
-open(CL, "$main::root/perl/cluster.pl") or die "Cannot open cluster.pl $!";
-while (<CL>) {
-       next unless /^use\s+([\w:_]+)/;
-       push @fn, $1;
-}
-close CL;
-foreach my $fn (@fn) {
-       open(CL, "$main::root/perl/${fn}.pm") or next;
-       while (<CL>) {
-               if (/^#\s+\$Id:\s+[\w\._]+,v\s+(\d+\.\d+)/ ) {
-                       $build += $1;
-                       last;
-               }
-       }
-       close CL;
-}
+$build += $main::version;
+$build = "$build.$branch" if $branch;
 
 Log('cluster', "DXSpider V$version, build $build started");
 
 # banner
-dbg('err', "DXSpider Version $version, build $build started", "Copyright (c) 1998-2001 Dirk Koopman G1TLH");
+dbg("Copyright (c) 1998-2001 Dirk Koopman G1TLH");
+dbg("DXSpider Version $version, build $build started");
 
 # load Prefixes
-dbg('err', "loading prefixes ...");
+dbg("loading prefixes ...");
 Prefix::load();
 
 # load band data
-dbg('err', "loading band data ...");
+dbg("loading band data ...");
 Bands::load();
 
 # initialise User file system
-dbg('err', "loading user file system ..."); 
+dbg("loading user file system ..."); 
 DXUser->init($userfn, 1);
 
 # start listening for incoming messages/connects
-dbg('err', "starting listeners ...");
+dbg("starting listeners ...");
 my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login);
 $conn->conns("Server $clusteraddr/$clusterport");
 push @listeners, $conn;
-dbg('err', "Internal port: $clusteraddr $clusterport");
+dbg("Internal port: $clusteraddr $clusterport");
 foreach my $l (@main::listen) {
        $conn = ExtMsg->new_server($l->[0], $l->[1], \&login);
        $conn->conns("Server $l->[0]/$l->[1]");
        push @listeners, $conn;
-       dbg('err', "External Port: $l->[0] $l->[1]");
+       dbg("External Port: $l->[0] $l->[1]");
 }
 AGWrestart();
 
 # load bad words
-dbg('err', "load badwords: " . (BadWords::load or "Ok"));
+dbg("load badwords: " . (BadWords::load or "Ok"));
 
 # prime some signals
 unless ($DB::VERSION) {
@@ -397,15 +380,15 @@ unless ($is_win) {
        $SIG{HUP} = 'IGNORE';
        $SIG{CHLD} = sub { $zombies++ };
        
-       $SIG{PIPE} = sub {      dbg('err', "Broken PIPE signal received"); };
-       $SIG{IO} = sub {        dbg('err', "SIGIO received"); };
+       $SIG{PIPE} = sub {      dbg("Broken PIPE signal received"); };
+       $SIG{IO} = sub {        dbg("SIGIO received"); };
        $SIG{WINCH} = $SIG{STOP} = $SIG{CONT} = 'IGNORE';
        $SIG{KILL} = 'DEFAULT';     # as if it matters....
 
        # catch the rest with a hopeful message
        for (keys %SIG) {
                if (!$SIG{$_}) {
-                       #               dbg('chan', "Catching SIG $_");
+                       #               dbg("Catching SIG $_") if isdbg('chan');
                        $SIG{$_} = sub { my $sig = shift;       DXDebug::confess("Caught signal $sig");  }; 
                }
        }
@@ -428,37 +411,46 @@ WCY->init();
 Spot->init();
 
 # initialise the protocol engine
-dbg('err', "reading in duplicate spot and WWV info ...");
+dbg("reading in duplicate spot and WWV info ...");
 DXProt->init();
 
 # put in a DXCluster node for us here so we can add users and take them away
-DXNode->new($DXProt::me, $mycall, 0, 1, $DXProt::myprot_version); 
+$routeroot = Route::Node->new($mycall, $version*100+5300, Route::here($DXProt::me->here)|Route::conf($DXProt::me->conf));
+
+# make sure that there is a routing OUTPUT node default file
+#unless (Filter::read_in('route', 'node_default', 0)) {
+#      my $dxcc = $DXProt::me->dxcc;
+#      $Route::filterdef->cmd($DXProt::me, 'route', 'accept', "node_default call $mycall" );
+#}
 
 # read in any existing message headers and clean out old crap
-dbg('err', "reading existing message headers ...");
+dbg("reading existing message headers ...");
 DXMsg->init();
 DXMsg::clean_old();
 
 # read in any cron jobs
-dbg('err', "reading cron jobs ...");
+dbg("reading cron jobs ...");
 DXCron->init();
 
 # read in database descriptors
-dbg('err', "reading database descriptors ...");
+dbg("reading database descriptors ...");
 DXDb::load();
 
 # starting local stuff
-dbg('err', "doing local initialisation ...");
+dbg("doing local initialisation ...");
 eval {
        Local::init();
 };
-dbg('local', "Local::init error $@") if $@;
+dbg("Local::init error $@") if $@;
+
+dbg("cleaning out old debug files");
+DXDebug::dbgclean();
 
 # print various flags
-#dbg('err', "seful info - \$^D: $^D \$^W: $^W \$^S: $^S \$^P: $^P");
+#dbg("seful info - \$^D: $^D \$^W: $^W \$^S: $^S \$^P: $^P");
 
 # this, such as it is, is the main loop!
-dbg('err', "orft we jolly well go ...");
+dbg("orft we jolly well go ...");
 
 #open(DB::OUT, "|tee /tmp/aa");
 
@@ -487,7 +479,7 @@ for (;;) {
                eval { 
                        Local::process();       # do any localised processing
                };
-               dbg('local', "Local::process error $@") if $@;
+               dbg("Local::process error $@") if $@;
        }
        if ($decease) {
                last if --$decease <= 0;