+# delete a message
+sub del_msg
+{
+ my $self = shift;
+
+ # remove it from the active message list
+ @msg = map { $_ != $self ? $_ : () } @msg;
+
+ # belt and braces (one day I will ask someone if this is REALLY necessary)
+ delete $self->{gotit};
+ delete $self->{list};
+
+ # remove the file
+ unlink filename($self->{msgno});
+ dbg('msg', "deleting $self->{msgno}\n");
+}
+
+# read in a message header
+sub read_msg_header
+{
+ my $fn = shift;
+ my $file;
+ my $line;
+ my $ref;
+ my @f;
+ my $size;
+
+ $file = new FileHandle;
+ if (!open($file, $fn)) {
+ print "Error reading $fn $!\n";
+ return undef;
+ }
+ $size = -s $fn;
+ $line = <$file>; # first line
+ chomp $line;
+ $size -= length $line;
+ if (! $line =~ /^===/o) {
+ print "corrupt first line in $fn ($line)\n";
+ return undef;
+ }
+ $line =~ s/^=== //o;
+ @f = split /\^/, $line;
+ $ref = DXMsg->alloc(@f);
+
+ $line = <$file>; # second line
+ chomp $line;
+ $size -= length $line;
+ if (! $line =~ /^===/o) {
+ print "corrupt second line in $fn ($line)\n";
+ return undef;
+ }
+ $line =~ s/^=== //o;
+ $ref->{gotit} = [];
+ @f = split /\^/, $line;
+ push @{$ref->{gotit}}, @f;
+ $ref->{size} = $size;
+
+ close($file);
+
+ return $ref;
+}
+
+# read in a message header
+sub read_msg_body
+{
+ my $self = shift;
+ my $msgno = $self->{msgno};
+ my $file;
+ my $line;
+ my $fn = filename($msgno);
+ my @out;
+
+ $file = new FileHandle;
+ if (!open($file, $fn)) {
+ print "Error reading $fn $!\n";
+ return undef;
+ }
+ chomp (@out = <$file>);
+ close($file);
+
+ shift @out if $out[0] =~ /^=== /;
+ shift @out if $out[0] =~ /^=== /;
+ return @out;
+}
+
+# send a tranche of lines to the other end
+sub send_tranche
+{
+ my ($self, $dxchan) = @_;
+ my @out;
+ my $to = $self->{tonode};
+ my $from = $self->{fromnode};
+ my $stream = $self->{stream};
+ my $i;
+
+ for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) {
+ push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]);
+ }
+ push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
+ $dxchan->send(@out);
+}
+
+
+# find a message to send out and start the ball rolling
+sub queue_msg
+{
+ my $sort = shift;
+ my @nodelist = DXProt::get_all_ak1a();
+ my $ref;
+ my $clref;
+ my $dxchan;
+
+ # bat down the message list looking for one that needs to go off site and whose
+ # nearest node is not busy.
+
+ dbg('msg', "queue msg ($sort)\n");
+ foreach $ref (@msg) {
+ # firstly, is it private and unread? if so can I find the recipient
+ # in my cluster node list offsite?
+ if ($ref->{private}) {
+ if ($ref->{read} == 0) {
+ $clref = DXCluster->get($ref->{to});
+ if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
+ $dxchan = $clref->{dxchan};
+ $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call);
+ }
+ }
+ } elsif ($sort == undef) {
+ # otherwise we are dealing with a bulletin, compare the gotit list with
+ # the nodelist up above, if there are sites that haven't got it yet
+ # then start sending it - what happens when we get loops is anyone's
+ # guess, use (to, from, time, subject) tuple?
+ my $noderef;
+ foreach $noderef (@nodelist) {
+ next if $noderef->call eq $main::mycall;
+ next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
+
+ # if we are here we have a node that doesn't have this message
+ $ref->start_msg($noderef) if !get_busy($noderef->call);
+ last;
+ }
+ }
+
+ # if all the available nodes are busy then stop
+ last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
+ }
+}
+
+# start the message off on its travels with a PC28
+sub start_msg
+{
+ my ($self, $dxchan) = @_;
+
+ dbg('msg', "start msg $self->{msgno}\n");
+ $self->{linesreq} = 5;
+ $self->{count} = 0;
+ $self->{tonode} = $dxchan->call;
+ $self->{fromnode} = $main::mycall;
+ $busy{$dxchan->call} = $self;
+ $work{"$self->{tonode}"} = $self;
+ $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
+}
+
+# get the ref of a busy node
+sub get_busy
+{
+ my $call = shift;
+ return $busy{$call};
+}
+
+# get the busy queue
+sub get_all_busy
+{
+ return values %busy;
+}
+
+# get the forwarding queue
+sub get_fwq
+{
+ return values %work;
+}
+
+# stop a message from continuing, clean it out, unlock interlocks etc
+sub stop_msg
+{
+ my ($self, $dxchan) = @_;
+ my $node = $dxchan->call;
+
+ dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
+ delete $work{$node};
+ delete $work{"$node$self->{stream}"};
+ $self->workclean;
+ delete $busy{$node};
+}
+