[libanyevent-handle-udp-perl] 30/60: Refactor bind/connect logic
Jonas Smedegaard
js at alioth.debian.org
Mon Sep 30 10:05:41 UTC 2013
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository libanyevent-handle-udp-perl.
commit 427a3cff9cb9679eaf8ed2b37292ed4267350052
Author: Leon Timmermans <fawaka at gmail.com>
Date: Mon May 28 15:56:16 2012 +0200
Refactor bind/connect logic
---
Changes | 1 +
lib/AnyEvent/Handle/UDP.pm | 140 ++++++++++++++++++++++++++------------------
t/10-basics.t | 3 +-
t/20-timeout.t | 1 +
4 files changed, 86 insertions(+), 59 deletions(-)
diff --git a/Changes b/Changes
index 6e31ce3..c9367ca 100644
--- a/Changes
+++ b/Changes
@@ -3,6 +3,7 @@ Revision history for AnyEvent-Handle-UDP
{{$NEXT}}
Make fh an IO::Socket object
Don't insist on port number in tests
+ Refactor bind/connect logic
0.035 2012-05-02 23:15:17 Europe/Amsterdam
Make dependency on Sub::Name optional
diff --git a/lib/AnyEvent/Handle/UDP.pm b/lib/AnyEvent/Handle/UDP.pm
index fd5902c..45bce2c 100644
--- a/lib/AnyEvent/Handle/UDP.pm
+++ b/lib/AnyEvent/Handle/UDP.pm
@@ -10,8 +10,8 @@ use AnyEvent::Socket qw/parse_address/;
use Carp qw/croak/;
use Errno qw/EAGAIN EWOULDBLOCK EINTR ETIMEDOUT/;
-use Scalar::Util qw/reftype looks_like_number weaken/;
-use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY/;
+use Scalar::Util qw/reftype looks_like_number weaken openhandle/;
+use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY AF_INET AF_INET6 sockaddr_family/;
use Symbol qw/gensym/;
BEGIN {
@@ -20,8 +20,7 @@ BEGIN {
use namespace::clean;
has fh => (
- is => 'ro',
- default => sub { bless gensym(), 'IO::Socket' },
+ is => 'lazy',
handles => [ qw/sockname peername/ ],
);
@@ -37,11 +36,41 @@ has _connect_addr => (
predicate => '_has_connect_addr',
);
+sub _build_fh {
+ my $self = shift;
+ my $ret = bless gensym(), 'IO::Socket';
+ $self->_connect_to($ret, $self->_connect_addr) if $self->_has_connect_addr;
+ $self->_bind_to($ret, $self->_bind_addr) if $self->_has_bind_addr;
+ return $ret;
+}
+
+has _reader => (
+ is => 'lazy',
+ init_arg => undef,
+);
+
+sub _build__reader {
+ my $self = shift;
+ return AE::io($self->fh, 0, sub {
+ while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) {
+ $self->timeout_reset;
+ $self->rtimeout_reset;
+ $self->on_recv->($buffer, $self, $addr);
+ }
+ $self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
+ return;
+ });
+}
+
+has _buffers => (
+ is => 'ro',
+ default => sub { [] },
+ init_arg => undef,
+);
+
sub BUILD {
my $self = shift;
- $self->bind_to($self->_bind_addr) if $self->_has_bind_addr;
- $self->connect_to($self->_connect_addr) if $self->_has_connect_addr;
- $self->{buffers} = [];
+ $self->_reader;
$self->_drained;
return;
}
@@ -60,7 +89,7 @@ has on_drain => (
clearer => 'clear_on_drain',
trigger => sub {
my ($self, $callback) = @_;
- $self->_drained if not @{ $self->{buffers} };
+ $self->_drained if not @{ $self->_buffers };
},
);
@@ -87,11 +116,6 @@ has family => (
default => sub { 0 },
);
-has _full => (
- is => 'rw',
- predicate => '_has_full',
-);
-
has autoflush => (
is => 'rw',
default => sub { 0 },
@@ -171,74 +195,76 @@ for my $dir ('', 'r', 'w') {
sub bind_to {
my ($self, $addr) = @_;
+ return $self->_bind_to($self->fh, $addr);
+}
+
+sub _bind_to {
+ my ($self, $fh, $addr) = @_;
+ my $bind_to = sub {
+ my ($domain, $type, $proto, $sockaddr) = @_;
+ if (!openhandle($fh)) {
+ socket $fh, $domain, $type, $proto or redo;
+ fh_nonblocking $fh, 1;
+ }
+ bind $fh, $sockaddr or $self->_error(1, "Could not bind: $!");
+ setsockopt $fh, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error($!, 1, "Couldn't set so_reuseaddr: $!");
+ };
if (ref $addr) {
my ($host, $port) = @{$addr};
- _on_addr($self, $host, $port, sub {
- bind $self->{fh}, $_[0] or $self->_error(1, "Could not bind: $!");
- setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error($!, 1, "Couldn't set so_reuseaddr: $!");
- });
+ _on_addr($self, $fh, $host, $port, $bind_to);
}
else {
- bind $self->{fh}, $addr or $self->_error(1, "Could not bind: $!");
- setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error(1, "Couldn't set so_reuseaddr: $!");
+ $bind_to->(sockaddr_family($addr), SOCK_DGRAM, 0, $addr);
}
return;
}
sub connect_to {
my ($self, $addr) = @_;
+ return $self->($self->fh, $addr);
+}
+
+sub _connect_to {
+ my ($self, $fh, $addr) = @_;
+ my $connect_to = sub {
+ my ($domain, $type, $proto, $sockaddr) = @_;
+ if (!openhandle($fh)) {
+ socket $fh, $domain, $type, $proto or redo;
+ fh_nonblocking $fh, 1;
+ }
+ connect $fh, $sockaddr or $self->_error(1, "Could not connect: $!");
+ };
if (ref $addr) {
my ($host, $port) = @{$addr};
- _on_addr($self, $host, $port, sub { connect $self->{fh}, $_[0] or $self->_error(1, "Could not connect: $!") });
+ _on_addr($self, $fh, $host, $port, $connect_to);
}
else {
- connect $self->{fh}, $addr or $self->_error(1, "Could not connect: $!")
+ $connect_to->(sockaddr_family($addr), SOCK_DGRAM, 0, $addr);
}
return;
}
+sub _get_family {
+ my $fh = shift;
+ return if !openhandle($fh) || !getsockname $fh;
+ my $family = sockaddr_family(getsockname $fh);
+ return $family == AF_INET ? 4 : $family == AF_INET6 ? 6 : 0;
+}
+
sub _on_addr {
- my ($self, $host, $port, $on_success) = @_;
+ my ($self, $fh, $host, $port, $on_success) = @_;
- AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', $self->family, SOCK_DGRAM, sub {
+ AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', _get_family($fh) || $self->family, SOCK_DGRAM, sub {
my @targets = @_;
while (1) {
my $target = shift @targets or $self->_error(1, "No such host '$host' or port '$port'");
-
- my ($domain, $type, $proto, $sockaddr) = @{$target};
- my $full = join ':', $domain, $type, $proto;
- if ($self->_has_full) {
- redo if $self->_full ne $full;
- }
- else {
- socket $self->fh, $domain, $type, $proto or redo;
- fh_nonblocking $self->fh, 1;
- $self->_full($full);
- }
-
- $on_success->($sockaddr);
- $self->_add_watcher;
-
+ $on_success->(@{$target});
last;
}
});
return;
}
-sub _add_watcher {
- my $self = shift;
-
- $self->{reader} = AE::io $self->fh, 0, sub {
- while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) {
- $self->timeout_reset;
- $self->rtimeout_reset;
- $self->on_recv->($buffer, $self, $addr);
- }
- $self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
- return;
- };
-}
-
sub _error {
my ($self, $fatal, $message) = @_;
@@ -258,7 +284,7 @@ sub push_send {
my ($self, $message, $to, $cv) = @_;
$to = AnyEvent::Socket::pack_sockaddr($to->[1], defined $to->[0] ? parse_address($to->[0]) : INADDR_ANY) if ref $to;
$cv ||= defined wantarray ? AnyEvent::CondVar->new : undef;
- if ($self->autoflush and ! @{ $self->{buffers} }) {
+ if ($self->autoflush and ! @{ $self->_buffers }) {
my $ret = $self->_send($message, $to, $cv);
$self->_push_writer($message, $to, $cv) if not defined $ret and $non_fatal{$! + 0};
$self->_drained if $ret;
@@ -283,14 +309,14 @@ sub _send {
sub _push_writer {
my ($self, $message, $to, $condvar) = @_;
- push @{$self->{buffers}}, [ $message, $to, $condvar ];
+ push @{$self->_buffers}, [ $message, $to, $condvar ];
$self->{writer} ||= AE::io $self->{fh}, 1, sub {
- if (@{ $self->{buffers} }) {
- while (my $entry = shift @{$self->{buffers}}) {
+ if (@{ $self->_buffers }) {
+ while (my $entry = shift @{$self->_buffers}) {
my ($msg, $to, $cv) = @{$entry};
my $ret = $self->_send($msg, $to, $cv);
if (not defined $ret) {
- unshift @{$self->{buffers}}, $entry;
+ unshift @{$self->_buffers}, $entry;
$self->_error->(1, "$!") if !$non_fatal{$! + 0};
last;
}
diff --git a/t/10-basics.t b/t/10-basics.t
index d9592c9..95b99bd 100644
--- a/t/10-basics.t
+++ b/t/10-basics.t
@@ -25,8 +25,7 @@ alarm 3;
is($message, "Hello", "received \"Hello\"");
$handle->push_send("World", $client_addr);
});
- my $port = (unpack_sockaddr_in($server->sockname))[0];
- my $client = AnyEvent::Handle::UDP->new(connect => [ localhost => $port ], on_recv => $cb);
+ my $client = AnyEvent::Handle::UDP->new(connect => $server->sockname, on_recv => $cb);
$client->push_send("Hello");
is($cb->recv, "World", 'received "World"');
}
diff --git a/t/20-timeout.t b/t/20-timeout.t
index 76ce092..7201cb1 100644
--- a/t/20-timeout.t
+++ b/t/20-timeout.t
@@ -14,6 +14,7 @@ alarm 12;
my $cb = AE::cv;
my $cb2 = AE::cv;
my $server = AnyEvent::Handle::UDP->new(
+ bind => [ localhost => 0 ],
on_recv => $cb,
timeout => 3, on_timeout => sub { $cb->croak("Timeout") },
rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") }
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-handle-udp-perl.git
More information about the Pkg-perl-cvs-commits
mailing list