]> gb7djk.dxcluster.net Git - spider.git/blob - perl/AsyncMsg.pm
add dbg line to dxcron::spawn_cmd
[spider.git] / perl / AsyncMsg.pm
1 #
2 # This class is the internal subclass that does various Async connects and
3 # retreivals of info. Typical uses (and specific support) include http get and
4 # post.
5
6 # This merely starts up a Msg handler (and no DXChannel) ($conn in other words)
7 # does the GET, parses out the result and the data and then (assuming a positive
8 # result and that the originating callsign is still online) punts out the data
9 # to the caller.
10 #
11 # It isn't designed to be very clever.
12 #
13 # Copyright (c) 2013 - Dirk Koopman G1TLH
14 #
15
16 package AsyncMsg;
17
18 use Msg;
19 use DXDebug;
20 use DXUtil;
21 use DXChannel;
22
23 use vars qw(@ISA $deftimeout);
24
25 @ISA = qw(Msg);
26 $deftimeout = 15;
27
28 my %outstanding;
29
30 sub new 
31 {
32         my $pkg = shift;
33         my $call = shift;
34         my $handler = shift;
35         
36         my $conn = $pkg->SUPER::new($handler);
37         $conn->{caller} = ref $call ? $call->call : $call;
38
39         # make it persistent
40         $outstanding{$conn} = $conn;
41         
42         return $conn;
43 }
44
45 sub handle_getpost
46 {
47         my ($conn, $ua, $tx) = @_;
48
49         # no point in going on if there is no-one wanting the output anymore
50         my $dxchan = DXChannel::get($conn->{caller});
51         unless ($dxchan) {
52                 $conn->disconnect;
53                 return;
54         }
55         
56         my @lines = split qr{\r?\n}, $tx->res->body;
57         
58         foreach my $msg(@lines) {
59                 dbg("AsyncMsg: $conn->{_asstate} $msg") if isdbg('async');
60                 
61                 if (my $filter = $conn->{_asfilter}) {
62                         no strict 'refs';
63                         # this will crash if the command has been redefined and the filter is a
64                         # function defined there whilst the request is in flight,
65                         # but this isn't exactly likely in a production environment.
66                         $filter->($conn, $msg, $dxchan);
67                 } else {
68                         my $prefix = $conn->{prefix} || '';
69                         $dxchan->send("$prefix$msg");
70                 }
71         }
72         
73         $conn->disconnect;
74 }
75
76 # This does a http get on a path on a host and
77 # returns the result (through an optional filter)
78 #
79 # expects to be called something like from a cmd.pl file:
80 #
81 # AsyncMsg->get($self, <host>, <port>, <path>, [<key=>value>...]
82
83 # Standard key => value pairs are:
84 #
85 # filter => CODE ref (e.g. sub { ... })
86 # prefix => <string>                 prefix output with this string
87 #
88 # Anything else is taken and sent as (extra) http header stuff e.g:
89 #
90 # 'User-Agent' => qq{DXSpider;$main::version;$main::build;$^O}
91 # 'Content-Type' => q{text/xml; charset=utf-8}
92 # 'Content-Length' => $lth
93 #
94 # Host: is always set to the name of the host (unless overridden)
95 # User-Agent: is set to default above (unless overridden)
96 #
97 sub _getpost
98 {
99         my $pkg = shift;
100         my $sort = shift;
101         my $call = shift;
102         my $host = shift;
103         my $path = shift;
104         my %args = @_;
105         
106
107         my $conn = $pkg->new($call);
108         $conn->{_asargs} = [@_];
109         $conn->{_asstate} = 'waitreply';
110         $conn->{_asfilter} = delete $args{filter} if exists $args{filter};
111         $conn->{prefix} = delete $args{prefix} if exists $args{prefix};
112         $conn->{prefix} ||= '';
113         $conn->{on_disconnect} = delete $args{on_disc} || delete $args{on_disconnect};
114         $conn->{path} = $path;
115         $conn->{host} = $conn->{peerhost} = $host;
116         $conn->{port} = $conn->{peerport} = delete $args{port} || 80;
117         $conn->{sort} = 'outgoing';
118         $conn->{_assort} = $sort;
119         $conn->{csort} = 'http';
120
121         my $data = delete $args{data};
122
123         my $ua =  Mojo::UserAgent->new;
124         my $s;
125         $s .= $host;
126         $s .= ":$port" unless $conn->{port} == 80;
127         $s .= $path;
128         dbg("AsyncMsg: $sort $s") if isdbg('async');
129         
130         my $tx = $ua->build_tx($sort => $s);
131         $ua->on(error => sub { $conn->_error(@_); });
132 #       $tx->on(error => sub { $conn->_error(@_); });
133 #       $tx->on(finish => sub { $conn->disconnect; });
134
135         $ua->on(start => sub {
136                                 my ($ua, $tx) = @_;
137                                 while (my ($k, $v) = each %args) {
138                                         dbg("AsyncMsg: attaching header $k: $v") if isdbg('async');
139                                         $tx->req->headers->header($k => $v);
140                                 }
141                                 if (defined $data) {
142                                         dbg("AsyncMsg: body ='$data'") if isdbg('async'); 
143                                         $tx->req->body($data);
144                                 }
145                         });
146         
147
148         $ua->start($tx => sub { $conn->handle_getpost(@_) }); 
149
150         
151         $conn->{mojo} = $ua;
152         return $conn if $tx;
153
154         $conn->disconnect;
155         return undef;
156 }
157
158 sub _dxchan_send
159 {
160         my $conn = shift;
161         my $msg = shift;
162         my $dxchan = DXChannel::get($conn->{caller});
163         $dxchan->send($msg) if $dxchan;
164 }
165
166 sub _error
167 {
168         my ($conn, $e, $err);
169         dbg("Async: $conn->host:$conn->port path $conn->{path} error $err") if isdbg('chan');
170         $conn->_dxchan_send("$conn->{prefix}$msg");
171         $conn->disconnect;
172 }
173         
174 sub get
175 {
176         my $pkg = shift;
177         _getpost($pkg, "GET", @_);
178 }
179
180 sub post
181 {
182         my $pkg = shift;
183         _getpost($pkg, "POST", @_);
184 }
185
186 # do a raw connection
187 #
188 # Async->raw($self, <host>, <port>, [handler => CODE ref], [prefix => <string>]);
189 #
190 # With no handler defined, everything sent by the connection will be sent to
191 # the caller.
192 #
193 # One can send stuff out on the connection by doing a standard "$conn->send_later(...)" 
194 # inside the (custom) handler.
195
196 sub raw
197 {
198         my $pkg = shift;
199         my $call = shift;
200         my $host = shift;
201         my $port = shift;
202
203         my %args = @_;
204
205         my $handler = delete $args{handler} || \&handle_raw;
206         
207         my $conn = $pkg->new($call, $handler);
208         $conn->{prefix} = delete $args{prefix} if exists $args{prefix};
209         $conn->{prefix} ||= '';
210         $conn->{on_disconnect} = delete $args{on_disc} || delete $args{on_disconnect};
211         $r = $conn->connect($host, $port, on_connect => &_on_raw_connect);
212         return $r ? $conn : undef;
213 }
214
215
216 # simple raw handler
217 #
218 # Just outputs everything
219 #
220 sub handle_raw
221 {
222         my $conn = shift;
223         my $msg = shift;
224
225         # no point in going on if there is no-one wanting the output anymore
226         my $dxchan = DXChannel::get($conn->{caller});
227         unless ($dxchan) {
228                 $conn->disconnect;
229                 return;
230         }
231
232         # send out the data
233         $dxchan->send("$conn->{prefix}$msg");
234 }
235
236
237 sub _on_raw_connect
238 {
239         my $conn = shift;
240         my $handle = shift;
241         dbg("AsyncMsg: Connected $conn->{cnum} to $conn->{host}:$conn->{port}") if isdbg('async');
242 }
243
244 sub _on_error
245 {
246         my $conn = shift;
247         my $msg = shift;
248         dbg("AsyncMsg: ***Connect $conn->{cnum} Failed to $conn->{host}:$conn->{port} $!") if isdbg('async');   
249 }
250
251 sub connect
252 {
253         my $conn = shift;
254         my $host = shift;
255         my $port = shift;
256         
257         # start a connection
258         my $r = $conn->SUPER::connect($host, $port, @_);
259
260         return $r;
261 }
262
263 sub disconnect
264 {
265         my $conn = shift;
266
267         if (my $ondisc = $conn->{on_disconnect}) {
268                 my $dxchan = DXChannel::get($conn->{caller});
269                 if ($dxchan) {
270                         no strict 'refs';
271                         $ondisc->($conn, $dxchan);
272                         delete $conn->{on_disconnect};
273                 }
274         }
275         delete $conn->{mojo};
276         delete $outstanding{$conn};
277         $conn->SUPER::disconnect;
278 }
279
280 sub DESTROY
281 {
282         my $conn = shift;
283         delete $outstanding{$conn};
284         $conn->SUPER::DESTROY;
285 }
286
287 1;
288