X-Git-Url: http://gb7djk.dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2Fcluster.pl;h=af1fe2e70cc26e8abd6f08b8b7b9accdf9269029;hb=da115e307d67c9aa6756a7b3b1cfa7a6f70e2e0d;hp=d108a560c3187871f370b6615afad51bab761ce4;hpb=cde4b624b9eccd5b360d9cc7047a7f73527885aa;p=spider.git diff --git a/perl/cluster.pl b/perl/cluster.pl index d108a560..af1fe2e7 100755 --- a/perl/cluster.pl +++ b/perl/cluster.pl @@ -23,9 +23,17 @@ BEGIN { unshift @INC, "$root/perl"; # this IS the right way round! unshift @INC, "$root/local"; + # do some validation of the input + die "The directory $root doesn't exist, please RTFM" unless -d $root; + die "$root/local doesn't exist, please RTFM" unless -d "$root/local"; + die "$root/local/DXVars.pm doesn't exist, please RTFM" unless -e "$root/local/DXVars.pm"; + + mkdir "$root/local_cmd", 0777 unless -d "$root/local_cmd"; + + # try to create and lock a lockfile (this isn't atomic but # should do for now - $lockfn = "$root/perl/cluster.lck"; # lock file name + $lockfn = "$root/local/cluster.lck"; # lock file name if (-e $lockfn) { open(CLLOCK, "$lockfn") or die "Can't open Lockfile ($lockfn) $!"; my $pid = ; @@ -39,6 +47,14 @@ BEGIN { $is_win = ($^O =~ /^MS/ || $^O =~ /^OS-2/) ? 1 : 0; # is it Windows? $systime = time; + + sub main::mkver + { + my $s = shift; + my ($v, $b) = $s =~ /(\d+\.\d+)(?:\.(\d+\.\d+))?/; + $main::build += sprintf "%.3f", $v; + $main::branch += sprintf("%.3f", $b) if $b; + } } use DXVars; @@ -60,7 +76,7 @@ use DXCommandmode; use DXProtVars; use DXProtout; use DXProt; -use QXProt; +use Aranea; use DXMsg; use DXCron; use DXConnect; @@ -87,6 +103,13 @@ use Route; use Route::Node; use Route::User; use Editable; +use Mrtg; +use USDB; +use UDPMsg; +use QSL; +use Thingy; +use RouteDB; +use AMsg; use Data::Dumper; use IO::File; @@ -100,23 +123,224 @@ package main; use strict; use vars qw(@inqueue $systime $version $starttime $lockfn @outstanding_connects $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr - $clusterport $mycall $decease $is_win $routeroot $me $reqreg + $clusterport $mycall $decease $is_win $routeroot $me $reqreg $bumpexisting + $allowdxby ); @inqueue = (); # the main input queue, an array of hashes $systime = 0; # the time now (in seconds) -$version = "1.49"; # the version no of the software +$version = "2.01"; # the version no of the software $starttime = 0; # the starting time of the cluster #@outstanding_connects = (); # list of outstanding connects @listeners = (); # list of listeners $reqreg = 0; # 1 = registration required, 2 = deregister people +$bumpexisting = 1; # 1 = allow new connection to disconnect old, 0 - don't allow it +$allowdxby = 0; # 1 = allow "dx by ", 0 - don't allow it + use vars qw($VERSION $BRANCH $build $branch); -$VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ ); -$BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0; -$main::build += 12; # add an offset to make it bigger than last system -$main::build += $VERSION; -$main::branch += $BRANCH; + +mkver($VERSION = q$Revision$); + +############################################################# +# +# The start of the main line of code +# +############################################################# + +$starttime = $systime = time; +$lang = 'en' unless $lang; + +# open the debug file, set various FHs to be unbuffered +dbginit(\&DXCommandmode::broadcast_debug); +foreach (@debug) { + dbgadd($_); +} +STDOUT->autoflush(1); + +# calculate build number +$build += $main::version; +$build = "$build.$branch" if $branch; + +Log('cluster', "DXSpider V$version, build $build started"); + +# banner +dbg("Copyright (c) 1998-2002 Dirk Koopman G1TLH"); +dbg("DXSpider Version $version, build $build started"); + +# load Prefixes +dbg("loading prefixes ..."); +dbg(USDB::init()); +my $r = Prefix::init(); +confess $r if $r; + +# load band data +dbg("loading band data ..."); +Bands::load(); + +# initialise User file system +dbg("loading user file system ..."); +DXUser->init($userfn, 1); + +# look for the sysop and the alias user and complain if they aren't there +{ + my $ref = DXUser->get($mycall); + die "$mycall missing, run the create_sysop.pl script and please RTFM" unless $ref && $ref->priv == 9; + $ref = DXUser->get($myalias); + die "$myalias missing, run the create_sysop.pl script and please RTFM" unless $ref && $ref->priv == 9; +} + +# start listening for incoming messages/connects +dbg("starting listeners ..."); +my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login); +$conn->conns("Server $clusteraddr/$clusterport using IntMsg"); +push @listeners, $conn; +dbg("Internal port: $clusteraddr $clusterport using IntMsg"); +foreach my $l (@main::listen) { + no strict 'refs'; + my $pkg = $l->[2] || 'ExtMsg'; + my $login = $l->[3] || 'login'; + + $conn = $pkg->new_server($l->[0], $l->[1], \&{"${pkg}::${login}"}); + $conn->conns("Server $l->[0]/$l->[1] using ${pkg}::${login}"); + push @listeners, $conn; + dbg("External Port: $l->[0] $l->[1] using ${pkg}::${login}"); +} + +dbg("AGW Listener") if $AGWMsg::enable; +AGWrestart(); + +dbg("UDP Listener") if $UDPMsg::enable; +UDPMsg::init(\&new_channel); + +# load bad words +dbg("load badwords: " . (BadWords::load or "Ok")); + +# prime some signals +unless ($DB::VERSION) { + $SIG{INT} = $SIG{TERM} = sub { $decease = 1 }; +} + +unless ($is_win) { + $SIG{HUP} = 'IGNORE'; + $SIG{CHLD} = sub { $zombies++ }; + + $SIG{PIPE} = sub { dbg("Broken PIPE signal received"); }; + $SIG{IO} = sub { dbg("SIGIO received"); }; + $SIG{WINCH} = $SIG{STOP} = $SIG{CONT} = 'IGNORE'; + $SIG{KILL} = 'DEFAULT'; # as if it matters.... + + # catch the rest with a hopeful message + for (keys %SIG) { + if (!$SIG{$_}) { + # dbg("Catching SIG $_") if isdbg('chan'); + $SIG{$_} = sub { my $sig = shift; DXDebug::confess("Caught signal $sig"); }; + } + } +} + +# start dupe system +dbg("Starting Dupe system"); +DXDupe::init(); + +# read in system messages +dbg("Read in Messages"); +DXM->init(); + +# read in command aliases +dbg("Read in Aliases"); +CmdAlias->init(); + +# initialise the Geomagnetic data engine +dbg("Start WWV"); +Geomag->init(); +dbg("Start WCY"); +WCY->init(); + +# initial the Spot stuff +dbg("Starting DX Spot system"); +Spot->init(); + +# initialise the protocol engine +dbg("Start Protocol Engines ..."); +DXProt->init(); +Aranea->init(); + +# put in a DXCluster node for us here so we can add users and take them away +$routeroot = Route::Node->new($mycall, int($version*100)+$DXProt::myprot_version, $main::me->here); + +# make sure that there is a routing OUTPUT node default file +#unless (Filter::read_in('route', 'node_default', 0)) { +# my $dxcc = $main::me->dxcc; +# $Route::filterdef->cmd($main::me, 'route', 'accept', "node_default call $mycall" ); +#} + +# read in any existing message headers and clean out old crap +dbg("reading existing message headers ..."); +DXMsg->init(); +DXMsg::clean_old(); + +# read in any cron jobs +dbg("reading cron jobs ..."); +DXCron->init(); + +# read in database descriptors +dbg("reading database descriptors ..."); +DXDb::load(); + +# starting local stuff +dbg("doing local initialisation ..."); +QSL::init(1); +eval { + Local::init(); +}; +dbg("Local::init error $@") if $@; + +# this, such as it is, is the main loop! +dbg("orft we jolly well go ..."); +my $script = new Script "startup"; +$script->run($main::me) if $script; + +#open(DB::OUT, "|tee /tmp/aa"); + +for (;;) { +# $DB::trace = 1; + + Msg->event_loop(10, 0.010); + my $timenow = time; + + DXChannel::process(); + Thingy::process(); + +# $DB::trace = 0; + + # do timed stuff, ongoing processing happens one a second + if ($timenow != $systime) { + rand(); # keep randomising to reduce (but not eliminate) predictability + reap() if $zombies; + $systime = $timenow; + DXCron::process(); # do cron jobs + DXCommandmode::process(); # process ongoing command mode stuff + DXProt::process(); # process ongoing ak1a pcxx stuff + Aranea::process(); + DXConnect::process(); + DXMsg::process(); + DXDb::process(); + DXUser::process(); + DXDupe::process(); + AGWMsg::process(); + + eval { + Local::process(); # do any localised processing + }; + dbg("Local::process error $@") if $@; + } + if ($decease) { + last if --$decease <= 0; + } +} +cease(0); +exit(0); # send a message to call on conn and disconnect @@ -152,18 +376,29 @@ sub new_channel # set up the basic channel info # is there one already connected to me - locally? - my $user = DXUser->get($call); - my $dxchan = DXChannel->get($call); + my $user = DXUser->get_current($call); + my $dxchan = DXChannel::get($call); if ($dxchan) { - my $mess = DXM::msg($lang, ($user && $user->is_node) ? 'concluster' : 'conother', $call, $main::mycall); - already_conn($conn, $call, $mess); - return; + if ($user && $user->is_node) { + already_conn($conn, $call, DXM::msg($lang, 'concluster', $call, $main::mycall)); + return; + } + if ($bumpexisting && $call ne $main::mycall) { + my $ip = $conn->{peerhost} || 'unknown'; + $dxchan->send_now('D', DXM::msg($lang, 'conbump', $call, $ip)); + Log('DXCommand', "$call bumped off by $ip, disconnected"); + dbg("$call bumped off by $ip, disconnected"); + $dxchan->disconnect; + } else { + already_conn($conn, $call, DXM::msg($lang, 'conother', $call, $main::mycall)); + return; + } } # is he locked out ? my $basecall = $call; $basecall =~ s/-\d+$//; - my $baseuser = DXUser->get($basecall); + my $baseuser = DXUser->get_current($basecall); my $lock = $user->lockout if $user; if ($baseuser && $baseuser->lockout || $lock) { if (!$user || !defined $lock || $lock) { @@ -180,11 +415,8 @@ sub new_channel $user = DXUser->new($call); } - # create the channel - if ($user->is_spider) { - $dxchan = QXProt->new($call, $conn, $user); - } elsif ($user->is_node) { + if ($user->is_node) { $dxchan = DXProt->new($call, $conn, $user); } elsif ($user->is_user) { $dxchan = DXCommandmode->new($call, $conn, $user); @@ -199,22 +431,10 @@ sub new_channel # set callbacks $conn->set_error(sub {error_handler($dxchan)}); - $conn->set_rproc(sub {my ($conn,$msg) = @_; rec($dxchan, $conn, $msg);}); - rec($dxchan, $conn, $msg); + $conn->set_rproc(sub {my ($conn,$msg) = @_; $dxchan->rec($msg);}); + $dxchan->rec($msg); } -sub rec -{ - my ($dxchan, $conn, $msg) = @_; - - # queue the message and the channel object for later processing - if (defined $msg) { - my $self = bless {}, "inqueue"; - $self->{dxchan} = $dxchan; - $self->{data} = $msg; - push @inqueue, $self; - } -} sub login { @@ -239,19 +459,22 @@ sub cease dbg("Local::finish error $@") if $@; # disconnect nodes - foreach $dxchan (DXChannel->get_all_nodes) { + foreach $dxchan (grep {$_->is_node || $_->is_aranea} DXChannel::get_all()) { $dxchan->disconnect(2) unless $dxchan == $main::me; } Msg->event_loop(100, 0.01); # disconnect users - foreach $dxchan (DXChannel->get_all_users) { + foreach $dxchan (DXChannel::get_all_users) { $dxchan->disconnect; } # disconnect AGW AGWMsg::finish(); + # disconnect UDP customers + UDPMsg::finish(); + # end everything else Msg->event_loop(100, 0.01); DXUser::finish(); @@ -288,45 +511,6 @@ sub reap # this is where the input queue is dealt with and things are dispatched off to other parts of # the cluster -sub process_inqueue -{ - while (@inqueue) { - my $self = shift @inqueue; - return if !$self; - - my $data = $self->{data}; - my $dxchan = $self->{dxchan}; - my $error; - my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data); - return unless defined $sort; - - # do the really sexy console interface bit! (Who is going to do the TK interface then?) - dbg("<- $sort $call $line\n") if $sort ne 'D' && isdbg('chan'); - if ($self->{disconnecting}) { - dbg('In disconnection, ignored'); - next; - } - - # handle A records - my $user = $dxchan->user; - if ($sort eq 'A' || $sort eq 'O') { - $dxchan->start($line, $sort); - } elsif ($sort eq 'I') { - die "\$user not defined for $call" if !defined $user; - - # normal input - $dxchan->normal($line); - } elsif ($sort eq 'Z') { - $dxchan->disconnect; - } elsif ($sort eq 'D') { - ; # ignored (an echo) - } elsif ($sort eq 'G') { - $dxchan->enhanced($line); - } else { - print STDERR atime, " Unknown command letter ($sort) received from $call\n"; - } - } -} sub uptime { @@ -344,173 +528,3 @@ sub AGWrestart AGWMsg::init(\&new_channel); } -############################################################# -# -# The start of the main line of code -# -############################################################# - -$starttime = $systime = time; -$lang = 'en' unless $lang; - -# open the debug file, set various FHs to be unbuffered -dbginit(\&DXCommandmode::broadcast_debug); -foreach (@debug) { - dbgadd($_); -} -STDOUT->autoflush(1); - -# calculate build number -$build += $main::version; -$build = "$build.$branch" if $branch; - -Log('cluster', "DXSpider V$version, build $build started"); - -# banner -dbg("Copyright (c) 1998-2002 Dirk Koopman G1TLH"); -dbg("DXSpider Version $version, build $build started"); - -# load Prefixes -dbg("loading prefixes ..."); -Prefix::load(); - -# load band data -dbg("loading band data ..."); -Bands::load(); - -# initialise User file system -dbg("loading user file system ..."); -DXUser->init($userfn, 1); - -# start listening for incoming messages/connects -dbg("starting listeners ..."); -my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login); -$conn->conns("Server $clusteraddr/$clusterport"); -push @listeners, $conn; -dbg("Internal port: $clusteraddr $clusterport"); -foreach my $l (@main::listen) { - $conn = ExtMsg->new_server($l->[0], $l->[1], \&login); - $conn->conns("Server $l->[0]/$l->[1]"); - push @listeners, $conn; - dbg("External Port: $l->[0] $l->[1]"); -} -AGWrestart(); - -# load bad words -dbg("load badwords: " . (BadWords::load or "Ok")); - -# prime some signals -unless ($DB::VERSION) { - $SIG{INT} = $SIG{TERM} = sub { $decease = 1 }; -} - -unless ($is_win) { - $SIG{HUP} = 'IGNORE'; - $SIG{CHLD} = sub { $zombies++ }; - - $SIG{PIPE} = sub { dbg("Broken PIPE signal received"); }; - $SIG{IO} = sub { dbg("SIGIO received"); }; - $SIG{WINCH} = $SIG{STOP} = $SIG{CONT} = 'IGNORE'; - $SIG{KILL} = 'DEFAULT'; # as if it matters.... - - # catch the rest with a hopeful message - for (keys %SIG) { - if (!$SIG{$_}) { - # dbg("Catching SIG $_") if isdbg('chan'); - $SIG{$_} = sub { my $sig = shift; DXDebug::confess("Caught signal $sig"); }; - } - } -} - -# start dupe system -DXDupe::init(); - -# read in system messages -DXM->init(); - -# read in command aliases -CmdAlias->init(); - -# initialise the Geomagnetic data engine -Geomag->init(); -WCY->init(); - -# initial the Spot stuff -Spot->init(); - -# initialise the protocol engine -dbg("reading in duplicate spot and WWV info ..."); -DXProt->init(); - -# put in a DXCluster node for us here so we can add users and take them away -$routeroot = Route::Node->new($mycall, $version*100+5300, Route::here($main::me->here)|Route::conf($main::me->conf)); - -# make sure that there is a routing OUTPUT node default file -#unless (Filter::read_in('route', 'node_default', 0)) { -# my $dxcc = $main::me->dxcc; -# $Route::filterdef->cmd($main::me, 'route', 'accept', "node_default call $mycall" ); -#} - -# read in any existing message headers and clean out old crap -dbg("reading existing message headers ..."); -DXMsg->init(); -DXMsg::clean_old(); - -# read in any cron jobs -dbg("reading cron jobs ..."); -DXCron->init(); - -# read in database descriptors -dbg("reading database descriptors ..."); -DXDb::load(); - -# starting local stuff -dbg("doing local initialisation ..."); -eval { - Local::init(); -}; -dbg("Local::init error $@") if $@; - -# this, such as it is, is the main loop! -dbg("orft we jolly well go ..."); -my $script = new Script "startup"; -$script->run($main::me) if $script; - -#open(DB::OUT, "|tee /tmp/aa"); - -for (;;) { -# $DB::trace = 1; - - Msg->event_loop(10, 0.010); - my $timenow = time; - process_inqueue(); # read in lines from the input queue and despatch them -# $DB::trace = 0; - - # do timed stuff, ongoing processing happens one a second - if ($timenow != $systime) { - reap if $zombies; - $systime = $timenow; - DXCron::process(); # do cron jobs - DXCommandmode::process(); # process ongoing command mode stuff - DXProt::process(); # process ongoing ak1a pcxx stuff - QXProt::process(); - DXConnect::process(); - DXMsg::process(); - DXDb::process(); - DXUser::process(); - DXDupe::process(); - AGWMsg::process(); - - eval { - Local::process(); # do any localised processing - }; - dbg("Local::process error $@") if $@; - } - if ($decease) { - last if --$decease <= 0; - } -} -cease(0); -exit(0); - -