[libmessage-passing-zeromq-perl] 02/78: Input code

Jonas Smedegaard js at alioth.debian.org
Mon Sep 30 09:28:15 UTC 2013


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository libmessage-passing-zeromq-perl.

commit d2c99c56b7caf854ec411e12ac2a27981248904f
Author: Tomas Doran <bobtfish at bobtfish.net>
Date:   Sat Feb 25 09:34:16 2012 +0000

    Input code
---
 lib/Log/Stash/Input/ZeroMQ.pm |   75 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/lib/Log/Stash/Input/ZeroMQ.pm b/lib/Log/Stash/Input/ZeroMQ.pm
index c7ff739..11d923d 100644
--- a/lib/Log/Stash/Input/ZeroMQ.pm
+++ b/lib/Log/Stash/Input/ZeroMQ.pm
@@ -1,10 +1,83 @@
 package Log::Stash::Input::ZeroMQ;
 use Moose;
-use ZeroMQ;
+use ZeroMQ qw/:all/;
 use namespace::autoclean;
 
 with 'Log::Stash::Mixin::Input';
 
+has socket_bind => (
+    isa => 'Str',
+    default => 'tcp://*:5558',
+);
+
+has _ctx => (
+    is => 'ro',
+    isa => 'ZeroMQ::Context',
+    default => sub { ZeroMQ::Context->new() },
+);
+
+has _socket => (
+    is => 'ro',
+    isa => 'ZeroMQ::Socket',
+    lazy => 1,
+    default => sub {
+        my $self = shift;
+        my $socket = $self->_ctx->socket(ZMQ_SUB);
+        $socket->setsockopt(ZMQ_SUBSCRIBE, '');
+        $socket->setsockopt(ZMQ_HWM, 100000); # Buffer up to 100k messages.
+        $socket->bind($self->socket_bind);
+        return $socket;
+    },
+    handles => {
+        _zmq_recv => 'recv',
+    },
+);
+
+sub _try_rx {
+    my $self = shift();
+    my $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
+    if ($msg) {
+        my $data = try { $self->decode($msg) }
+            catch { warn $_ };
+        $self->output_to->consume($data);
+    }
+    return $msg;
+}
+
+has _io_reader => (
+    is => 'ro',
+    lazy => 1,
+    default => sub {
+        my $weak_self = shift;
+        weaken($weak_self);
+        AE::io $weak_self->_socket->getsockopt( ZMQ_FD ), 0,
+            sub { my $more; do { $more = $weak_self->_try_rx } while ($more) };
+    },
+);
+
+# Note that we need this timer as ZMQ is magic..
+# Just checking our local FD for readability will not always
+# be enough, as the client end of ZQM may not start pushing messages to us,
+# ergo we call ->recv explicitly on the socket to get messages
+# which may be pre-buffered at a client as fast as possible (i.e. before
+# the client pushes another message).
+has _zmq_timer => (
+    is => 'ro',
+    lazy => 1,
+    default => sub {
+        my $weak_self = shift;
+        weaken($weak_self);
+        AnyEvent->timer(after => 1, interval => 1,
+            cb => sub { my $more; do { $more = $weak_self->_try_rx } while ($more) });
+    },
+);
+
+sub BUILD {
+    my $self = shift;
+    $self->_io_reader;
+    $self->_zmq_timer;
+}
+
 1;
 
 =head1 NAME

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmessage-passing-zeromq-perl.git



More information about the Pkg-perl-cvs-commits mailing list