fixed problem where two consoles with same call stopped the cluster
[spider.git] / perl / DXMsg.pm
index bcb4dc006fc1482f7f2929f35c69f37be55a318a..2abcb55f95b2967bf21f4d26d241c7f3c5530992 100644 (file)
@@ -52,28 +52,28 @@ $badmsgfn = "$msgdir/badmsg.pl";  # list of TO address we wont store
 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
 
 %valid = (
-                 fromnode => '9,From Node',
-                 tonode => '9,To Node',
+                 fromnode => '5,From Node',
+                 tonode => '5,To Node',
                  to => '0,To',
                  from => '0,From',
                  t => '0,Msg Time,cldatetime',
-                 private => '9,Private',
+                 private => '5,Private',
                  subject => '0,Subject',
                  linesreq => '0,Lines per Gob',
-                 rrreq => '9,Read Confirm',
+                 rrreq => '5,Read Confirm',
                  origin => '0,Origin',
                  lines => '5,Data',
                  stream => '9,Stream No',
-                 count => '9,Gob Linecnt',
-                 file => '9,File?,yesno',
-                 gotit => '9,Got it Nodes,parray',
-                 lines => '9,Lines,parray',
-                 'read' => '9,Times read',
+                 count => '5,Gob Linecnt',
+                 file => '5,File?,yesno',
+                 gotit => '5,Got it Nodes,parray',
+                 lines => '5,Lines,parray',
+                 'read' => '5,Times read',
                  size => '0,Size',
                  msgno => '0,Msgno',
                  keep => '0,Keep this?,yesno',
-                 lastt => '9,Last processed,cldatetime',
-                 waitt => '9,Wait until,cldatetime',
+                 lastt => '5,Last processed,cldatetime',
+                 waitt => '5,Wait until,cldatetime',
                 );
 
 sub DESTROY
@@ -135,6 +135,7 @@ sub process
                        my $node = $_;
                        my $ref = $busy{$_};
                        if (exists $ref->{lastt} && $main::systime > $ref->{lastt} + $timeout) {
+                               dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
                                $ref->stop_msg($node);
 
                                # delay any outgoing messages that fail
@@ -165,6 +166,7 @@ sub process
                        if (exists $busy{$f[2]}) {
                                my $ref = $busy{$f[2]};
                                my $tonode = $ref->{tonode};
+                               dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
                                $ref->stop_msg($self->call);
                        }
 
@@ -198,6 +200,9 @@ sub process
                                        $ref->{count} = 0;
                                }
                                $ref->{lastt} = $main::systime;
+                       } else {
+                               dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
+                               $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        }
                        last SWITCH;
                }
@@ -217,6 +222,7 @@ sub process
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
                        } else {
+                               dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
                        last SWITCH;
@@ -229,6 +235,7 @@ sub process
                                $ref->send_tranche($self);
                                $ref->{lastt} = $main::systime;
                        } else {
+                               dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
                        last SWITCH;
@@ -275,12 +282,13 @@ sub process
                                                $ref->store($ref->{lines});
                                                add_dir($ref);
                                                my $dxchan = DXChannel->get($ref->{to});
-                                               $dxchan->send($dxchan->msg('m9')) if $dxchan;
+                                               $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
                                                Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
                                        }
                                }
                                $ref->stop_msg($self->call);
                        } else {
+                               dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        }
                        # queue_msg(0);
@@ -300,8 +308,12 @@ sub process
                                }
                                $ref->stop_msg($self->call);
                        } else {
+                               dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
                                $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
                        } 
+
+                       # send next one if present
+                       queue_msg(0);
                        last SWITCH;
                }