8e981fc2f908690db5f6cb539c0fa06fe07b3c7f
[spider.git] / perl / RBN.pm
1 #
2 # The RBN connection system
3 #
4 # Copyright (c) 2020 Dirk Koopman G1TLH
5 #
6
7 use warnings;
8 use strict;
9
10 package RBN;
11
12 use 5.10.1;
13
14 use lib qw {.};
15
16 use DXDebug;
17 use DXUtil;
18 use DXLog;
19 use DXUser;
20 use DXChannel;
21 use Math::Round qw(nearest nearest_floor);
22 use Date::Parse;
23 use Time::HiRes qw(gettimeofday);
24 use Spot;
25 use DXJSON;
26 use IO::File;
27
# Field indices of one skimmer record as built in normal():
#   [origin, qrg, call, mode, strength, time, unix time, respot flag, qra, prepared spot]
use constant {
	ROrigin   => 0,
	RQrg      => 1,
	RCall     => 2,
	RMode     => 3,
	RStrength => 4,
	RTime     => 5,
	RUtz      => 6,
	Respot    => 7,
	RQra      => 8,
	RSpotData => 9,
};

# Field indices of the prepared spot returned by Spot::prepare()
# (SZone is used below as the spotter's zone).
use constant {
	SQrg     => 0,
	SCall    => 1,
	STime    => 2,
	SComment => 3,
	SOrigin  => 4,
	SZone    => 11,
};

# NOTE(review): the O* indices are not referenced in the part of the file
# reviewed here - confirm their meaning against the rest of the module.
use constant {
	OQual    => 0,
	OAvediff => 1,
	OSpare   => 2,
	ODiff    => 3,
};

# Field indices of a candidate in the spot cache:
#   [time, quality, record, record, ...] - records start at CData
use constant {
	CTime => 0,
	CQual => 1,
	CData => 2,
};

# Field indices of a per skimmer/band "SKIM|..." slot in the spot cache,
# created as [1, 0, 0, $now, []]
use constant {
	DScore   => 0,
	DGood    => 1,
	DBad     => 2,
	DLastin  => 3,
	DEviants => 4,
};


our $DATA_VERSION = 1;

our @ISA = qw(DXChannel);

# Don't send anything out until this timer has expired. This is to allow
# the feed to "warm up" with duplicates so that the "big rush" doesn't happen.
our $startup_delay = 5*60;

# The time between respots of a callsign - if a call is still being spotted
# (on the same freq) and it has been spotted before, it's spotted again after
# this time until the next minspottime has passed.
our $minspottime = 30*60;

# Same as minspottime, but for beacons (and shorter).
our $beacontime = 5*60;

# The amount of time to wait for duplicates before issuing a spot to the
# user (no doubt waiting with bated breath).
our $dwelltime = 10;

# If there are fewer than $minqual candidates and $dwelltime has expired then
# allow this spot to live a bit longer. It may simply be that it is not in
# standard spot coverage. (ask G4PIQ about this).
our $limbotime = 5*60;

# We use the same filter as the Spot system. Can't think why :-).
our $filterdef = $Spot::filterdef;

my $spots;			# the GLOBAL spot cache

my %runtime;			# how long each channel has been running

our $cachefn = localdata('rbn_cache');

# The cache file is considered valid if it is not more than this old.
our $cache_valid = 4*60;

# The maximum QRG difference.
# NOTE(review): not referenced in the code reviewed here - units and use to be confirmed.
our $maxqrgdiff = 10;

our $minqual = 2;		# the minimum quality we will accept for output
our $maxqual = 9;		# if there is enough quality, then short circuit any remaining dwelltime.

