[libmessage-passing-zeromq-perl] 02/78: Input code
Jonas Smedegaard
js at alioth.debian.org
Mon Sep 30 09:28:15 UTC 2013
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository libmessage-passing-zeromq-perl.
commit d2c99c56b7caf854ec411e12ac2a27981248904f
Author: Tomas Doran <bobtfish at bobtfish.net>
Date: Sat Feb 25 09:34:16 2012 +0000
Input code
---
lib/Log/Stash/Input/ZeroMQ.pm | 75 ++++++++++++++++++++++++++++++++++++++++-
1 file changed, 74 insertions(+), 1 deletion(-)
diff --git a/lib/Log/Stash/Input/ZeroMQ.pm b/lib/Log/Stash/Input/ZeroMQ.pm
index c7ff739..11d923d 100644
--- a/lib/Log/Stash/Input/ZeroMQ.pm
+++ b/lib/Log/Stash/Input/ZeroMQ.pm
@@ -1,10 +1,83 @@
package Log::Stash::Input::ZeroMQ;
use Moose;
-use ZeroMQ;
+use ZeroMQ qw/:all/;
use namespace::autoclean;
with 'Log::Stash::Mixin::Input';
+# ZeroMQ endpoint that the subscriber socket binds to.
+# A reader is declared because the _socket default calls
+# $self->socket_bind; without "is" Moose generates no accessor method.
+has socket_bind => (
+    is      => 'ro',
+    isa     => 'Str',
+    default => 'tcp://*:5558',
+);
+
+# Private, read-only ZeroMQ context. Unless one is supplied, the default
+# builds it with ZeroMQ::Context->new when the object is constructed.
+has _ctx => (
+    isa     => 'ZeroMQ::Context',
+    is      => 'ro',
+    default => sub { return ZeroMQ::Context->new },
+);
+
+# Lazily built ZMQ_SUB socket created from _ctx. The builder sets an empty
+# ZMQ_SUBSCRIBE filter and the ZMQ_HWM option, then binds the socket to
+# socket_bind. The socket's recv method is delegated as _zmq_recv.
+has _socket => (
+    is      => 'ro',
+    isa     => 'ZeroMQ::Socket',
+    lazy    => 1,
+    handles => { _zmq_recv => 'recv' },
+    default => sub {
+        my ($self) = @_;
+        my $sub = $self->_ctx->socket(ZMQ_SUB);
+        $sub->setsockopt(ZMQ_SUBSCRIBE, '');
+        $sub->setsockopt(ZMQ_HWM, 100000);    # Buffer up to 100k messages.
+        $sub->bind($self->socket_bind);
+        return $sub;
+    },
+);
+
+# Attempt one non-blocking receive on the ZeroMQ socket.
+#
+# When a message is available it is decoded and handed to
+# output_to->consume. If decoding throws, the error is reported with warn
+# and the message is dropped; previously the value returned by warn was
+# passed on to consume as if it were decoded data.
+# A plain eval traps the error because this file does not import Try::Tiny.
+#
+# Returns whatever _zmq_recv returned, so callers can keep calling while
+# the result is true.
+sub _try_rx {
+    my ($self) = @_;
+    my $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
+    return $msg unless $msg;
+
+    my $data;
+    # The trailing 1 makes eval's result true only when decode did not die,
+    # independent of what decode itself returns.
+    if ( eval { $data = $self->decode($msg); 1 } ) {
+        # Called outside the eval so errors from the consumer still propagate.
+        $self->output_to->consume($data);
+    }
+    else {
+        warn $@ || "Unknown error while decoding message\n";
+    }
+    return $msg;
+}
+
+# Lazily built I/O watcher on the socket's ZMQ_FD descriptor. Each time the
+# watcher fires, _try_rx is called repeatedly until it returns false.
+has _io_reader => (
+    is      => 'ro',
+    lazy    => 1,
+    default => sub {
+        my $weak_self = shift;
+        # The callback is stored on the object it refers to, so it must only
+        # hold a weak reference. weaken is called by its full name because
+        # this file never imports it from Scalar::Util.
+        require Scalar::Util;
+        Scalar::Util::weaken($weak_self);
+        # Parenthesised so this parses as a plain function call even when
+        # AnyEvent has not been loaded at the time this file is compiled.
+        # NOTE(review): nothing in this file loads AnyEvent; confirm it is
+        # loaded elsewhere before objects are constructed.
+        return AE::io(
+            $weak_self->_socket->getsockopt(ZMQ_FD),
+            0,
+            sub {
+                # The weak reference becomes undef once the object is gone.
+                return unless $weak_self;
+                1 while $weak_self->_try_rx;
+            }
+        );
+    },
+);
+
+# Note that we need this timer as ZMQ is magic..
+# Just checking our local FD for readability will not always
+# be enough, as the client end of ZMQ may not start pushing messages to us,
+# ergo we call ->recv explicitly on the socket to get messages
+# which may be pre-buffered at a client as fast as possible (i.e. before
+# the client pushes another message).
+# Lazily built AnyEvent timer that first fires after one second and then
+# every second. Each tick calls _try_rx until it returns false.
+has _zmq_timer => (
+    is      => 'ro',
+    lazy    => 1,
+    default => sub {
+        my $weak_self = shift;
+        # The callback is stored on the object it refers to, so it must only
+        # hold a weak reference. weaken is called by its full name because
+        # this file never imports it from Scalar::Util.
+        require Scalar::Util;
+        Scalar::Util::weaken($weak_self);
+        return AnyEvent->timer(
+            after    => 1,
+            interval => 1,
+            cb       => sub {
+                # The weak reference becomes undef once the object is gone.
+                return unless $weak_self;
+                1 while $weak_self->_try_rx;
+            },
+        );
+    },
+);
+
+# Moose construction hook. Reads both lazy watcher attributes so that the
+# I/O watcher and the timer are created as soon as the object is built.
+sub BUILD {
+    my ($self) = @_;
+    $self->_io_reader;           # Builds the I/O watcher.
+    return $self->_zmq_timer;    # Builds the timer.
+}
+
1;
=head1 NAME
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmessage-passing-zeromq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list