[libanyevent-handle-udp-perl] 02/60: Various additions to code and documentation
Jonas Smedegaard
js at alioth.debian.org
Mon Sep 30 10:05:36 UTC 2013
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository libanyevent-handle-udp-perl.
commit d5f65250e631c9668aa82f2bac8125421cdfafc9
Author: Leon Timmermans <fawaka at gmail.com>
Date: Mon Aug 22 14:34:06 2011 +0200
Various additions to code and documentation
---
dist.ini | 1 +
lib/AnyEvent/Handle/UDP.pm | 201 +++++++++++++++++++++++++++++++++++++++++++-
t/10-basics.t | 27 ++++++
3 files changed, 228 insertions(+), 1 deletion(-)
diff --git a/dist.ini b/dist.ini
index 1a96859..aa77af4 100644
--- a/dist.ini
+++ b/dist.ini
@@ -20,6 +20,7 @@ copyright_year = 2011
[PkgVersion]
;Testing
+[CompileTests]
[PodSyntaxTests]
[PodCoverageTests]
[KwaliteeTests]
diff --git a/lib/AnyEvent/Handle/UDP.pm b/lib/AnyEvent/Handle/UDP.pm
index 9e108d6..6472d05 100644
--- a/lib/AnyEvent/Handle/UDP.pm
+++ b/lib/AnyEvent/Handle/UDP.pm
@@ -1,5 +1,204 @@
package AnyEvent::Handle::UDP;
use strict;
-use warnings;
+use warnings FATAL => 'all';
+
+use AnyEvent qw//;
+use AnyEvent::Util qw/fh_nonblocking/;
+use AnyEvent::Socket qw//;
+
+use Carp qw/croak/;
+use Const::Fast qw/const/;
+use Class::XSAccessor accessors => [qw/on_recv on_error receive_size/], getters => [qw/fh/];
+use Errno qw/EAGAIN EWOULDBLOCK/;
+use Socket qw/SOL_SOCKET SO_REUSEADDR/;
+
+const my $default_recv_size => 1500;
+
+sub new {
+ my ($class, %options) = @_;
+
+ croak 'on_recv option is mandatory' if not defined $options{on_recv};
+ $options{receive_size} ||= $default_recv_size;
+
+ my $self = bless { map { ( $_ => $options{$_} ) } qw/on_recv receive_size on_error/ }, $class;
+
+ $self->bind_to($options{bind}) if $options{bind};
+ $self->connect_to($options{connect}) if $options{connect};
+ return $self;
+}
+
+sub bind_to {
+ my ($self, $addr) = @_;
+ if (ref $addr) {
+ my ($host, $port) = @{$addr};
+ _on_addr($self, $host, $port, sub {
+ bind $self->{fh}, $_[0] or $self->_error(1, "Could not bind: $!");
+ setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error($!, 1, "Couldn't set so_reuseaddr: $!");
+ });
+ }
+ else {
+ bind $self->{fh}, $addr or $self->_error(1, "Could not bind: $!");
+ setsockopt $self->{fh}, SOL_SOCKET, SO_REUSEADDR, 1 or $self->_error(1, "Couldn't set so_reuseaddr: $!");
+ }
+ return;
+}
+
+sub connect_to {
+ my ($self, $addr) = @_;
+ if (ref $addr) {
+ my ($host, $port) = @{$addr};
+ _on_addr($self, $host, $port, sub { connect $self->{fh}, $_[0] or $self->_error(1, "Could not connect: $!") });
+ }
+ else {
+ connect $self->{fh}, $addr or $self->_error(1, "Could not connect: $!")
+ }
+ return;
+}
+
+sub _on_addr {
+ my ($self, $host, $port, $on_success) = @_;
+
+ AnyEvent::Socket::resolve_sockaddr($host, $port, 'udp', 0, undef, sub {
+ my @targets = @_;
+ while (1) {
+ my $target = shift @targets or $self->_error(1, "No such host '$host' or port '$port'");
+
+ my ($domain, $type, $proto, $sockaddr) = @{$target};
+ my $full = join ':', $domain, $type, $proto;
+ if ($self->{fh}) {
+ return redo if $self->{full} ne $full;
+ }
+ else {
+ socket $self->{fh}, $domain, $type, $proto or redo;
+ fh_nonblocking $self->{fh}, 1;
+ $self->{full} = $full;
+ }
+
+ $on_success->($sockaddr);
+
+ $self->{reader} = AE::io $self->{fh}, 0, sub {
+ while (defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
+ $self->{on_recv}($buffer, $self, $addr);
+ }
+ $self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
+ return;
+ };
+
+ last;
+ }
+ });
+ return;
+}
+
+sub _error {
+ my ($self, $fatal, $message) = @_;
+
+ if ($self->{on_error}) {
+ $self->{on_error}($self, $fatal, $message);
+ $self->destroy if $fatal;
+ } else {
+ $self->destroy;
+ croak "AnyEvent::Handle uncaught error: $message";
+ }
+ return;
+}
+
+sub push_send {
+ my ($self, $message, $to) = @_;
+ my $ret = $self->_send();
+ $self->_push_writer($message, $to) if not defined $ret and ($! == EAGAIN or $! == EWOULDBLOCK);
+ return;
+}
+
+sub _send {
+ my ($self, $message, $to) = @_;
+ my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0;
+ $self->on_error->($self->{fh}, 1, "$!") if not defined $ret and ($! != EAGAIN and $! != EWOULDBLOCK);
+ return $ret;
+}
+
+sub _push_writer {
+ my ($self, $message, $to) = @_;
+ push @{$self->{buffers}}, [ $message, $to ];
+ $self->{writer} ||= AE::io $self->{fh}, 1, sub {
+ if (@{$self->{buffers}}) {
+ while (my $msg = shift @{$self->{buffers}}) {
+ if (not defined $self->_send(@{$msg})) {
+ unshift @{$self->{buffers}}, $msg;
+ last;
+ }
+ }
+ }
+ else {
+ delete $self->{writer};
+ }
+ };
+ return;
+}
+
+sub destroy {
+ my $self = shift;
+ $self->DESTROY;
+ %{$self} = ();
+ return;
+}
+
+sub DESTROY {
+ my $self = shift;
+ # XXX
+ return;
+}
1;
+
+__END__
+
+# ABSTRACT: client/server UDP handles for AnyEvent
+
+=head1 DESCRIPTION
+
+This module is an abstraction around UDP sockets for use with AnyEvent.
+
+=method new
+
+Create a new UDP handle. Its arguments are all optional, though using either connect or bind (or both) is strongly recommended.
+
+=over 4
+
+=item * connect
+
+Set the address to which datagrams are sent by default, and the only address from which datagrams are received. It must be either a packed sockaddr struct or an arrayref containing a hostname and a portnumber.
+
+=item * bind
+
+The address to bind the socket to. It must be either a packed sockaddr struct or an arrayref containing a hostname and a portnumber.
+
+=item * on_recv
+
+The callback for when a package arrives. It takes three arguments: the datagram, the handle and the address the datagram was received from.
+
+=item * on_error
+
+The callback for when an error occurs. It takes three arguments: the handle, a boolean indicating the error is fatal or not, and the error message.
+
+=item * receive_size
+
+The buffer size for the receiving in bytes. It defaults to 1500, which is slightly more than the MTA on ethernet.
+
+=back
+
+=method bind_to($address)
+
+Bind to the specified addres. Note that a bound socket may be rebound to another address. C<$address> must be in the same form as the bind argument to new.
+
+=method connect_to($address)
+
+Connect to the specified address. Note that a connected socket may be reconnected to another address. C<$address> must be in the same form as the connect argument to new.
+
+=method push_send($message, $to?)
+
+Try to send a message. If a socket is not connected a receptient address must also be given. It is connected giving a receptient may not work as expected, depending on your platform.
+
+=head1 BACKWARDS COMPATIBILITY
+
+This module is B<not> backwards compatible in any way with the previous module of the same name by Jan Henning Thorsen. That module was broken by AnyEvent itself and now considered defunct.
diff --git a/t/10-basics.t b/t/10-basics.t
new file mode 100644
index 0000000..aa1da30
--- /dev/null
+++ b/t/10-basics.t
@@ -0,0 +1,27 @@
+#! perl
+
+use strict;
+use warnings FATAL => 'all';
+use Test::More tests => 2;
+use AnyEvent::Handle::UDP;
+use IO::Socket::INET;
+
+alarm 3;
+
+{
+ my $cb = AE::cv;
+ my $server = AnyEvent::Handle::UDP->new(bind => [ localhost => 1382 ], on_recv => $cb);
+ my $client = IO::Socket::INET->new(PeerHost => 'localhost', PeerPort => 1382, Proto => 'udp');
+ send $client, "Hello", 0;
+ is($cb->recv, "Hello", 'received "Hello"');
+}
+
+{
+ my $cb = AE::cv;
+ my $server = IO::Socket::INET->new(LocalHost => 'localhost', LocalPort => 1383, Proto => 'udp') or die $!;
+ my $client = AnyEvent::Handle::UDP->new(connect => [ localhost => 1383 ], on_recv => $cb);
+ send $client->fh, "Hello", 0;
+ my $client_addr = recv $server, my ($message), 1500, 0 or die "Could not receive: $!";
+ send $server, "World", 0, $client_addr or die "Could not send: $!";
+ is($cb->recv, "World", 'received "World"');
+}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-handle-udp-perl.git
More information about the Pkg-perl-cvs-commits
mailing list