[libanyevent-handle-udp-perl] 30/60: Refactor bind/connect logic

Jonas Smedegaard js at alioth.debian.org
Mon Sep 30 10:05:41 UTC 2013


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository libanyevent-handle-udp-perl.

commit 427a3cff9cb9679eaf8ed2b37292ed4267350052
Author: Leon Timmermans <fawaka at gmail.com>
Date:   Mon May 28 15:56:16 2012 +0200

    Refactor bind/connect logic
---
 Changes                    |    1 +
 lib/AnyEvent/Handle/UDP.pm |  140 ++++++++++++++++++++++++++------------------
 t/10-basics.t              |    3 +-
 t/20-timeout.t             |    1 +
 4 files changed, 86 insertions(+), 59 deletions(-)

diff --git a/Changes b/Changes
index 6e31ce3..c9367ca 100644
--- a/Changes
+++ b/Changes
@@ -3,6 +3,7 @@ Revision history for AnyEvent-Handle-UDP
 {{$NEXT}}
           Make fh an IO::Socket object
           Don't insist on port number in tests
+          Refactor bind/connect logic
 
 0.035     2012-05-02 23:15:17 Europe/Amsterdam
           Make dependency on Sub::Name optional
diff --git a/lib/AnyEvent/Handle/UDP.pm b/lib/AnyEvent/Handle/UDP.pm
index fd5902c..45bce2c 100644
--- a/lib/AnyEvent/Handle/UDP.pm
+++ b/lib/AnyEvent/Handle/UDP.pm
@@ -10,8 +10,8 @@ use AnyEvent::Socket qw/parse_address/;
 
 use Carp qw/croak/;
 use Errno qw/EAGAIN EWOULDBLOCK EINTR ETIMEDOUT/;
