X-Git-Url: http://gb7djk.dxcluster.net/gitweb/gitweb.cgi?a=blobdiff_plain;f=perl%2FDXMsg.pm;h=f3c2172cfba7c45c70e7fa5d3c12353853dd2b92;hb=48d614dae214326305879ac572d8c5f0a6150f99;hp=501d137d1454ba49bdc971312cffbfd39c1bc0c8;hpb=6d56cd879560a16b952a938fc47eee420511e628;p=spider.git diff --git a/perl/DXMsg.pm b/perl/DXMsg.pm index 501d137d..f3c2172c 100644 --- a/perl/DXMsg.pm +++ b/perl/DXMsg.pm @@ -15,13 +15,10 @@ package DXMsg; -@ISA = qw(DXProt DXChannel); - use DXUtil; use DXChannel; use DXUser; use DXM; -use DXCluster; use DXProtVars; use DXProtout; use DXDebug; @@ -32,7 +29,7 @@ use Fcntl; use strict; use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime - $queueinterval $lastq $importfn $minchunk $maxchunk); + $queueinterval $lastq $importfn $minchunk $maxchunk $bulltopriv); %work = (); # outstanding jobs @msg = (); # messages we have @@ -50,6 +47,8 @@ $lastq = 0; $minchunk = 4800; # minimum chunk size for a split message $maxchunk = 6000; # maximum chunk size +$bulltopriv = 1; # convert msgs with callsigns to private if they are bulls + $badmsgfn = "$msgdir/badmsg.pl"; # list of TO address we wont store $forwardfn = "$msgdir/forward.pl"; # the forwarding table @@ -102,8 +101,9 @@ sub alloc $self->{'read'} = shift; $self->{rrreq} = shift; $self->{gotit} = []; - $self->{lastt} = $main::systime; +# $self->{lastt} = $main::systime; $self->{lines} = []; + $self->{private} = 1 if $bulltopriv && DXUser->get_current($self->{to}); return $self; } @@ -131,20 +131,6 @@ sub process if ($main::systime >= $lastq + $queueinterval) { - # wander down the work queue stopping any messages that have timed out - for (keys %busy) { - my $node = $_; - my $ref = $busy{$_}; - if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { - dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); - Log('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); - $ref->stop_msg($node); - - # delay any outgoing messages that fail - $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; - } - } - # queue some message if the interval timer has gone off queue_msg(0); @@ -165,40 +151,51 @@ sub process SWITCH: { if ($pcno == 28) { # incoming message + # sort out various extant protocol errors that occur + my ($fromnode, $origin); + if ($self->is_arcluster && $f[13] eq $self->call) { + $fromnode = $f[13]; + $origin = $f[2]; + } else { + $fromnode = $f[2]; + $origin = $f[13]; + } + $origin = $self->call unless $origin && $origin gt ' '; + # first look for any messages in the busy queue # and cancel them this should both resolve timed out incoming messages # and crossing of message between nodes, incoming messages have priority - if (exists $busy{$f[2]}) { - my $ref = $busy{$f[2]}; + + if (exists $busy{$fromnode}) { + my $ref = $busy{$fromnode}; my $tonode = $ref->{tonode}; - dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]"); + dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$tonode") if isdbg('msg'); $ref->stop_msg($self->call); } my $t = cltounix($f[5], $f[6]); - my $stream = next_transno($f[2]); - $f[13] = $self->call unless $f[13] && $f[13] gt ' '; - my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]); + my $stream = next_transno($fromnode); + my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]); # fill in various forwarding state variables - $ref->{fromnode} = $f[2]; + $ref->{fromnode} = $fromnode; $ref->{tonode} = $f[1]; $ref->{rrreq} = $f[11]; $ref->{linesreq} = $f[10]; $ref->{stream} = $stream; $ref->{count} = 0; # no of lines between PC31s - dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n"); + dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg'); Log('msg', "Incoming message $f[4] to $f[3] '$f[8]'" ); - $work{"$f[2]$stream"} = $ref; # store in work - $busy{$f[2]} = $ref; # set interlock - $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack + $work{"$fromnode$stream"} = $ref; # store in work + $busy{$fromnode} = $ref; # set interlock + $self->send(DXProt::pc30($fromnode, $f[1], $stream)); # send ack $ref->{lastt} = $main::systime; # look to see whether this is a non private message sent to a known callsign my $uref = DXUser->get_current($ref->{to}); if (is_callsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) { $ref->{private} = 1; - dbg('msg', "set bull to $ref->{to} to private"); + dbg("set bull to $ref->{to} to private") if isdbg('msg'); } last SWITCH; } @@ -211,12 +208,12 @@ sub process $ref->{count}++; if ($ref->{count} >= $ref->{linesreq}) { $self->send(DXProt::pc31($f[2], $f[1], $f[3])); - dbg('msg', "stream $f[3]: $ref->{count} lines received\n"); + dbg("stream $f[3]: $ref->{count} lines received\n") if isdbg('msg'); $ref->{count} = 0; } $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" ); + dbg("PC29 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -230,13 +227,13 @@ sub process $ref->{count} = 0; $ref->{linesreq} = 5; $work{"$f[2]$f[3]"} = $ref; # new ref - dbg('msg', "incoming subject ack stream $f[3]\n"); + dbg("incoming subject ack stream $f[3]\n") if isdbg('msg'); $busy{$f[2]} = $ref; # interlock push @{$ref->{lines}}, ($ref->read_msg_body); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" ); + dbg("PC30 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; @@ -245,18 +242,18 @@ sub process if ($pcno == 31) { # acknowledge a tranche of lines my $ref = $work{"$f[2]$f[3]"}; if ($ref) { - dbg('msg', "tranche ack stream $f[3]\n"); + dbg("tranche ack stream $f[3]\n") if isdbg('msg'); $ref->send_tranche($self); $ref->{lastt} = $main::systime; } else { - dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" ); + dbg("PC31 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } last SWITCH; } if ($pcno == 32) { # incoming EOM - dbg('msg', "stream $f[3]: EOM received\n"); + dbg("stream $f[3]: EOM received\n") if isdbg('msg'); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it @@ -277,7 +274,7 @@ sub process if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) { $ref->stop_msg($self->call); my $msgno = $m->{msgno}; - dbg('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); + dbg("duplicate message from $ref->{from} -> $ref->{to} to $msgno") if isdbg('msg'); Log('msg', "duplicate message from $ref->{from} -> $ref->{to} to $msgno"); return; } @@ -287,10 +284,9 @@ sub process $ref->swop_it($self->call); # look for 'bad' to addresses -# if (grep $ref->{to} eq $_, @badmsg) { if ($ref->dump_it($self->call)) { $ref->stop_msg($self->call); - dbg('msg', "'Bad' message $ref->{to}"); + dbg("'Bad' message $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' message $ref->{to}"); return; } @@ -306,7 +302,7 @@ sub process } $ref->stop_msg($self->call); } else { - dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" ); + dbg("PC32 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } # queue_msg(0); @@ -326,7 +322,7 @@ sub process } $ref->stop_msg($self->call); } else { - dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" ); + dbg("PC33 from unknown stream $f[3] from $f[2]") if isdbg('msg'); $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream } @@ -340,7 +336,7 @@ sub process $f[3] =~ s/\.//og; # remove dots $f[3] =~ s/^\///o; # remove the leading / $f[3] = lc $f[3]; # to lower case; - dbg('msg', "incoming file $f[3]\n"); + dbg("incoming file $f[3]\n") if isdbg('msg'); $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o; # create any directories @@ -352,7 +348,7 @@ sub process $fn .= "/$part"; next if -e $fn; last SWITCH if !mkdir $fn, 0777; - dbg('msg', "created directory $fn\n"); + dbg("created directory $fn\n") if isdbg('msg'); } my $stream = next_transno($f[2]); my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0'); @@ -372,7 +368,7 @@ sub process } if ($pcno == 42) { # abort transfer - dbg('msg', "stream $f[3]: abort received\n"); + dbg("stream $f[3]: abort received\n") if isdbg('msg'); my $ref = $work{"$f[2]$f[3]"}; if ($ref) { $ref->stop_msg($self->call); @@ -403,7 +399,7 @@ sub store my $lines = shift; if ($ref->{file}) { # a file - dbg('msg', "To be stored in $ref->{to}\n"); + dbg("To be stored in $ref->{to}\n") if isdbg('msg'); my $fh = new IO::File "$ref->{to}", "w"; if (defined $fh) { @@ -412,7 +408,7 @@ sub store print $fh "$line\n"; } $fh->close; - dbg('msg', "file $ref->{to} stored\n"); + dbg("file $ref->{to} stored\n") if isdbg('msg'); Log('msg', "file $ref->{to} from $ref->{from} stored" ); } else { confess "can't open file $ref->{to} $!"; @@ -422,7 +418,7 @@ sub store # attempt to open the message file my $fn = filename($ref->{msgno}); - dbg('msg', "To be stored in $fn\n"); + dbg("To be stored in $fn\n") if isdbg('msg'); # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem) my $fh = new IO::File "$fn", "w"; @@ -438,7 +434,7 @@ sub store print $fh "$line\n"; } $fh->close; - dbg('msg', "msg $ref->{msgno} stored\n"); + dbg("msg $ref->{msgno} stored\n") if isdbg('msg'); Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" ); } else { confess "can't open msg file $fn $!"; @@ -452,13 +448,13 @@ sub del_msg my $self = shift; # remove it from the active message list - dbg('msg', "\@msg = " . scalar @msg . " before delete"); + dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); @msg = grep { $_ != $self } @msg; # remove the file unlink filename($self->{msgno}); - dbg('msg', "deleting $self->{msgno}\n"); - dbg('msg', "\@msg = " . scalar @msg . " after delete"); + dbg("deleting $self->{msgno}\n") if isdbg('msg'); + dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); } # clean out old messages from the message queue @@ -467,18 +463,18 @@ sub clean_old my $ref; # mark old messages for deletion - dbg('msg', "\@msg = " . scalar @msg . " before delete"); + dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg'); foreach $ref (@msg) { if (ref($ref) && !$ref->{keep} && $ref->{t} < $main::systime - $maxage) { $ref->{deleteme} = 1; unlink filename($ref->{msgno}); - dbg('msg', "deleting old $ref->{msgno}\n"); + dbg("deleting old $ref->{msgno}\n") if isdbg('msg'); } } # remove them all from the active message list @msg = grep { !$_->{deleteme} } @msg; - dbg('msg', "\@msg = " . scalar @msg . " after delete"); + dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg'); $last_clean = $main::systime; } @@ -494,21 +490,21 @@ sub read_msg_header $file = new IO::File "$fn"; if (!$file) { - dbg('err', "Error reading $fn $!"); + dbg("Error reading $fn $!"); Log('err', "Error reading $fn $!"); return undef; } $size = -s $fn; $line = <$file>; # first line if ($size == 0 || !$line) { - dbg('err', "Empty $fn $!"); + dbg("Empty $fn $!"); Log('err', "Empty $fn $!"); return undef; } chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - dbg('err', "corrupt first line in $fn ($line)"); + dbg("corrupt first line in $fn ($line)"); Log('err', "corrupt first line in $fn ($line)"); return undef; } @@ -520,7 +516,7 @@ sub read_msg_header chomp $line; $size -= length $line; if (! $line =~ /^===/o) { - dbg('err', "corrupt second line in $fn ($line)"); + dbg("corrupt second line in $fn ($line)"); Log('err', "corrupt second line in $fn ($line)"); return undef; } @@ -547,7 +543,7 @@ sub read_msg_body $file = new IO::File; if (!open($file, $fn)) { - dbg('err' ,"Error reading $fn $!"); + dbg("Error reading $fn $!"); Log('err' ,"Error reading $fn $!"); return undef; } @@ -591,11 +587,9 @@ sub queue_msg # bat down the message list looking for one that needs to go off site and whose # nearest node is not busy. - dbg('msg', "queue msg ($sort)\n"); + dbg("queue msg ($sort)\n") if isdbg('msg'); my @nodelist = DXChannel::get_all_nodes; foreach $ref (@msg) { - # firstly, is it private and unread? if so can I find the recipient - # in my cluster node list offsite? # ignore 'delayed' messages until their waiting time has expired if (exists $ref->{waitt}) { @@ -603,20 +597,42 @@ sub queue_msg delete $ref->{waitt}; } + # any time outs? + if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) { + my $node = $ref->{tonode}; + dbg("Timeout, stopping msgno: $ref->{msgno} -> $node") if isdbg('msg'); + Log('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node"); + $ref->stop_msg($node); + + # delay any outgoing messages that fail + $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall; + delete $ref->{lastt}; + next; + } + + # firstly, is it private and unread? if so can I find the recipient + # in my cluster node list offsite? + # deal with routed private messages my $dxchan; if ($ref->{private}) { next if $ref->{'read'}; # if it is read, it is stuck here - $clref = DXCluster->get_exact($ref->{to}); - unless ($clref) { # otherwise look for a homenode - my $uref = DXUser->get_current($ref->{to}); - my $hnode = $uref->homenode if $uref; - $clref = DXCluster->get_exact($hnode) if $hnode; - } - if ($clref && !grep { $clref->dxchan == $_ } DXCommandmode::get_all()) { - next if $clref->call eq $main::mycall; # i.e. it lives here + $clref = Route::get($ref->{to}); +# unless ($clref) { # otherwise look for a homenode +# my $uref = DXUser->get_current($ref->{to}); +# my $hnode = $uref->homenode if $uref; +# $clref = Route::Node::get($hnode) if $hnode; +# } + if ($clref) { $dxchan = $clref->dxchan; - $ref->start_msg($dxchan) if $dxchan && !get_busy($dxchan->call) && $dxchan->state eq 'normal'; + if ($dxchan) { + if ($dxchan->is_node) { + next if $clref->call eq $main::mycall; # i.e. it lives here + $ref->start_msg($dxchan) if !get_busy($dxchan->call) && $dxchan->state eq 'normal'; + } + } else { + dbg("Route: No dxchan for $ref->{to} " . ref($clref) ) if isdbg('msg'); + } } } @@ -662,7 +678,7 @@ sub start_msg { my ($self, $dxchan) = @_; - dbg('msg', "start msg $self->{msgno}\n"); + dbg("start msg $self->{msgno}\n") if isdbg('msg'); $self->{linesreq} = 10; $self->{count} = 0; $self->{tonode} = $dxchan->call; @@ -700,7 +716,7 @@ sub stop_msg my $stream = $self->{stream} if exists $self->{stream}; - dbg('msg', "stop msg $self->{msgno} -> node $node\n"); + dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg'); delete $work{$node}; delete $work{"$node$stream"} if $stream; $self->workclean; @@ -718,12 +734,12 @@ sub next_transno my $fh = new IO::File; if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) { $fh->autoflush(1); - $msgno = $fh->getline; + $msgno = $fh->getline || '0'; chomp $msgno; $msgno++; seek $fh, 0, 0; $fh->print("$msgno\n"); - dbg('msg', "msgno $msgno allocated for $name\n"); + dbg("msgno $msgno allocated for $name\n") if isdbg('msg'); $fh->close; } else { confess "can't open $fn $!"; @@ -739,9 +755,9 @@ sub init my $ref; # load various control files - dbg('err', "load badmsg: " . (load_badmsg() or "Ok")); - dbg('err', "load forward: " . (load_forward() or "Ok")); - dbg('err', "load swop: " . (load_swop() or "Ok")); + dbg("load badmsg: " . (load_badmsg() or "Ok")); + dbg("load forward: " . (load_forward() or "Ok")); + dbg("load swop: " . (load_swop() or "Ok")); # read in the directory opendir($dir, $msgdir) or confess "can't open $msgdir $!"; @@ -754,15 +770,15 @@ sub init $ref = read_msg_header("$msgdir/$_"); unless ($ref) { - dbg('err', "Deleting $_"); + dbg("Deleting $_"); Log('err', "Deleting $_"); unlink "$msgdir/$_"; next; } # delete any messages to 'badmsg.pl' places - if (grep $ref->{to} eq $_, @badmsg) { - dbg('msg', "'Bad' TO address $ref->{to}"); + if ($ref->dump_it('')) { + dbg("'Bad' TO address $ref->{to}") if isdbg('msg'); Log('msg', "'Bad' TO address $ref->{to}"); $ref->del_msg; next; @@ -992,6 +1008,7 @@ sub dump_it $tested = $ref->{from} if $field eq 'F'; $tested = $ref->{origin} if $field eq 'O'; $tested = $ref->{subject} if $field eq 'S'; + $tested = $call if $field eq 'I'; if (!$pattern || $tested =~ m{$pattern}i) { return 1; @@ -1055,7 +1072,7 @@ sub import_msgs # are there any to do in this directory? return unless -d $importfn; unless (opendir(DIR, $importfn)) { - dbg('msg', "can\'t open $importfn $!"); + dbg("can\'t open $importfn $!") if isdbg('msg'); Log('msg', "can\'t open $importfn $!"); return; } @@ -1069,7 +1086,7 @@ sub import_msgs my $fn = "$importfn/$name"; next unless -f $fn; unless (open(MSG, $fn)) { - dbg('msg', "can\'t open import file $fn $!"); + dbg("can\'t open import file $fn $!") if isdbg('msg'); Log('msg', "can\'t open import file $fn $!"); unlink($fn); next; @@ -1102,7 +1119,7 @@ sub import_one my @f = split /\s+/, $line; unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) { my $m = "invalid first line in import '$line'"; - dbg('MSG', $m ); + dbg($m) if isdbg('msg'); return (1, $m); } while (@f) {