my $json;
my $noinrush = 0;		# override the inrushpreventor if set
our $maxdeviants = 5;		# the number of deviant QRGs to record for skimmer records
# Module start-up: create the JSON codec and either adopt a still-valid
# on-disk cache (check_cache() returning true) or start with an empty one.
sub init
{
	$json = DXJSON->new;
	$json->canonical(0);

	# a usable cache means there is no need for the inrush preventor,
	# otherwise begin with a fresh, versioned spot cache
	my $cache_ok = check_cache();
	$noinrush = 1 if $cache_ok;
	$spots = {VERSION=>$DATA_VERSION} unless $cache_ok;

	# when running under the perl debugger: no inrush delay and readable JSON
	if (defined $DB::VERSION) {
		$noinrush = 1;
		$json->indent(1);
	}
}
125
# Constructor: allocate the channel through DXChannel::alloc (which receives
# the caller's full argument list) and set the RBN specific state to its
# initial values. Returns the new channel object.
sub new
{
	my $self = DXChannel::alloc(@_);

	$self->{last} = 0;

	# plain counters; the '10' and 'hour' variants are presumably the same
	# statistics over shorter periods, reset elsewhere - TODO confirm
	$self->{$_} = 0 for qw(
		noraw  noraw10  norawhour
		nospot nospot10 nospothour
		norbn  norbn10  norbnhour
	);

	# per user counters (hashes keyed on callsign).
	# dx_spot() increments the 'nousers*' hashes, so initialise those here
	# rather than relying on autovivification. The 'nouser*' names are kept
	# in case code outside this section still refers to them.
	$self->{$_} = {} for qw(
		nouser  nouser10  nouserhour
		nousers nousers10 nousershour
	);

	$self->{sort} = 'N';
	$self->{lasttime} = $main::systime;
	$self->{minspottime} = $minspottime;
	$self->{beacontime} = $beacontime;
	$self->{showstats} = 0;
	$self->{pingint} = 0;
	$self->{nopings} = 0;
	$self->{queue} = {};		# keys of candidates waiting to be sent (see normal())

	return $self;
}
158
# Bring an RBN connection into service.
#
#   $self - the RBN channel
#   $line - the connect line; may carry a "host=<address>" token which
#           overrides the peer address reported by the connection
#   $sort - accepted for interface compatibility, not used here
#
# Sets up hostname, name, language, privilege, the input filter and the
# inrush preventor for this channel.
sub start
{
	my ($self, $line, $sort) = @_;
	my $user = $self->{user};
	my $call = $self->{call};
	my $name = $user->{name};

	# log it
	my $host = $self->{conn}->peerhost;
	$host ||= "unknown";
	$self->{hostname} = $host;

	$self->{name} = $name ? $name : $call;
	$self->state('prompt');		# a bit of room for further expansion, passwords etc
	$self->{lang} = $user->lang || $main::lang || 'en';
	if ($line =~ /host=/) {
		# try a dotted quad first ...
		my ($h) = $line =~ /host=(\d+\.\d+\.\d+\.\d+)/;
		$line =~ s/\s*host=\d+\.\d+\.\d+\.\d+// if $h;
		unless ($h) {
			# ... then an IPv6 address. The '.' is allowed so that IPv4-mapped
			# addresses (::ffff:1.2.3.4) are taken whole.
			($h) = $line =~ /host=([\da-fA-F:.]+)/;
			$line =~ s/\s*host=[\da-fA-F:.]+// if $h;
		}
		if ($h) {
			$h =~ s/^::ffff://;
			$self->{hostname} = $h;
		}
	}
	$self->{width} = 80 unless $self->{width} && $self->{width} > 80;
	$self->{consort} = $line;	# save the connection type

	LogDbg('DXCommand', "$call connected from $self->{hostname}");

	# set some necessary flags on the user if they are connecting
	$self->{registered} = 1;
	# sort out privilege reduction
	$self->{priv} = 0;

	# get the filters
	$self->{inrbnfilter} = Filter::read_in('rbn', $call, 1)
		|| Filter::read_in('rbn', 'node_default', 1);

	# clean up qra locators
	my $qra = $user->qra;
	$qra = undef if ($qra && !DXBearing::is_qra($qra));
	unless ($qra) {
		my $lat = $user->lat;
		my $long = $user->long;
		$user->qra(DXBearing::lltoqra($lat, $long)) if (defined $lat && defined $long);
	}

	# if we have been running and stopped for a while
	# if the cache is warm enough don't operate the inrush preventor
	my $warm = (exists $runtime{$call} && $runtime{$call} > $startup_delay) || $noinrush;
	$self->{inrushpreventor} = $warm ? 0 : $main::systime + $startup_delay;
	dbg("RBN: noinrush: $noinrush, setting inrushpreventor on $self->{call} to $self->{inrushpreventor}");
}
217
my @queue;						# the queue of spots ready to send