-use Scalar::Util qw/reftype looks_like_number weaken/;
-use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY/;
+use Scalar::Util qw/reftype looks_like_number weaken openhandle/;
+use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY AF_INET AF_INET6 sockaddr_family/;
 use Symbol qw/gensym/;
 
 BEGIN {
@@ -20,8 +20,7 @@ BEGIN {
 use namespace::clean;
 
 has fh => (
-	is => 'ro',
-	default => sub { bless gensym(), 'IO::Socket' },
+	is => 'lazy',
 	handles => [ qw/sockname peername/ ],
 );
 
@@ -37,11 +36,41 @@ has _connect_addr => (
 	predicate => '_has_connect_addr',
 );
 
+sub _build_fh {
+	my $self = shift;
+	my $ret = bless gensym(), 'IO::Socket';
+	$self->_connect_to($ret, $self->_connect_addr) if $self->_has_connect_addr;
+	$self->_bind_to($ret, $self->_bind_addr) if $self->_has_bind_addr;
+	return $ret;
+}
+
+has _reader => (
+	is => 'lazy',
+	init_arg => undef,
+);
+
+sub _build__reader {
+	my $self = shift;
+	return AE::io($self->fh, 0, sub {
+		while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) {
+			$self->timeout_reset;
+			$self->rtimeout_reset;
+			$self->on_recv->($buffer, $self, $addr);
+		}
+		$self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
+		return;
+	});
+}
+
+has _buffers => (
+	is => 'ro',
+	default => sub { [] },
+	init_arg => undef,
+);
+
 sub BUILD {
 	my $self = shift;
-	$self->bind_to($self->_bind_addr) if $self->_has_bind_addr;
-	$self->connect_to($self->_connect_addr) if $self->_has_connect_addr;
-	$self->{buffers} = [];
+	$self->_reader;
 	$self->_drained;
 	return;
 }
@@ -60,7 +89,7 @@ has on_drain => (
 	clearer => 'clear_on_drain',
 	trigger => sub {
 		my ($self, $callback) = @_;
-		$self->_drained if not @{ $self->{buffers} };
+		$self->_drained if not @{ $self->_buffers };
 	},
 );
 
@@ -87,11 +116,6 @@ has family => (
 	default => sub { 0 },
 );
 
-has _full => (
-	is => 'rw',
-	predicate => '_has_full',
-);
-
 has autoflush => (
 	is => 'rw',
 	default => sub { 0 },
@@ -171,74 +195,76 @@ for my $dir ('', 'r', 'w') {
 
 sub bind_to {
 	my ($self, $addr) = @_;
+	return $self->_bind_to($self->fh, $addr);
+}
+
+sub _bind_to {
+	my ($self, $fh, $addr) = @_;
+	my $bind_to = sub {
+		my ($domain, $type, $proto, $sockaddr) = @_;
+		if (!openhandle($fh)) {
+			socket $fh, $domain, $type, $proto or redo;
+			fh_nonblocking $fh, 1;
+		}
+		bind $fh, $sockaddr or $self->_error(1, "Could not bind: $!");
+		setsockopt $fh, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error($!, 1, "Couldn't set so_reuseaddr: $!");
+	};
 	if (ref $addr) {
 		my ($host, $port) = @{$addr};
-		_on_addr($self, $host, $port, sub {
-			bind $self->{fh}, $_[0] or $self->_error(1, "Could not bind: $!");
-			setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error($!, 1, "Couldn't set so_reuseaddr: $!");
-		});
+		_on_addr($self, $fh, $host, $port, $bind_to);
 	}
 	else {
-		bind $self->{fh}, $addr or $self->_error(1, "Could not bind: $!");
-		setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error(1, "Couldn't set so_reuseaddr: $!");
+		$bind_to->(sockaddr_family($addr), SOCK_DGRAM, 0, $addr);
 	}
 	return;
 }
 
 sub connect_to {
 	my ($self, $addr) = @_;
+	return $self->($self->fh, $addr);
+}
+
+sub _connect_to {
+	my ($self, $fh, $addr) = @_;
+	my $connect_to = sub {
+		my ($domain, $type, $proto, $sockaddr) = @_;
+		if (!openhandle($fh)) {
+			socket $fh, $domain, $type, $proto or redo;
+			fh_nonblocking $fh, 1;
+		}
+		connect $fh, $sockaddr or $self->_error(1, "Could not connect: $!");
+	};
 	if (ref $addr) {
 		my ($host, $port) = @{$addr};
-		_on_addr($self, $host, $port, sub { connect $self->{fh}, $_[0] or $self->_error(1, "Could not connect: $!") });
+		_on_addr($self, $fh, $host, $port, $connect_to);
 	}
 	else {
-		connect $self->{fh}, $addr or $self->_error(1, "Could not connect: $!")
+		$connect_to->(sockaddr_family($addr), SOCK_DGRAM, 0, $addr);
 	}
 	return;
 }
 
+sub _get_family {
+	my $fh = shift;
+	return if !openhandle($fh) || !getsockname $fh;
+	my $family = sockaddr_family(getsockname $fh);
+	return $family == AF_INET ? 4 : $family == AF_INET6 ? 6 : 0;
+}
+
 sub _on_addr {
-	my ($self, $host, $port, $on_success) = @_;
+	my ($self, $fh, $host, $port, $on_success) = @_;
 
-	AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', $self->family, SOCK_DGRAM, sub {
+	AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', _get_family($fh) || $self->family, SOCK_DGRAM, sub {
 		my @targets = @_;
 		while (1) {
 			my $target = shift @targets or $self->_error(1, "No such host '$host' or port '$port'");
-	 
-			my ($domain, $type, $proto, $sockaddr) = @{$target};
-			my $full = join ':', $domain, $type, $proto;
-			if ($self->_has_full) {
-				redo if $self->_full ne $full;
-			}
-			else {
-				socket $self->fh, $domain, $type, $proto or redo;
-				fh_nonblocking $self->fh, 1;
-				$self->_full($full);
-			}
-
-			$on_success->($sockaddr);
-			$self->_add_watcher;
-
+			$on_success->(@{$target});
 			last;
 		}
 	});
 	return;
 }
 
-sub _add_watcher {
-	my $self = shift;
-
-	$self->{reader} = AE::io $self->fh, 0, sub {
-		while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) {
-			$self->timeout_reset;
-			$self->rtimeout_reset;
-			$self->on_recv->($buffer, $self, $addr);
-		}
-		$self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
-		return;
-	};
-}
-
 sub _error {
 	my ($self, $fatal, $message) = @_;
 
@@ -258,7 +284,7 @@ sub push_send {
 	my ($self, $message, $to, $cv) = @_;
 	$to = AnyEvent::Socket::pack_sockaddr($to->[1], defined $to->[0] ? parse_address($to->[0]) : INADDR_ANY) if ref $to;
 	$cv ||= defined wantarray ? AnyEvent::CondVar->new : undef;
-	if ($self->autoflush and ! @{ $self->{buffers} }) {
+	if ($self->autoflush and ! @{ $self->_buffers }) {
 		my $ret = $self->_send($message, $to, $cv);
 		$self->_push_writer($message, $to, $cv) if not defined $ret and $non_fatal{$! + 0};
 		$self->_drained if $ret;
@@ -283,14 +309,14 @@ sub _send {
 
 sub _push_writer {
 	my ($self, $message, $to, $condvar) = @_;
-	push @{$self->{buffers}}, [ $message, $to, $condvar ];
+	push @{$self->_buffers}, [ $message, $to, $condvar ];
 	$self->{writer} ||= AE::io $self->{fh}, 1, sub {
-		if (@{ $self->{buffers} }) {
-			while (my $entry = shift @{$self->{buffers}}) {
+		if (@{ $self->_buffers }) {
+			while (my $entry = shift @{$self->_buffers}) {
 				my ($msg, $to, $cv) = @{$entry};
 				my $ret = $self->_send($msg, $to, $cv);
 				if (not defined $ret) {
-					unshift @{$self->{buffers}}, $entry;
+					unshift @{$self->_buffers}, $entry;
 					$self->_error->(1, "$!") if !$non_fatal{$! + 0};
 					last;
 				}
diff --git a/t/10-basics.t b/t/10-basics.t
index d9592c9..95b99bd 100644
--- a/t/10-basics.t
+++ b/t/10-basics.t
@@ -25,8 +25,7 @@ alarm 3;
 		is($message, "Hello", "received \"Hello\"");
 		$handle->push_send("World", $client_addr);
 	});
-	my $port = (unpack_sockaddr_in($server->sockname))[0];
-	my $client = AnyEvent::Handle::UDP->new(connect => [ localhost => $port ], on_recv => $cb);
+	my $client = AnyEvent::Handle::UDP->new(connect => $server->sockname, on_recv => $cb);
 	$client->push_send("Hello");
 	is($cb->recv, "World", 'received "World"');
 }
diff --git a/t/20-timeout.t b/t/20-timeout.t
index 76ce092..7201cb1 100644
--- a/t/20-timeout.t
+++ b/t/20-timeout.t
@@ -14,6 +14,7 @@ alarm 12;
 	my $cb = AE::cv;
 	my $cb2 = AE::cv;
 	my $server = AnyEvent::Handle::UDP->new(
+		bind => [ localhost => 0 ],
 		on_recv => $cb, 
 		timeout => 3,    on_timeout => sub { $cb->croak("Timeout") },
 		rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") }

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-handle-udp-perl.git



More information about the Pkg-perl-cvs-commits mailing list