+sub _on_connect
+{
+ my $conn = shift;
+ my $handle = shift;
+ undef $conn->{sock};
+ my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
+ $sock->on(read => sub {$conn->_rcv($_[1]);} );
+ $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;});
+ $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;});
+ $sock->timeout(0);
+ $sock->start;
+ $conn->{peerhost} = eval { $handle->peerhost; };
+ dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
+ if ($conn->{on_connect}) {
+ &{$conn->{on_connect}}($conn, $handle);
+ }
+}
+
+sub is_connected
+{
+ my $conn = shift;
+ my $sock = $conn->{sock};
+ return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
+}
+
+sub connect {
+ my ($pkg, $to_host, $to_port, %args) = @_;
+ my $timeout = delete $args{timeout} || $connect_timeout;
+