# Handle one raw line from the RBN feed.
#
#   $self - the RBN channel the line arrived on
#   $line - the raw text; anything not starting "DX de" is ignored
#
# The line is decoded and validated, attached to a candidate in the global
# spot cache (keyed on "call|qrg*10", probing +/- 5 steps around that key)
# and the candidate's key is put on this channel's queue. Nothing is sent
# from here.
sub normal
{
	my $self = shift;
	my $line = shift;
	my $dbgrbn = isdbg('rbn');

	# remove leading and trailing spaces
	chomp $line;
	$line =~ s/^\s*//;
	$line =~ s/\s*$//;

	my $now = $main::systime;

	# parse line
	dbg "RBN:RAW,$line" if isdbg('rbnraw');
	return unless $line=~/^DX\s+de/;

	my (undef, undef, $origin, $qrg, $call, $mode, $s, $m, $spd, $u, $sort, $t, $tx) = split /[:\s]+/, $line;

	# fix up FT8 spots from 7001
	$t = $u, $u = '' if !$t && is_ztime($u);
	$t = $sort, $sort = '' if !$t && is_ztime($sort);

	# declared on its own: 'my $x = ... if COND' has undefined behaviour (perlsyn)
	my $qra;
	if (is_qra($spd)) {
		$qra = $spd;
		$spd = '';
	}
	$u = $qra if $qra;

	# is this anything like a callsign?
	unless (is_callsign($call)) {
		dbg("RBN: ERROR $call from $origin on $qrg is invalid, dumped");
		return;
	}

	# remove all extraneous crap from the origin - just leave the base callsign
	$origin = basecall($origin);
	unless ($origin) {
		dbg("RBN: ERROR '$origin' is an invalid callsign, dumped");
		return;
	}

	# is this callsign in badspotter list?
	if ($DXProt::badspotter->in($origin) || $DXProt::badnode->in($origin)) {
		dbg("RBN: ERROR $origin is a bad spotter/node, dumped");
		return;
	}

	# is the qrg valid
	unless ($qrg =~ /^\d+\.\d{1,3}$/) {
		dbg("RBN: ERROR qrg $qrg from $origin invalid, dumped");
		return;
	}

	$sort ||= '';
	$tx ||= '';
	$qra ||= '';
	dbg qq{RBN:input decode or:$origin qr:$qrg ca:$call mo:$mode s:$s m:$m sp:$spd u:$u sort:$sort t:$t tx:$tx qra:$qra} if $dbgrbn && isdbg('rbn');

	++$self->{noraw};
	++$self->{noraw10};
	++$self->{norawhour};

	if ($t || $tx) {

		# fix up times for things like 'NXDXF B' etc
		if ($tx && is_ztime($t)) {
			if (is_ztime($tx)) {
				$t = $tx;
			} else {
				dbg "RBN:ERR,$line";
				return (0);
			}
		}
		if ($sort && $sort eq 'NCDXF') {
			$mode = 'DXF';
			$t = $tx;
		}
		if ($sort && $sort eq 'BEACON') {
			$mode = 'BCN';
		}
		if ($mode =~ /^PSK/) {
			$mode = 'PSK';
		}
		if ($mode eq 'RTTY') {
			$mode = 'RTT';
		}

		# Decode the spot time now, before any cache slot is created, so that a
		# line with an unusable time is dumped cleanly instead of producing a
		# bogus unix time (and leaving an empty candidate behind).
		my ($hh, $mm) = ($t // '') =~ /(\d\d)(\d\d)Z$/;
		unless (defined $hh) {
			dbg("RBN: ERROR time '" . ($t // '') . "' from $origin on $qrg is invalid, dumped");
			return;
		}

		# The main de-duping key is [call, $frequency], but we probe a bit around that frequency to find a
		# range of concurrent frequencies that might be in play.

		# The key to this is deducing the true callsign by "majority voting" (the greater the number of spotters
		# the more effective this is) together with some lexical analysis probably in conjunction with DXSpider
		# data sources (for singleton spots) to then generate a "centre" from and to zone (whatever that will mean if it isn't the usual one)
		# and some heuristical "Kwalitee" rating given distance from the zone centres of spotter, recipient user
		# and spotted. A map can be generated once per user and spotter as they are essentially mostly static.
		# The spotted will only get a coarse position unless other info is available. Programs that parse
		# DX bulletins and the online databases could be used and then cached.

		# Obviously users have to opt in to receiving RBN spots and other users will simply be passed over and
		# ignored.

		# Clearly this will only work in the 'mojo' branch of DXSpider where it is possible to pass off external
		# data requests to ephemeral or semi resident forked processes that do any grunt work and the main
		# process to just the standard "message passing" which has been shown to be able to sustain over 5000
		# per second (limited by the test program's output and network speed, rather than DXSpider's handling).

		my $search = 5;
		my $nqrg = nearest(1, $qrg * 10);  # normalised to whole tenths of the qrg unit (100Hz steps, assuming qrg is in kHz)
		my $sp = "$call|$nqrg";		  # hopefully the skimmers will be calibrated at least this well!

		# find it? First the exact key, then upwards, then downwards
		my $cand = $spots->{$sp};
		unless ($cand) {
			my ($i, $new);
			for ($i = $nqrg; !$cand && $i <= $nqrg+$search; $i += 1) {
				$new = "$call|$i";
				$cand = $spots->{$new}, last if exists $spots->{$new};
			}
			if ($cand) {
				my $diff = $i - $nqrg;
				dbg(qq{RBN: QRG Diff using $new (+$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
				$sp = $new;
			}
		}
		unless ($cand) {
			my ($i, $new);
			for ($i = $nqrg; !$cand && $i >= $nqrg-$search; $i -= 1) {
				$new = "$call|$i";
				$cand = $spots->{$new}, last if exists $spots->{$new};
			}
			if ($cand) {
				my $diff = $nqrg - $i;
				dbg(qq{RBN: QRG Diff using $new (-$diff) for $sp for qrg $qrg}) if (isdbg('rbnqrg') || ($dbgrbn && isdbg('rbn')));
				$sp = $new;
			}
		}

		# if we have one and there is only one slot and that slot's time isn't expired for respot then return
		my $respot = 0;
		if ($cand && ref $cand) {
			if (@$cand <= CData) {
				if ($self->{minspottime} > 0 && $now - $cand->[CTime] < $self->{minspottime}) {
					dbg("RBN: key: '$sp' call: $call qrg: $qrg DUPE \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
					return;
				}

				dbg("RBN: key: '$sp' RESPOTTING call: $call qrg: $qrg last seen \@ ". atime(int $cand->[CTime])) if $dbgrbn && isdbg('rbn');
				$cand->[CTime] = $now;
				++$respot;
			}

			# otherwise we have a spot being built up at the moment
		} elsif ($cand) {
			dbg("RBN: key '$sp' = '$cand' not ref");
			return;
		} else {
			# new spot / frequency
			$spots->{$sp} = $cand = [$now, 0];
			dbg("RBN: key: '$sp' call: $call qrg: $qrg NEW" . ($respot ? ' RESPOT' : '')) if $dbgrbn && isdbg('rbn');
		}

		# add me to the display queue unless we are waiting for initial in rush to finish
		return unless $noinrush || $self->{inrushpreventor} < $main::systime;

		# build up a new record and store it in the buildup
		# deal with the unix time
		my $utz = $hh*3600 + $mm*60 + $main::systime_daystart; # possible issue with late spot from previous day
		$utz -= 86400 if $utz > $now+3600;					   # too far ahead, drag it back one day

		# create record and add into the buildup
		my $r = [$origin, nearest(.1, $qrg), $call, $mode, $s, $t, $utz, $respot, $u];
		my @s =  Spot::prepare($r->[RQrg], $r->[RCall], $r->[RUtz], '', $r->[ROrigin]);
		if ($s[5] == 666) {
			dbg("RBN: ERROR invalid prefix/callsign $call from $origin-# on $qrg, dumped");
			return;
		}

		if ($self->{inrbnfilter}) {
			# filter on the prepared spot (as dx_spot() does), NOT on $s which
			# is just the signal strength field of the raw line
			my ($want, undef) = $self->{inrbnfilter}->it(\@s);
			return unless $want;
		}
		$r->[RSpotData] = \@s;

		++$self->{queue}->{$sp};# unless @$cand>= CData; # queue the KEY (not the record)

		dbg("RBN: key: '$sp' ADD RECORD call: $call qrg: $qrg origin: $origin respot: $respot") if $dbgrbn && isdbg('rbn');

		push @$cand, $r;

	} else {
		dbg "RBN:DATA,$line" if $dbgrbn && isdbg('rbn');
	}
}
417
# Offer a finished candidate to every connected user that wants RBN spots.
#
#   $self    - the RBN channel
#   $quality - the quality figure shown in the spot comment
#   $cand    - the candidate: [time, quality, record, record, ...]
#
# The mode of the first record decides which users are interested; the
# actual per user selection and sending is done by dx_spot().
sub send_dx_spot
{
	my $self = shift;
	my $quality = shift;
	my $cand = shift;

	++$self->{norbn};
	++$self->{norbn10};
	++$self->{norbnhour};

	# $r = [$origin, $qrg, $call, $mode, $s, $t, $utz, $respot, $qra, $spotdata];

	my $mode = $cand->[CData]->[RMode]; # as all the modes will be the same;

	my @dxchan = DXChannel::get_all();

	foreach my $dxchan (@dxchan) {
		next unless $dxchan->is_user;
		my $user = $dxchan->{user};
		next unless $user &&  $user->wantrbn;

		# does this user want this sort of spot at all?
		# NB the alternations are grouped so that '^' anchors every
		# alternative and not just the first one.
		my $want = 0;
		++$want if $user->wantbeacon && $mode =~ /^(?:BCN|DXF)/;
		++$want if $user->wantcw && $mode =~ /^CW/;
		++$want if $user->wantrtty && $mode =~ /^RTT/;
		++$want if $user->wantpsk && $mode =~ /^(?:PSK|FSK|MSK)/;
		++$want if $user->wantft && $mode =~ /^FT/;

		dbg(sprintf("RBN: spot selection for $dxchan->{call} mode: '$mode' want: $want flags rbn:%d ft:%d bcn:%d cw:%d psk:%d rtty:%d",
					$user->wantrbn,
					$user->wantft,
					$user->wantbeacon,
					$user->wantcw,
					$user->wantpsk,
					$user->wantrtty,
				   )) if isdbg('rbnll');

		# send one spot to one user out of the ones that we have
		$self->dx_spot($dxchan, $quality, $cand) if $want;
	}
}
461
# Choose and send the spot(s) from one candidate to one user channel.
#
#   $self    - the RBN channel (statistics are kept on it)
#   $dxchan  - the user's channel
#   $quality - the quality figure, appended to the generated comment
#   $cand    - the candidate: [time, quality, record, record, ...]
#
# If the user has 'rbnseeme' set every record is sent as it stands. Otherwise
# a single spot is sent: the last record accepted by the user's rbn (or spot)
# filter if there is one, else the record with the lowest strength. The zones
# of the other spotters are appended to its comment as " Z:n,n,...".
# A QRA locator found in the records is stored on the spotted call's user
# record if that has no valid locator yet.
sub dx_spot
{
	my $self = shift;
	my $dxchan = shift;
	my $quality = shift;
	my $cand = shift;
	my $call = $dxchan->{call};
	my $seeme = $dxchan->user->rbnseeme();
	my $strength = 100;		# because it could if we talk about FTx
	my $saver;
	my %zone;
	my $qra;

	++$self->{nousers}->{$call};
	++$self->{nousers10}->{$call};
	++$self->{nousershour}->{$call};

	my $filtered;
	my $rf = $dxchan->{rbnfilter} || $dxchan->{spotsfilter};
	my $comment;

	foreach my $r (@$cand) {
		# $r = [$origin, $qrg, $call, $mode, $s, $t, $utz, $respot, $qra];
		# Spot::prepare($qrg, $call, $utz, $comment, $origin);
		# (the leading time and quality elements of $cand are not refs and are skipped)
		next unless $r && ref $r;

		$qra = $r->[RQra] if !$qra && $r->[RQra] && is_qra($r->[RQra]);

		# $quality is passed as an argument so that it can never be taken as part of the format
		$comment = sprintf "%-3s %2ddB %s", $r->[RMode], $r->[RStrength], $quality;
		my $s = $r->[RSpotData];		# the prepared spot
		$s->[SComment] = $comment;		# apply new generated comment

		++$zone{$s->[SZone]};		# save the spotter's zone

		# if the 'see me' flag is set, then show all the spots without further adornment (see set/rbnseeme for more info)
		if ($seeme) {
			send_final($dxchan, $s);
			next;
		}

		# save the lowest strength one
		if ($r->[RStrength] < $strength) {
			# log before updating so that the message shows the value that was beaten
			dbg("RBN: STRENGTH spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] < $strength") if isdbg 'rbnll';
			$strength = $r->[RStrength];
			$saver = $s;
		}

		if ($rf) {
			my ($want, undef) = $rf->it($s);
			dbg("RBN: FILTERING for $call spot: $s->[SCall] qrg: $s->[SQrg] origin: $s->[SOrigin] dB: $r->[RStrength] com: '$s->[SComment]' want: " . ($want ? 'YES':'NO')) if isdbg 'rbnll';
			next unless $want;
			$filtered = $s;
		}
	}

	if ($rf) {
		$saver = $filtered;		# if nothing passed the filter's lips then $saver == $filtered == undef !
	}

	if ($saver) {
		# create a zone list of spotters
		delete $zone{$saver->[SZone]};  # remove this spotter's zone (leaving all the other zones)
		my $z = join ',', sort {$a <=> $b} keys %zone;

		# alter spot data accordingly
		$saver->[SComment] .= " Z:$z" if $z;

		send_final($dxchan, $saver);

		++$self->{nospot};
		++$self->{nospot10};
		++$self->{nospothour};

		if ($qra) {
			my $user = DXUser::get_current($saver->[SCall]) || DXUser->new($saver->[SCall]);
			unless ($user->qra && is_qra($user->qra)) {
				$user->qra($qra);
				dbg("RBN: update qra on $saver->[SCall] to $qra");
				$user->put;
			}
		}
	}
}
547
# Format one prepared spot for a user channel and send it.
#
#   $dxchan - the user's channel
#   $saver  - the prepared spot (array ref, see the S* indices)
#
# The spotter is shown with a '-#' suffix: in full for ve7cc channels, cut
# to 6 characters otherwise. The spot's origin field is only altered while
# the line is being formatted and is put back afterwards.
sub send_final
{
	my ($dxchan, $saver) = @_;
	my $call = $dxchan->{call};

	dbg("RBN: SENDING to $call spot: $saver->[SCall] qrg: $saver->[SQrg] origin: $saver->[SOrigin] $saver->[SComment]") if isdbg 'rbnll';

	my $ve7cc = $dxchan->{ve7cc};
	my $origin = $saver->[SOrigin];			# remember the real spotter

	$saver->[SOrigin] = ($ve7cc ? $origin : substr($origin, 0, 6)) . '-#';
	my $buf = $ve7cc
		? VE7CC::dx_spot($dxchan, @$saver)
		: $dxchan->format_dx_spot(@$saver);
	$saver->[SOrigin] = $origin;			# and put it back

	$dxchan->local_send('N', $buf);
}
570
571 # per second
572 sub process
573 {
574         my $rbnskim = isdbg('rbnskim');
575         
576         foreach my $dxchan (DXChannel::get_all()) {
577                 next unless $dxchan->is_rbn;
578
579                 # At this point we run the queue to see if anything can be sent onwards to the punter
580                 my $now = $main::systime;
581                 my $ta = [gettimeofday];
582                 my $items = 0;
583                 
584                 # now run the waiting queue which just contains KEYS ($call|$qrg)
585                 foreach my $sp (keys %{$dxchan->{queue}}) {
586                         my $cand = $spots->{$sp};
587                         ++$items;
588                         
589                         unless ($cand && $cand->[CTime]) {
590                                 dbg "RBN Cand $sp " . ($cand ? 'def' : 'undef') . " [CTime] " . ($cand->[CTime] ? 'def' : 'undef') . " dwell $dwelltime";
591                                 delete $spots->{$sp};
592                                 delete $dxchan->{queue}->{$sp};    # remove
593                                 next;
594                         }
595                         
596                         my $ctime = $cand->[CTime];
597                         my $quality = @$cand - CData;
598                         my $dwellsecs =  $now - $ctime;
599                         if ($quality >= $maxqual || $dwellsecs >= $dwelltime || $dwellsecs >= $limbotime) {
600                                 # we have a candidate, create qualitee value(s);
601                                 unless (@$cand > CData) {
602                                         dbg "RBN: QUEUE key '$sp' MISSING RECORDS, IGNORED" . dd($cand) if isdbg 'rbnqueue';
603                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
604                                         delete $dxchan->{queue}->{$sp};
605                                         next;
606                                 }
607                                 dbg "RBN: QUEUE PROCESSING key: '$sp' $now >= $cand->[CTime]" if isdbg 'rbnqueue'; 
608                                 my $spotters = $quality;
609
610                                 # dump it and remove it from the queue if it is of unadequate quality, but only if it is no longer in Limbo and can be reasonably passed on to its demise
611                                 my $r = $cand->[CData];
612                                 if ($dwellsecs > $limbotime && $quality < $minqual) {
613                                         if ( $rbnskim && isdbg('rbnskim')) {
614                                                 $r = $cand->[CData];
615                                                 if ($r) {
616                                                         my $lastin = difft($ctime, $now, 2);
617                                                         my $s = "RBN:SKIM time in Limbo exceeded DUMPED (lastin: $lastin Q:$quality < Q:$minqual) key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] route: $dxchan->{call}";
618                                                         dbg($s);
619                                                 }
620                                         }
621                                         delete $spots->{$sp}; # don't remember it either - this means that a spot HAS to come in with sufficient spotters to be processed.
622                                         delete $dxchan->{queue}->{$sp};
623                                         next;
624                                 }
625
626                                 # we have a possible removal from Limbo, check for more than one skimmer and reset the quality if required
627                                 # DOES THIS TEST CAUSE RACES?
628                                 if (!$r->[Respot] && $quality >= $minqual && $dwellsecs > $dwelltime+1) {
629
630                                         # because we don't need to check for repeats by the same skimmer in the normal case, we do here
631                                         my %seen;
632                                         my @origin;
633                                         foreach my $wr (@$cand) {
634                                                 next unless ref $wr;
635                                                 push @origin, $wr->[ROrigin];
636                                                 if (exists $seen{$wr->[ROrigin]}) {
637                                                         next;
638                                                 }
639                                                 $seen{$wr->[ROrigin]} = $wr;
640                                         }
641                                         # reset the quality to ignore dupes
642                                         my $oq = $quality;
643                                         $quality = keys %seen;
644                                         if ($quality >= $minqual) {
645                                                 if ( $rbnskim && isdbg('rbnskim')) {
646                                                         my $lastin = difft($ctime, $now, 2);
647                                                         my $sk = join ' ', keys %seen;
648                                                         my $or = join ' ', @origin;
649                                                         my $s = "RBN:SKIM promoted from Limbo - key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk";
650                                                         $s .= " was $or" if $or ne $sk;
651                                                         $s .= ')';
652                                                         dbg($s);
653                                                 } 
654                                         } elsif ($oq != $quality) {
655                                                 if ( $rbnskim && isdbg('rbnskim')) {
656                                                         my $lastin = difft($ctime, $now, 2);
657                                                         my $sk = join ' ', keys %seen;
658                                                         my $or = join ' ', @origin;
659                                                         my $s = "RBN:SKIM quality reset key: '$sp' (lastin: $lastin Q now: $quality was $oq skimmers now: $sk was: $or)";
660                                                         dbg($s);
661                                                 }
662                                                 # remove the excess
663                                                 my @ncand = (@$cand[CTime, CQual], values %seen);
664                                                 $spots->{$sp} = \@ncand;
665                                         }
666                                 }
667
668                                 # we now kick this spot into Limbo 
669                                 if ($quality < $minqual) {
670                                         next;
671                                 }
672
673                                 $quality = 9 if $quality > 9;
674                                 $cand->[CQual] = $quality if $quality > $cand->[CQual];
675
676                                 # this scores each candidate according to its skimmer's QRG score (i.e. how often it agrees with its peers)
677                                 # what happens is hash of all QRGs in candidates are incremented by that skimmer's reputation for "accuracy"
678                                 # or, more exactly, past agreement with the consensus. This score can be from -5 -> +5. 
679                                 my %qrg = ();
680                                 my $skimmer;
681                                 my $sk;
682                                 my $band;
683                                 my %seen = ();
684                                 foreach $r (@$cand) {
685                                         next unless ref $r;
686                                         if (exists $seen{$r->[ROrigin]}) {
687                                                 $r = 0;
688                                                 next;
689                                         }
690                                         $seen{$r->[ROrigin]} = 1;
691                                         $band ||= int $r->[RQrg] / 1000;
692                                         $sk = "SKIM|$r->[ROrigin]|$band"; # thus only once per set of candidates
693                                         $skimmer = $spots->{$sk};
694                                         unless ($skimmer) {
695                                                 $skimmer = $spots->{$sk} = [1, 0, 0, $now, []]; # this first time, this new skimmer gets the benefit of the doubt on frequency.
696                                                 dbg("RBN:SKIM new slot $sk " . $json->encode($skimmer)) if  $rbnskim && isdbg('rbnskim');
697                                         }
698                                         $qrg{$r->[RQrg]} += ($skimmer->[DScore] || 1);
699                                 }
700                                 
701                                 # determine the most likely qrg and then set it - NOTE (-)ve votes, generated by the skimmer scoring system above, are ignored
702                                 my @deviant;
703                                 my $c = 0;
704                                 my $mv = 0;
705                                 my $qrg = 0;
706                                 while (my ($k, $votes) = each %qrg) {
707                                         if ($votes >= $mv) {
708                                                 $qrg = $k;
709                                                 $mv = $votes;
710                                         }
711                                         ++$c;
712                                 }
713
714                                 # Ignore possible spots with 0 QRG score - as determined by the skimmer scoring system above -  as they are likely to be wrong 
715                                 unless ($qrg > 0) {
716                                         if ( $rbnskim && isdbg('rbnskim')) {
717                                                 my $keys;
718                                                 while (my ($k, $v) = (each %qrg)) {
719                                                         $keys .= "$k=>$v, ";
720                                                 }
721                                                 $keys =~ /,\s*$/;
722                                                 my $i = 0;
723                                                 foreach $r (@$cand) {
724                                                         next unless $r && ref $r;
725                                                         dbg "RBN:SKIM cand $i QRG likely wrong from '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] (qrgs: $keys c: $c) route: $dxchan->{call}, ignored";
726                                                         ++$i;
727                                                 }
728                                         }
729                                         delete $spots->{$sp}; # get rid
730                                         delete $dxchan->{queue}->{$sp};
731                                         next;
732                                 }
733
734                                 # determine and spit out the deviants. Then adjust the scores according to whether it is a deviant or good
735                                 # NOTE: deviant nodes can become good (or less bad), and good nodes bad (or less good) on each spot that
736                                 # they generate. This is based solely on each skimmer's agreement (or not) with the "consensus" score generated
737                                 # above ($qrg). The resultant score + good + bad is stored per band and will be used the next time a spot
738                                 # appears on this band from each skimmer.
739                                 foreach $r (@$cand) {
740                                         next unless $r && ref $r;
741                                         my $diff = $c > 1 ? nearest(.1, $r->[RQrg] - $qrg) : 0;
742                                         $sk = "SKIM|$r->[ROrigin]|$band";
743                                         $skimmer = $spots->{$sk};
744                                         if ($diff) {
745                                                 ++$skimmer->[DBad] if $skimmer->[DBad] < $maxdeviants;
746                                                 --$skimmer->[DGood] if $skimmer->[DGood] > 0;
747                                                 push @deviant, sprintf("$r->[ROrigin]:%+.1f", $diff);
748                                                 push @{$skimmer->[DEviants]}, $diff;
749                                                 shift @{$skimmer->[DEviants]} while @{$skimmer->[DEviants]} > $maxdeviants;
750                                         } else {
751                                                 ++$skimmer->[DGood] if $skimmer->[DGood] < $maxdeviants;
752                                                 --$skimmer->[DBad] if $skimmer->[DBad] > 0;
753                                                 shift @{$skimmer->[DEviants]};
754                                         }
755                                         $skimmer->[DScore] = $skimmer->[DGood] - $skimmer->[DBad];
756                                         if ($rbnskim && isdbg('rbnskim')) {
757                                                 my $lastin = difft($skimmer->[DLastin], $now, 2);
758                                                 my $difflist = join(', ', @{$skimmer->[DEviants]});
759                                                 $difflist = " band qrg diffs: $difflist" if $difflist;
760                                                 dbg("RBN:SKIM key $sp slot $sk $r->[RQrg] - $qrg = $diff Skimmer score: $skimmer->[DGood] - $skimmer->[DBad] = $skimmer->[DScore] lastseen:$lastin ago$difflist"); 
761                                         }
762                                         $skimmer->[DLastin] = $now;
763                                         $r->[RSpotData]->[SQrg] = $qrg if $qrg && $c > 1; # set all the QRGs to the agreed value
764                                 }
765
766                                 $qrg = (sprintf "%.1f",  $qrg)+0;
767                                 $r = $cand->[CData];
768                                 $r->[RQrg] = $qrg;
769                                 my $squality = "Q:$cand->[CQual]";
770                                 $squality .= '*' if $c > 1; 
771                                 $squality .= '+' if $r->[Respot];
772
773                                 if (isdbg('progress')) {
774                                         my $rt = difft($ctime, $now, 2);
775                                         my $s = "RBN: SPOT key: '$sp' = $r->[RCall] on $r->[RQrg] by $r->[ROrigin] \@ $r->[RTime] $squality route: $dxchan->{call} dwell:$rt";
776                                         my $td = @deviant;
777                                         $s .= " QRGScore: $mv Deviants: $td/$spotters";
778                                         $s .= ' (' . join(', ', sort @deviant) . ')' if $td;
779                                         dbg($s);
780                                 }
781
782                                 # finally send it out to any waiting public
783                                 send_dx_spot($dxchan, $squality, $cand);
784                                 
785                                 # clear out the data and make this now just "spotted", but no further action required until respot time
786                                 dbg "RBN: QUEUE key '$sp' cleared" if isdbg 'rbn';
787
788                                 delete $dxchan->{queue}->{$sp};
789
790                                 # calculate new sp (which will be 70% likely the same as the old one)
791                                 # we do this to cope with the fact that the first spotter may well be "wrongly calibrated" giving a qrg that disagrees with the majority.
792                                 # and we want to store the key that corresponds to majority opinion. 
793                                 my $nqrg = nearest(1, $qrg * 10);  # normalised to nearest Khz
794                                 my $nsp = "$r->[RCall]|$nqrg";
795                                 if ($sp ne $nsp) {
796                                         dbg("RBN:SKIM CHANGE KEY sp '$sp' -> '$nsp' for storage") if  $rbnskim && isdbg('rbnskim');
797                                         delete $spots->{$sp};
798                                         $spots->{$nsp} = [$now, $cand->[CQual]];
799                                 } else {
800                                         $spots->{$sp} = [$now, $cand->[CQual]];
801                                 }
802                         }
803                         else {
804                                 dbg sprintf("RBN: QUEUE key: '$sp' SEND time not yet reached %.1f secs left", $cand->[CTime] + $dwelltime - $now) if isdbg 'rbnqueue'; 
805                         }
806                 }
807                 if (isdbg('rbntimer')) {
808                         my $diff = _diffus($ta);
809                         dbg "RBN: TIMER process queue for call: $dxchan->{call} $items spots $diff uS";
810                 }
811         }
812 }
813
#
# Once-a-minute housekeeping.
#
# For every channel for which is_rbn is true: optionally log the per-minute
# counters, disconnect the channel when no raw input was counted, zero the
# per-minute counters and add 60 to that call's entry in %runtime.
# Afterwards the spot cache is written out.
#
sub per_minute
{
	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		if (isdbg('rbnstats')) {
			my $nusers = keys %{$chan->{nousers}};
			dbg "RBN:STATS minute $chan->{call} raw: $chan->{noraw} retrieved spots: $chan->{norbn} delivered: $chan->{nospot} after filtering to users: " . $nusers;
		}

		# NOTE(review): 'lasttime' is compared with the literal 60. If it holds
		# an epoch timestamp rather than an age in seconds, this half of the test
		# is effectively always true - confirm against where it is set.
		if ($chan->{noraw} == 0 && $chan->{lasttime} > 60) {
			LogDbg('RBN', "RBN: no input from $chan->{call}, disconnecting");
			$chan->disconnect;
		}

		# start the next minute with clean counters
		$chan->{$_} = 0 for qw(nospot norbn noraw);
		$chan->{nousers} = {};
		$runtime{$chan->{call}} += 60;
	}

	# save the spot cache
	# NOTE(review): this test can only be true when $startup_delay is negative,
	# so in practice the cache is written every minute. It looks as though a
	# "still within the startup delay" check was intended - confirm.
	my $holdoff = $main::systime + $startup_delay < $main::systime;
	write_cache() unless $holdoff;
}
830
#
# Ten-minute housekeeping.
#
# Ages the spot cache: any ordinary entry whose CTime is more than
# 2 * $minspottime behind $main::systime is deleted. The 'VERSION' key and
# the 'O|...' and 'SKIM|...' entries are never touched here.
# Then logs, and resets, the ten-minute counters of every RBN channel.
#
sub per_10_minute
{
	my ($kept, $expired) = (0, 0);
	my $maxage = $minspottime * 2;

	while (my ($key, $entry) = each %{$spots}) {
		next if $key eq 'VERSION' || $key =~ /^O\|/ || $key =~ /^SKIM\|/;

		if ($main::systime - $entry->[CTime] <= $maxage) {
			++$kept;
			next;
		}

		# deleting the key most recently returned by each() is safe
		delete $spots->{$key};
		++$expired;
	}
	dbg "RBN:STATS spot cache remain: $kept removed: $expired"; # if isdbg('rbn');

	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		my $queued = keys %{$chan->{queue}};

		# percentage of raw lines that yielded a spot; avoid dividing by zero
		my $pct = '0.0%';
		$pct = sprintf("%.1f%%", $chan->{norbn10}*100/$chan->{noraw10}) if $chan->{noraw10};

		my $nusers = keys %{$chan->{nousers10}};
		dbg "RBN:STATS 10-minute $chan->{call} queue: $queued raw: $chan->{noraw10} retrieved spots: $chan->{norbn10} ($pct) delivered: $chan->{nospot10} after filtering to  users: " . $nusers;

		$chan->{$_} = 0 for qw(nospot10 norbn10 noraw10);
		$chan->{nousers10} = {};
	}
}
857
#
# Hourly housekeeping: log the hourly counters of every RBN channel
# (queue length, raw lines, spots retrieved and the ratio between them,
# spots delivered, number of distinct users) and then reset them.
#
sub per_hour
{
	for my $chan (DXChannel::get_all()) {
		$chan->is_rbn or next;

		my $queued = keys %{$chan->{queue}};

		# percentage of raw lines that yielded a spot; avoid dividing by zero
		my $pct = '0.0%';
		$pct = sprintf("%.1f%%", $chan->{norbnhour}*100/$chan->{norawhour}) if $chan->{norawhour};

		my $nusers = keys %{$chan->{nousershour}};
		dbg "RBN:STATS hour $chan->{call} queue: $queued raw: $chan->{norawhour} retrieved spots: $chan->{norbnhour} ($pct) delivered: $chan->{nospothour} after filtering to users: " . $nusers;

		$chan->{$_} = 0 for qw(nospothour norbnhour norawhour);
		$chan->{nousershour} = {};
	}
}
868
#
# Shutdown hook: write the spot cache out one last time.
#
sub finish
{
	return write_cache();
}
873
#
# Serialise $spots as JSON and write it to $cachefn.
#
# When the 'rbncache' debug flag is set the JSON is written indented and with
# canonical (sorted) keys. The encoder is always put back into its compact
# mode before returning, whatever happens.
#
# An encoding failure or a failed write is logged with dbg() and the sub
# returns without reporting a successful write. Failure to open the file is
# still fatal (confess), as before.
#
sub write_cache
{
	my $ta = [ gettimeofday ];

	$json->indent(1)->canonical(1) if isdbg('rbncache');
	my $s = eval {$json->encode($spots)};
	my $err = $@;

	# restore the shared encoder's settings on every path, including errors
	$json->indent(0)->canonical(0);

	unless ($s) {
		dbg("RBN:Write_cache error '$err'");
		return;
	}

	# explicit mode argument: the filename is never parsed for mode characters
	my $fh = IO::File->new($cachefn, '>') or confess("writing $cachefn $!");

	# buffered write errors (e.g. disk full) may only surface on close
	unless ($fh->print($s) && $fh->close) {
		dbg("RBN:Write_cache error writing '$cachefn' $!");
		return;
	}

	my $diff = _diffms($ta);
	my $size = sprintf('%.3fKB', (length($s) / 1000));
	dbg("RBN:WRITE_CACHE size: $size time to write: $diff mS");
}
892
#
# Try to restore $spots from the JSON cache file $cachefn.
#
# The cache is used only if the file exists, its mtime is less than
# $cache_valid seconds behind $main::systime, it decodes to a hash, and that
# hash's VERSION equals $DATA_VERSION. Entries that still hold candidate
# data (more than CTime and CQual) are cut back to those two fields, so no
# half-built spot survives a restart.
#
# $spots is only replaced once the data has passed every check; on any
# failure it is left exactly as it was.
#
# Returns 1 when the cache was restored, undef otherwise.
#
sub check_cache
{
	# a single stat() tells us both whether the file exists and how old it is
	my @st = stat($cachefn);
	unless (@st) {
		dbg("RBN:check_cache '$cachefn' spot cache not present");
		return undef;
	}

	my $mt = $st[9];
	my $t = $main::systime - $mt || 1;
	my $p = difft($mt, 2);
	unless ($t < $cache_valid) {
		my $d = difft($main::systime-$cache_valid);
		dbg("RBN::checkcache '$cachefn' created $p ago is too old (> $d), ignored");
		return undef;
	}
	dbg("RBN:check_cache '$cachefn' spot cache exists, created $p ago and not too old");

	my $fh = IO::File->new($cachefn, '<');
	unless ($fh) {
		dbg("RBN:check_cache file read error $!");
		return undef;
	}
	my $s = do { local $/ = undef; <$fh> };
	$fh->close;
	dbg("RBN:check_cache cache read size " . length($s // ''));

	unless (defined $s && length $s) {
		dbg("RBN:check_cache '$cachefn' is empty, ignored");
		return undef;
	}

	# decode into a lexical so a bad cache can never clobber, or be mistaken
	# for, whatever $spots currently holds
	my $data = eval { $json->decode($s) };
	unless (ref($data) eq 'HASH') {
		dbg("RBN::checkcache error decoding $@");
		return undef;
	}
	unless (defined $data->{VERSION} && $data->{VERSION} == $DATA_VERSION) {
		dbg("RBN:check_cache '$cachefn' data version does not match $DATA_VERSION, ignored");
		return undef;
	}

	# now clean out anything that has spot build ups in progress
	# (assigning to an existing key while iterating with each() is safe)
	while (my ($k, $cand) = each %$data) {
		next if $k eq 'VERSION';
		next if $k =~ /^O\|/;
		next if $k =~ /^SKIM\|/;
		if (@$cand > CData) {
			$data->{$k} = [$cand->[CTime], $cand->[CQual]];
		}
	}

	$spots = $data;
	dbg("RBN:check_cache spot cache restored");
	return 1;
}
941
942 1;