fixed problem where two consoles with same call stopped the cluster
[spider.git] / perl / DXMsg.pm
index 146392f44f970a83eb269167c1590a29b71cb414..2abcb55f95b2967bf21f4d26d241c7f3c5530992 100644 (file)
@@ -32,7 +32,8 @@ use Carp;
 
 use strict;
 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
-                       @badmsg $badmsgfn $forwardfn @forward $timeout $waittime);
+                       @badmsg $badmsgfn $forwardfn @forward $timeout $waittime
+                   $queueinterval $lastq);
 
 %work = ();                                            # outstanding jobs
 @msg = ();                                             # messages we have
@@ -43,33 +44,36 @@ $last_clean = 0;                            # last time we did a clean
 @forward = ();                  # msg forward table
 $timeout = 30*60;               # forwarding timeout
 $waittime = 60*60;              # time an aborted outgoing message waits before trying again
+$queueinterval = 5*60;          # run the queue every 5 minutes
+$lastq = 0;
+
 
 $badmsgfn = "$msgdir/badmsg.pl";  # list of TO address we wont store
 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
 
 %valid = (
-                 fromnode => '9,From Node',
-                 tonode => '9,To Node',
+                 fromnode => '5,From Node',
+                 tonode => '5,To Node',
                  to => '0,To',
                  from => '0,From',
                  t => '0,Msg Time,cldatetime',
-                 private => '9,Private',
+                 private => '5,Private',
                  subject => '0,Subject',
                  linesreq => '0,Lines per Gob',
-                 rrreq => '9,Read Confirm',
+                 rrreq => '5,Read Confirm',
                  origin => '0,Origin',
                  lines => '5,Data',
                  stream => '9,Stream No',
-                 count => '9,Gob Linecnt',
-                 file => '9,File?,yesno',
-                 gotit => '9,Got it Nodes,parray',
-                 lines => '9,Lines,parray',
-                 'read' => '9,Times read',
+                 count => '5,Gob Linecnt',
+                 file => '5,File?,yesno',
+                 gotit => '5,Got it Nodes,parray',
+                 lines => '5,Lines,parray',
+                 'read' => '5,Times read',
                  size => '0,Size',
                  msgno => '0,Msgno',
                  keep => '0,Keep this?,yesno',
-                 lastt => '9,Last processed,cldatetime',
-                 waitt => '9,Wait until,cldatetime',
+                 lastt => '5,Last processed,cldatetime',
+                 waitt => '5,Wait until,cldatetime',
                 );
 
 sub DESTROY
@@ -130,14 +134,21 @@ sub process
                for (keys %busy) {
                        my $node = $_;
                        my $ref = $busy{$_};
-                       if ($main::systime > $ref->{lastt} + $timeout) {
+                       if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) {
+                               dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
                                $ref->stop_msg($node);
 
                                # delay any outgoing messages that fail
-                               $ref->{waitt} = $main::systime + $waittime if $node ne $main::mycall;
+                               $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
                        }
                }
-               
+
+               # queue some message if the interval timer has gone off
+               if ($main::systime > $lastq + $queueinterval) {
+                       queue_msg(0);
+                       $lastq = $main::systime;
+               }
+
                # clean the message queue
                clean_old() if $main::systime - $last_clean > 3600 ;
                return;
@@ -155,6 +166,7 @@ sub process
                        if (exists $busy{$f[2]}) {
                                my $ref = $busy{$f[2]};
                                my $tonode = $ref->{tonode};
+                               dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
                                $ref->stop_msg($self->call);
                        }
 
@@ -173,6 +185,7 @@ sub process
                        $work{"$f[2]$stream"} = $ref; # store in work
                        $busy{$f[2]} = $ref; # set interlock
                        $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
+                       $ref->{lastt} = $main::systime;
                        last SWITCH;
                }
                
@@ -187,6 +200,9 @@ sub process
                                        $ref->{count} = 0;
                                }
                                $ref->{lastt} = $main::systime;
+                       } else {
+                               dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
+                               $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        }
                        last SWITCH;
                }
@@ -206,6 +222,7 @@ sub process
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
                        } else {
+                               dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
                        last SWITCH;
@@ -218,6 +235,7 @@ sub process
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
                        } else {
+                               dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
                        last SWITCH;
@@ -264,16 +282,16 @@ sub process
                                                $ref->store($ref->{lines});
                                                add_dir($ref);
                                                my $dxchan = DXChannel->get($ref->{to});
-                                               $dxchan->send($dxchan->msg('m9')) if $dxchan;
+                                               $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
                                                Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
                                        }
                                }
                                $ref->stop_msg($self->call);
-                               queue_msg(0);
                        } else {
+                               dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        }
-                       queue_msg(0);
+                       queue_msg(0);
                        last SWITCH;
                }
                
@@ -290,8 +308,11 @@ sub process
                                }
                                $ref->stop_msg($self->call);
                        } else {
+                               dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
+
+                       # send next one if present
                        queue_msg(0);
                        last SWITCH;
                }
@@ -830,7 +851,6 @@ sub do_send_stuff
                        delete $self->{loc};
                        $self->func(undef);
                        
-                       DXMsg::queue_msg(0);
                        $self->state('prompt');
                } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
                        #push @out, $self->msg('sendabort');