+# send a tranche of lines to the other end
+sub send_tranche
+{
+ my ($self, $dxchan) = @_;
+ my @out;
+ my $to = $self->{tonode};
+ my $from = $self->{fromnode};
+ my $stream = $self->{stream};
+ my $i;
+
+ for ($i = 0; $i < $self->{linesreq} && $self->{count} < @{$self->{lines}}; $i++, $self->{count}++) {
+ push @out, DXProt::pc29($to, $from, $stream, ${$self->{lines}}[$self->{count}]);
+ }
+ push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
+ $dxchan->send(@out);
+}
+
+
+# find a message to send out and start the ball rolling
+sub queue_msg
+{
+ my $sort = shift;
+ my @nodelist = DXProt::get_all_ak1a();
+ my $ref;
+ my $clref;
+ my $dxchan;
+
+ # bat down the message list looking for one that needs to go off site and whose
+ # nearest node is not busy.
+
+ dbg('msg', "queue msg ($sort)\n");
+ foreach $ref (@msg) {
+ # firstly, is it private and unread? if so can I find the recipient
+ # in my cluster node list offsite?
+ if ($ref->{private}) {
+ if ($ref->{read} == 0) {
+ $clref = DXCluster->get($ref->{to});
+ if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
+ $dxchan = $clref->{dxchan};
+ $ref->start_msg($dxchan) if $clref && !get_busy($dxchan->call);
+ }
+ }
+ } elsif ($sort == undef) {
+ # otherwise we are dealing with a bulletin, compare the gotit list with
+ # the nodelist up above, if there are sites that haven't got it yet
+ # then start sending it - what happens when we get loops is anyone's
+ # guess, use (to, from, time, subject) tuple?
+ my $noderef;
+ foreach $noderef (@nodelist) {
+ next if $noderef->call eq $main::mycall;
+ next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
+
+ # if we are here we have a node that doesn't have this message
+ $ref->start_msg($noderef) if !get_busy($noderef->call);
+ last;
+ }
+ }
+
+ # if all the available nodes are busy then stop
+ last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
+ }
+}
+
+# start the message off on its travels with a PC28
+sub start_msg
+{
+ my ($self, $dxchan) = @_;
+
+ dbg('msg', "start msg $self->{msgno}\n");
+ $self->{linesreq} = 5;
+ $self->{count} = 0;
+ $self->{tonode} = $dxchan->call;
+ $self->{fromnode} = $main::mycall;
+ $busy{$dxchan->call} = $self;
+ $work{"$self->{tonode}"} = $self;
+ $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
+}
+
+# get the ref of a busy node
+sub get_busy
+{
+ my $call = shift;
+ return $busy{$call};
+}
+
+# get the busy queue
+sub get_all_busy
+{
+ return values %busy;
+}
+
+# get the forwarding queue
+sub get_fwq
+{
+ return values %work;
+}
+
+# stop a message from continuing, clean it out, unlock interlocks etc
+sub stop_msg
+{
+ my ($self, $dxchan) = @_;
+ my $node = $dxchan->call;
+
+ dbg('msg', "stop msg $self->{msgno} stream $self->{stream}\n");
+ delete $work{$node};
+ delete $work{"$node$self->{stream}"};
+ $self->workclean;
+ delete $busy{$node};
+}
+