my %outstanding;
-#
-# standard http get handler
-#
-sub handle_get
+sub new
{
- my $conn = shift;
- my $msg = shift;
+ my $pkg = shift;
+ my $call = shift;
+ my $handler = shift;
+
+ my $conn = $pkg->SUPER::new($handler);
+ $conn->{caller} = ref $call ? $call->call : $call;
- my $state = $conn->{_asstate};
+ # make it persistent
+ $outstanding{$conn} = $conn;
- dbg("asyncmsg: $state $msg") if isdbg('async');
+ return $conn;
+}
+
+sub handle_getpost
+{
+ my ($conn, $ua, $tx) = @_;
# no point in going on if there is no-one wanting the output anymore
my $dxchan = DXChannel::get($conn->{caller});
return;
}
- if ($state eq 'waitreply') {
- # look at the reply code and decide whether it is a success
- my ($http, $code, $ascii) = $msg =~ m|(HTTP/\d\.\d)\s+(\d+)\s+(.*)|;
- if ($code == 200) {
- # success
- $conn->{_asstate} = 'waitblank';
- } elsif ($code == 302) {
- # redirect
- $conn->{_asstate} = 'waitlocation';
- } else {
- $dxchan->send("$code $ascii");
- $conn->disconnect;
- }
- } elsif ($state eq 'waitlocation') {
- my ($path) = $msg =~ m|Location:\s*(.*)|;
- if ($path) {
- my $newconn;
- my @uri = split m|/+|, $path;
- if ($uri[0] eq 'http:') {
- shift @uri;
- my $host = shift @uri;
- my $newpath = '/' . join('/', @uri);
- $newpath .= '/' if $path =~ m|/$|;
- $newconn = _getpost(ref $conn, $conn->{_assort}, $conn->{caller}, $host, 80, $newpath, @{$conn->{_asargs}});
- } elsif ($path =~ m|^/|) {
- $newconn = _getpost(ref $conn, $conn->{_assort}, $conn->{caller}, $conn->{peerhost}, $conn->{peerport}, $path, @{$conn->{_asargs}});
- }
- if ($newconn) {
- # copy over any elements in $conn that are not in $newconn
- while (my ($k,$v) = each %$conn) {
- dbg("async: $state copying over $k -> \$newconn") if isdbg('async');
- $newconn{$k} = $v unless exists $newconn{$k};
- }
- }
- delete $conn->{on_disconnect};
- $conn->disconnect;
- }
- } elsif ($state eq 'waitblank') {
- unless ($msg) {
- $conn->{_asstate} = 'indata';
- }
- } elsif ($conn->{_asstate} eq 'indata') {
+ my @lines = split qr{\r?\n}, $tx->res->body;
+
+ foreach my $msg(@lines) {
+ dbg("AsyncMsg: $conn->{_asstate} $msg") if isdbg('async');
+
if (my $filter = $conn->{_asfilter}) {
no strict 'refs';
# this will crash if the command has been redefined and the filter is a
$dxchan->send("$prefix$msg");
}
}
-}
-
-#
-# simple raw handler
-#
-# Just outputs everything
-#
-sub handle_raw
-{
- my $conn = shift;
- my $msg = shift;
-
- # no point in going on if there is no-one wanting the output anymore
- my $dxchan = DXChannel::get($conn->{caller});
- unless ($dxchan) {
- $conn->disconnect;
- return;
- }
-
- # send out the data
- my $prefix = $conn->{prefix} || '';
- $dxchan->send("$prefix$msg");
-}
-
-sub new
-{
- my $pkg = shift;
- my $call = shift;
- my $handler = shift;
-
- my $conn = $pkg->SUPER::new($handler);
- $conn->{caller} = ref $call ? $call->call : $call;
-
- # make it persistent
- $outstanding{$conn} = $conn;
- return $conn;
+ $conn->disconnect;
}
# This does a http get on a path on a host and
my $sort = shift;
my $call = shift;
my $host = shift;
- my $port = shift;
my $path = shift;
my %args = @_;
- my $conn = $pkg->new($call, \&handle_get);
+ my $conn = $pkg->new($call);
$conn->{_asargs} = [@_];
$conn->{_asstate} = 'waitreply';
$conn->{_asfilter} = delete $args{filter} if exists $args{filter};
$conn->{prefix} = delete $args{prefix} if exists $args{prefix};
+ $conn->{prefix} ||= '';
$conn->{on_disconnect} = delete $args{on_disc} || delete $args{on_disconnect};
$conn->{path} = $path;
- $conn->{host} = $host;
- $conn->{port} = $port;
+ $conn->{host} = $conn->{peerhost} = $host;
+ $conn->{port} = $conn->{peerport} = delete $args{port} || 80;
+ $conn->{sort} = 'outgoing';
$conn->{_assort} = $sort;
+ $conn->{csort} = 'http';
+
+ my $ua = Mojo::UserAgent->new;
+ my $s;
+ $s .= $host;
+ $s .= ":$port" unless $conn->{port} == 80;
+ $s .= $path;
+ dbg("AsyncMsg: $sort $s") if isdbg('async');
- $r = $conn->connect($host, $port, on_connect=>sub {$conn->_on_getpost_connect(@_)});
+ my $tx = $ua->build_tx($sort => $s);
+ $ua->on(error => sub { $conn->_error(@_); });
+# $tx->on(error => sub { $conn->_error(@_); });
+# $tx->on(finish => sub { $conn->disconnect; });
+
+ $ua->start($tx => sub { $conn->handle_getpost(@_) });
+
- return $r ? $conn : undef;
+ $conn->{mojo} = $ua;
+ return $conn if $tx;
+
+ $conn->disconnect;
+ return undef;
}
-sub _on_getpost_connect
+sub _dxchan_send
{
my $conn = shift;
-
- dbg("Sending '$conn->{_assort} $conn->{path} HTTP/1.0'") if isdbg('async');
- $conn->send_later("$conn->{_assort} $conn->{path} HTTP/1.0\n");
-
- my $h = delete $args{Host} || $host;
- my $u = delete $args{'User-Agent'} || "DxSpider;$main::version;$main::build;$^O;$main::mycall";
- my $d = delete $args{data};
-
- $conn->send_later("Host: $h\n");
- $conn->send_later("User-Agent: $u\n");
- while (my ($k,$v) = each %args) {
- $conn->send_later("$k: $v\n");
- }
- $conn->send_later("\n$d") if defined $d;
+ my $msg = shift;
+ my $dxchan = DXChannel::get($conn->{caller});
+ $dxchan->send($msg) if $dxchan;
}
+sub _error
+{
+ my ($conn, $e, $err);
+ dbg("Async: $conn->host:$conn->port path $conn->{path} error $err") if isdbg('chan');
+ $conn->_dxchan_send("$conn->{prefix}$msg");
+ $conn->disconnect;
+}
+
sub get
{
my $pkg = shift;
my $handler = delete $args{handler} || \&handle_raw;
my $conn = $pkg->new($call, $handler);
$conn->{prefix} = delete $args{prefix} if exists $args{prefix};
+ $conn->{prefix} ||= '';
$r = $conn->connect($host, $port, on_connect => &_on_raw_connect);
return $r ? $conn : undef;
}
+#
+# simple raw handler
+#
+# Just outputs everything
+#
+sub handle_raw
+{
+ my $conn = shift;
+ my $msg = shift;
+
+ # no point in going on if there is no-one wanting the output anymore
+ my $dxchan = DXChannel::get($conn->{caller});
+ unless ($dxchan) {
+ $conn->disconnect;
+ return;
+ }
+
+ # send out the data
+ $dxchan->send("$conn->{prefix}$msg");
+}
+
+
sub _on_raw_connect
{
my $conn = shift;
$ondisc->($conn, $dxchan)
}
}
+ delete $conn->{mojo};
delete $outstanding{$conn};
$conn->SUPER::disconnect;
}