r50363 - in /trunk/libgearman-client-perl: ./ debian/ lib/Gearman/ t/

jawnsy-guest at users.alioth.debian.org jawnsy-guest at users.alioth.debian.org
Wed Jan 6 14:34:45 UTC 2010


Author: jawnsy-guest
Date: Wed Jan  6 14:34:39 2010
New Revision: 50363

URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=50363
Log:
New upstream release

Added:
    trunk/libgearman-client-perl/t/51-large_args.t
      - copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/51-large_args.t
    trunk/libgearman-client-perl/t/60-stop-if.t
      - copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/60-stop-if.t
    trunk/libgearman-client-perl/t/65-responseparser.t
      - copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/65-responseparser.t
Modified:
    trunk/libgearman-client-perl/CHANGES
    trunk/libgearman-client-perl/MANIFEST
    trunk/libgearman-client-perl/META.yml
    trunk/libgearman-client-perl/debian/changelog
    trunk/libgearman-client-perl/lib/Gearman/Client.pm
    trunk/libgearman-client-perl/lib/Gearman/Objects.pm
    trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm
    trunk/libgearman-client-perl/lib/Gearman/Task.pm
    trunk/libgearman-client-perl/lib/Gearman/Taskset.pm
    trunk/libgearman-client-perl/lib/Gearman/Util.pm
    trunk/libgearman-client-perl/lib/Gearman/Worker.pm
    trunk/libgearman-client-perl/t/10-all.t
    trunk/libgearman-client-perl/t/30-maxqueue.t
    trunk/libgearman-client-perl/t/40-prefix.t
    trunk/libgearman-client-perl/t/worker.pl

Modified: trunk/libgearman-client-perl/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/CHANGES?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/CHANGES (original)
+++ trunk/libgearman-client-perl/CHANGES Wed Jan  6 14:34:39 2010
@@ -1,3 +1,42 @@
+1.10 (2009-10-04)
+
+     -- Make workers wake up periodically for a particular server to make sure they aren't
+        stale connections. This happened naturally (although at relatively low interval) in
+        previous releases.
+
+     -- Help prevent leaks when Gearman::Task->add_hook is used.
+
+     -- Change worker to only 'wake up' against a gearmand that has woken it up, this prevents
+        a constant flood of pre-sleep command from arriving at an otherwise silent gearmand.
+
+     -- Fix issue where prefixes were double-prepended on function names on worker
+        upon reconnect.
+
+     -- Fix issue where the other response parser code in Util would silently truncate
+        argument strings longer than 20*the socket buffer size.
+
+     -- Fix issue where the ResponseParser class would not correctly handle messages with
+        zero-length bodies.
+
+     -- Make the Gearman::Task class autoload Storable and fail gracefully when it's not
+        loadable.
+
+     -- Initial fold-in of exceptions support in the gearman client, makes an option
+        to the gearman server to enable it, and is disabled by default. Workers will
+        will attempt to throw exceptions anytime Storable is available.
+
+     -- fix jobs of > 32kilobytes in size so they work properly (workers would disconnect
+        when a job greater than 32kb would arrive.)
+
+     -- expose the time that the last job was processed in the stop_if hook of worker.
+        Since a jobserver wakes up all workers in the case of a new job to be processed
+        the concept of is_idle doesn't actually reflect if a worker has procssed jobs,
+        rather it indicates whether the jobserver has been silent for 10 seconds.
+
+     -- change server polling order in workers to start at a random point in the list
+        during every worker pass. So we drain jobs from all servers rather than
+        working on each of them in order.
+
 1.09 (2007-06-29)
 
      -- document the license and copyright

Modified: trunk/libgearman-client-perl/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/MANIFEST?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/MANIFEST (original)
+++ trunk/libgearman-client-perl/MANIFEST Wed Jan  6 14:34:39 2010
@@ -19,7 +19,10 @@
 t/30-maxqueue.t
 t/40-prefix.t
 t/50-wait_timeout.t
+t/51-large_args.t
+t/60-stop-if.t
 t/lib/GearTestLib.pm
 t/TestGearman.pm
 t/worker.pl
+t/65-responseparser.t
 TODO

Modified: trunk/libgearman-client-perl/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/META.yml?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/META.yml (original)
+++ trunk/libgearman-client-perl/META.yml Wed Jan  6 14:34:39 2010
@@ -1,11 +1,11 @@
 # http://module-build.sourceforge.net/META-spec.html
 #XXXXXXX This is a prototype!!!  It will change in the future!!! XXXXX#
 name:         Gearman
-version:      1.09
+version:      1.10
 version_from: lib/Gearman/Client.pm
 installdirs:  site
 requires:
     String::CRC32:                 0
 
 distribution_type: module
-generated_by: ExtUtils::MakeMaker version 6.17
+generated_by: ExtUtils::MakeMaker version 6.30

Modified: trunk/libgearman-client-perl/debian/changelog
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/debian/changelog?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/debian/changelog (original)
+++ trunk/libgearman-client-perl/debian/changelog Wed Jan  6 14:34:39 2010
@@ -1,6 +1,7 @@
-libgearman-client-perl (1.09-1.1) UNRELEASED; urgency=low
+libgearman-client-perl (1.10-1) UNRELEASED; urgency=low
 
   * Take over package (Closes: #549364)
+  * New upstream release
   * Use new short debhelper 7 rules format
   * No longer install README and HACKING
   * Update watch file to match upstream
@@ -9,7 +10,7 @@
   * Slight rewrite of the control description
   * Refresh copyright information to DEP5 format
 
- -- Jonathan Yu <jawnsy at cpan.org>  Tue, 05 Jan 2010 17:46:49 -0500
+ -- Jonathan Yu <jawnsy at cpan.org>  Wed, 06 Jan 2010 09:36:17 -0500
 
 libgearman-client-perl (1.09-1) unstable; urgency=low
 

Modified: trunk/libgearman-client-perl/lib/Gearman/Client.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Client.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Client.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Client.pm Wed Jan  6 14:34:39 2010
@@ -1,11 +1,9 @@
 #!/usr/bin/perl
 
-#TODO: timeout isn't supported by this client API yet.
-
 package Gearman::Client;
 
 our $VERSION;
-$VERSION = '1.09';
+$VERSION = '1.10';
 
 use strict;
 use IO::Socket::INET;
@@ -26,11 +24,15 @@
     $self->{sock_cache} = {};
     $self->{hooks} = {};
     $self->{prefix} = '';
+    $self->{exceptions} = 0;
 
     $self->debug($opts{debug}) if $opts{debug};
 
     $self->set_job_servers(@{ $opts{job_servers} })
         if $opts{job_servers};
+
+    $self->{exceptions} = delete $opts{exceptions}
+        if exists $opts{exceptions};
 
     $self->prefix($opts{prefix}) if $opts{prefix};
 
@@ -98,7 +100,7 @@
 
     my $ts = $self->new_task_set;
     $ts->add_task($task);
-    $ts->wait;
+    $ts->wait(timeout => $task->timeout);
 
     return $did_err ? undef : $ret;
 
@@ -113,12 +115,13 @@
     my ($jst, $jss) = $self->_get_random_js_sock;
     return 0 unless $jss;
 
-    my $req = $task->pack_submit_packet("background");
+    my $req = $task->pack_submit_packet($self, "background");
     my $len = length($req);
     my $rv = $jss->write($req, $len);
 
     my $err;
     my $res = Gearman::Util::read_res_packet($jss, \$err);
+    $self->_put_js_sock($jst, $jss);
     return 0 unless $res && $res->{type} eq "job_created";
     return "$jst//${$res->{blobref}}";
 }
@@ -175,6 +178,28 @@
     return Gearman::JobStatus->new(@args);
 }
 
+sub _option_request {
+    my Gearman::Client $self = shift;
+    my $sock = shift;
+    my $option = shift;
+
+    my $req = Gearman::Util::pack_req_command("option_req",
+                                              $option);
+    my $len = length($req);
+    my $rv = $sock->write($req, $len);
+
+    my $err;
+    my $res = Gearman::Util::read_res_packet($sock, \$err);
+
+    return unless $res;
+
+    return 0 if $res->{type} eq "error";
+    return 1 if $res->{type} eq "option_res";
+
+    warn "Got unknown response to option request: $res->{type}\n";
+    return;
+}
+
 # returns a socket from the cache.  it should be returned to the
 # cache with _put_js_sock.  the hostport isn't verified. the caller
 # should verify that $hostport is in the set of jobservers.
@@ -192,6 +217,14 @@
 
     setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
     $sock->autoflush(1);
+
+    # If exceptions support is to be requested, and the request fails, disable
+    # exceptions for this client.
+    if ($self->{exceptions} && ! $self->_option_request($sock, 'exceptions')) {
+        warn "Exceptions support denied by server, disabling.\n";
+        $self->{exceptions} = 0;
+    }
+
     return $sock;
 }
 

Modified: trunk/libgearman-client-perl/lib/Gearman/Objects.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Objects.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Objects.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Objects.pm Wed Jan  6 14:34:39 2010
@@ -13,6 +13,7 @@
             'hooks',       # hookname -> coderef
             'prefix',
             'debug',
+            'exceptions',
             );
 
 package Gearman::Taskset;
@@ -41,11 +42,13 @@
             'uniq',
             'on_complete',
             'on_fail',
+            'on_exception',
             'on_retry',
             'on_status',
             'on_post_hooks',   # used internally, when other hooks are done running, prior to cleanup
             'retry_count',
             'timeout',
+            'try_timeout',
             'high_priority',
 
             # from server:

Modified: trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm Wed Jan  6 14:34:39 2010
@@ -88,6 +88,11 @@
             $self->reset;
         }
     }
+
+    if (defined($self->{pkt}) && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len}) {
+        $self->on_packet($self->{pkt}, $self);
+        $self->reset;
+    }
 }
 
 # don't override:

Modified: trunk/libgearman-client-perl/lib/Gearman/Task.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Task.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Task.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Task.pm Wed Jan  6 14:34:39 2010
@@ -7,6 +7,20 @@
 use Gearman::Taskset;
 use Gearman::Util;
 
+BEGIN {
+    my $storable = eval { require Storable; 1 }
+    if !defined &RECEIVE_EXCEPTIONS || RECEIVE_EXCEPTIONS();
+
+    $storable ||= 0;
+
+    if (defined &RECEIVE_EXCEPTIONS) {
+        die "Exceptions support requires Storable: $@";
+    } else {
+        eval "sub RECEIVE_EXCEPTIONS () { $storable }";
+        die "Couldn't define RECEIVE_EXCEPTIONS: $@\n" if $@;
+    }
+}
+
 # constructor, given: ($func, $argref, $opts);
 sub new {
     my $class = shift;
@@ -22,8 +36,8 @@
 
     my $opts = shift || {};
     for my $k (qw( uniq
-                   on_complete on_fail on_retry on_status
-                   retry_count timeout high_priority
+                   on_complete on_exception on_fail on_retry on_status
+                   retry_count timeout high_priority try_timeout
                )) {
         $self->{$k} = delete $opts->{$k};
     }
@@ -111,6 +125,7 @@
 
 sub pack_submit_packet {
     my Gearman::Task $task = shift;
+    my Gearman::Client $client = shift;
     my $is_background = shift;
 
     my $mode = $is_background ?
@@ -121,7 +136,7 @@
 
     my $func = $task->{func};
 
-    if (my $prefix = $task->{taskset} && $task->{taskset}->client && $task->{taskset}->client->prefix) {
+    if (my $prefix = $client && $client->prefix) {
         $func = join "\t", $prefix, $task->{func};
     }
 
@@ -134,6 +149,7 @@
 
 sub fail {
     my Gearman::Task $task = shift;
+    my $reason = shift;
     return if $task->{is_finished};
 
     # try to retry, if we can
@@ -144,7 +160,7 @@
         return $task->{taskset}->add_task($task);
     }
 
-    $task->final_fail;
+    $task->final_fail($reason);
 }
 
 sub final_fail {
@@ -156,11 +172,22 @@
 
     $task->run_hook('final_fail', $task);
 
-    $task->{on_fail}->()       if $task->{on_fail};
-    $task->{on_post_hooks}->() if $task->{on_post_hooks};
+    $task->{on_fail}->($reason) if $task->{on_fail};
+    $task->{on_post_hooks}->()  if $task->{on_post_hooks};
     $task->wipe;
 
     return undef;
+}
+
+sub exception {
+    my Gearman::Task $task = shift;
+
+    return unless RECEIVE_EXCEPTIONS;
+
+    my $exception_ref = shift;
+    my $exception = Storable::thaw($$exception_ref);
+    $task->{on_exception}->($$exception) if $task->{on_exception};
+    return;
 }
 
 sub complete {
@@ -202,7 +229,7 @@
 
 sub wipe {
     my Gearman::Task $task = shift;
-    foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status)) {
+    foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status hooks)) {
         $task->{$f} = undef;
     }
 }
@@ -210,6 +237,12 @@
 sub func {
     my Gearman::Task $task = shift;
     return $task->{func};
+}
+
+sub timeout {
+    my Gearman::Task $task = shift;
+    return $task->{timeout} unless @_;
+    return $task->{timeout} = shift;
 }
 1;
 __END__
@@ -304,6 +337,12 @@
 called. Defaults to 0, which means never.  Bypasses any retry_count
 remaining.
 
+=item * try_timeout
+
+Automatically fail, calling your on_retry callback (or on_fail if out of
+retries), after this many seconds have elapsed. Defaults to 0, which means
+never.
+
 =back
 
 =head2 $task->is_finished

Modified: trunk/libgearman-client-perl/lib/Gearman/Taskset.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Taskset.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Taskset.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Taskset.pm Wed Jan  6 14:34:39 2010
@@ -20,6 +20,7 @@
     $self->{client}      = $client;
     $self->{loaned_sock} = {};
     $self->{cancelled}   = 0;
+    $self->{hooks}       = {};
 
     return $self;
 }
@@ -113,7 +114,7 @@
     my $timeout;
     if (exists $opts{timeout}) {
         $timeout = delete $opts{timeout};
-        $timeout += Time::HiRes::time();
+        $timeout += Time::HiRes::time() if defined $timeout;
     }
 
     Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
@@ -196,7 +197,7 @@
 
     $ts->run_hook('add_task', $ts, $task);
 
-    my $req = $task->pack_submit_packet;
+    my $req = $task->pack_submit_packet($ts->client);
     my $len = length($req);
     my $rv = $task->{jssock}->syswrite($req, $len);
     die "Wrote $rv but expected to write $len" unless $rv == $len;
@@ -329,6 +330,21 @@
         return 1;
     }
 
+    if ($res->{type} eq "work_exception") {
+        ${ $res->{'blobref'} } =~ s/^(.+?)\0//
+            or die "Bogus work_exception from server";
+        my $shandle = $1;
+        my $task_list = $ts->{waiting}{$shandle} or
+            die "Uhhhh:  got work_exception for unknown handle: $shandle\n";
+
+        my Gearman::Task $task = $task_list->[0] or
+            die "Uhhhh:  task_list is empty on work_exception for handle $shandle\n";
+
+        $task->exception($res->{'blobref'});
+
+        return 1;
+    }
+
     if ($res->{type} eq "work_status") {
         my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
 

Modified: trunk/libgearman-client-perl/lib/Gearman/Util.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Util.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Util.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Util.pm Wed Jan  6 14:34:39 2010
@@ -6,7 +6,7 @@
 # O: out of job server
 # W: worker
 # C: client of job server
-# J : jobserver
+# J: jobserver
 our %cmd = (
             1 =>  [ 'I', "can_do" ],     # from W:  [FUNC]
             23 => [ 'I', "can_do_timeout" ], # from W: FUNC[0]TIMEOUT
@@ -14,6 +14,9 @@
             3 =>  [ 'I', "reset_abilities" ],  # from W:  ---
             22 => [ 'I', "set_client_id" ],    # W->J: [RANDOM_STRING_NO_WHITESPACE]
             4 =>  [ 'I', "pre_sleep" ],  # from W: ---
+
+            26 => [ 'I', "option_req" ], # C->J: [OPT]
+            27 => [ 'O', "option_res" ], # J->C: [OPT]
 
             6 =>  [ 'O', "noop" ],        # J->W  ---
             7 =>  [ 'I', "submit_job" ],    # C->J  FUNC[0]UNIQ[0]ARGS
@@ -28,6 +31,7 @@
             12 => [ 'IO',  "work_status" ],   # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
             13 => [ 'IO',  "work_complete" ], # W->J/C: HANDLE[0]RES
             14 => [ 'IO',  "work_fail" ],     # W->J/C: HANDLE
+            25 => [ 'IO',  "work_exception" ], # W->J/C: HANDLE[0]EXCEPTION
 
             15 => [ 'I',  "get_status" ],  # C->J: HANDLE
             20 => [ 'O',  "status_res" ],  # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
@@ -100,8 +104,20 @@
     return $err->("malformed_magic") unless $magic eq "\0RES";
 
     if ($len) {
-        $rv = sysread($sock, $buf, $len);
-        return $err->("short_body") unless $rv == $len;
+        # Start off trying to read the whole buffer. Store the bits in an array
+        # one element for each read, then do a big join at the end. This minimizes
+        # the number of memory allocations we have to do.
+        my $readlen = $len;
+        my $lim = 20 + int( $len / 2**10 );
+        my @buffers;
+        for (my $i = 0; $readlen > 0 && $i < $lim; $i++) {
+            my $rv = sysread($sock, $buffers[$i], $readlen);
+            return $err->("short_body") unless $rv > 0;
+            last unless $rv > 0;
+            $readlen -= $rv;
+        }
+        $buf = join('', @buffers);
+        return $err->("short_body") unless length($buf) == $len; 
     }
 
     $type = $cmd{$type};

Modified: trunk/libgearman-client-perl/lib/Gearman/Worker.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Worker.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Worker.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Worker.pm Wed Jan  6 14:34:39 2010
@@ -69,13 +69,27 @@
             'last_connect_fail', # host:port -> unixtime
             'down_since',        # host:port -> unixtime
             'connecting',        # host:port -> unixtime connect started at
-            'can',               # func -> subref
-            'timeouts',          # func -> timeouts
+            'can',               # ability -> subref     (ability is func with optional prefix)
+            'timeouts',          # ability -> timeouts
             'client_id',         # random identifer string, no whitespace
             'parent_pipe',       # bool/obj:  if we're a child process of a gearman server,
                                  #   this is socket to our parent process.  also means parent
                                  #   sock can never disconnect or timeout, etc..
             );
+
+BEGIN {
+    my $storable = eval { require Storable; 1 }
+        if !defined &THROW_EXCEPTIONS || THROW_EXCEPTIONS();
+
+    $storable ||= 0;
+
+    if (defined &THROW_EXCEPTIONS) {
+        die "Exceptions support requires Storable: $@";
+    } else {
+        eval "sub THROW_EXCEPTIONS () { $storable }";
+        die "Couldn't define THROW_EXCEPTIONS: $@\n" if $@;
+    }
+}
 
 sub new {
     my ($class, %opts) = @_;
@@ -90,7 +104,7 @@
     $self->{can} = {};
     $self->{timeouts} = {};
     $self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1..30));
-    $self->{prefix}   = '';
+    $self->{prefix}   = undef;
 
     $self->debug($opts{debug}) if $opts{debug};
 
@@ -113,6 +127,10 @@
 sub _get_js_sock {
     my Gearman::Worker $self = shift;
     my $ipport = shift;
+    my %opts = @_;
+
+    my $on_connect = delete $opts{on_connect};
+    # Someday should warn when called with extra opts.
 
     warn "getting job server socket: $ipport" if $self->debug;
 
@@ -153,7 +171,7 @@
 
     $self->{sock_cache}{$ipport} = $sock;
 
-    unless ($self->_on_connect($sock)) {
+    unless ($self->_on_connect($sock) && $on_connect && $on_connect->($sock)) {
         delete $self->{sock_cache}{$ipport};
         return undef;
     }
@@ -171,9 +189,9 @@
     return undef unless Gearman::Util::send_req($sock, \$cid_req);
 
     # get this socket's state caught-up
-    foreach my $func (keys %{$self->{can}}) {
-        my $timeout = $self->{timeouts}->{$func};
-        unless ($self->_set_ability($sock, $func, $timeout)) {
+    foreach my $ability (keys %{$self->{can}}) {
+        my $timeout = $self->{timeouts}->{$ability};
+        unless ($self->_set_ability($sock, $ability, $timeout)) {
             return undef;
         }
     }
@@ -183,15 +201,13 @@
 
 sub _set_ability {
     my Gearman::Worker $self = shift;
-    my ($sock, $func, $timeout) = @_;
-
-    $func = join "\t", $self->prefix, $func if $self->prefix;
+    my ($sock, $ability, $timeout) = @_;
 
     my $req;
     if (defined $timeout) {
-        $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
+        $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
     } else {
-        $req = Gearman::Util::pack_req_command("can_do", $func);
+        $req = Gearman::Util::pack_req_command("can_do", $ability);
     }
     return Gearman::Util::send_req($sock, \$req);
 }
@@ -236,14 +252,32 @@
 
     my $grab_req = Gearman::Util::pack_req_command("grab_job");
     my $presleep_req = Gearman::Util::pack_req_command("pre_sleep");
-    my %fd_map;
+
+    my $last_job_time;
+
+    # "Active" job servers are servers that have woken us up and should be
+    # queried to see if they have jobs for us to handle. On our first pass
+    # in the loop we contact all servers.
+    my %active_js = map { $_ => 1 } @{$self->{job_servers}};
+
+    # ( js => last_update_time, ... )
+    my %last_update_time;
 
     while (1) {
-
-        my @jss;
-        my $need_sleep = 1;
-
-        foreach my $js (@{ $self->{job_servers} }) {
+        # "Jobby" job servers are the set of server which we will contact
+        # on this pass through the loop, because we need to clear and use
+        # the "Active" set to plan for our next pass through the loop.
+        my @jobby_js = keys %active_js;
+
+        %active_js = ();
+
+        my $js_count = @jobby_js;
+        my $js_offset = int(rand($js_count));
+        my $is_idle = 0;
+
+        for (my $i = 0; $i < $js_count; $i++) {
+            my $js_index = ($i + $js_offset) % $js_count;
+            my $js = $jobby_js[$js_index];
             my $jss = $self->_get_js_sock($js)
                 or next;
 
@@ -259,6 +293,7 @@
                     exit(0);
                 }
                 $self->uncache_sock($js, "grab_job_timeout");
+                delete $last_update_time{$js};
                 next;
             }
 
@@ -269,6 +304,7 @@
             my $timeout = $self->{parent_pipe} ? 5 : 0.50;
             unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout)) {
                 $self->uncache_sock($js, "grab_job_timeout");
+                delete $last_update_time{$js};
                 next;
             }
 
@@ -278,13 +314,17 @@
                 $res = Gearman::Util::read_res_packet($jss, \$err);
                 unless ($res) {
                     $self->uncache_sock($js, "read_res_error");
+                    delete $last_update_time{$js};
                     next;
                 }
             } while ($res->{type} eq "noop");
 
-            push @jss, [$js, $jss];
-
             if ($res->{type} eq "no_job") {
+                unless (Gearman::Util::send_req($jss, \$presleep_req)) {
+                    delete $last_update_time{$js};
+                    $self->uncache_sock($js, "write_presleep_error");
+                }
+                $last_update_time{$js} = time;
                 next;
             }
 
@@ -297,20 +337,28 @@
                 die $msg;
             }
 
-            $need_sleep = 0;
-
             ${ $res->{'blobref'} } =~ s/^(.+?)\0(.+?)\0//
                 or die "Uh, regexp on job_assign failed";
-            my ($handle, $func) = ($1, $2);
-            my $job = Gearman::Job->new($func, $res->{'blobref'}, $handle, $jss);
+            my ($handle, $ability) = ($1, $2);
+            my $job = Gearman::Job->new($ability, $res->{'blobref'}, $handle, $jss);
 
             my $jobhandle = "$js//" . $job->handle;
             $start_cb->($jobhandle) if $start_cb;
 
-            my $handler = $self->{can}{$func};
+            my $handler = $self->{can}{$ability};
             my $ret = eval { $handler->($job); };
-            my $err = $@ || '';
-            warn "Job '$func' died: $err" if $err;
+            my $err = $@;
+            warn "Job '$ability' died: $err" if $err;
+
+            $last_update_time{$js} = $last_job_time = time();
+
+            if (THROW_EXCEPTIONS && $err) {
+                my $exception_req = Gearman::Util::pack_req_command("work_exception", join("\0", $handle, Storable::nfreeze(\$err)));
+                unless (Gearman::Util::send_req($jss, \$exception_req)) {
+                    $self->uncache_sock($js, "write_res_error");
+                    next;
+                }
+            }
 
             my $work_req;
             if (defined $ret) {
@@ -324,29 +372,57 @@
 
             unless (Gearman::Util::send_req($jss, \$work_req)) {
                 $self->uncache_sock($js, "write_res_error");
-            }
-        }
-
-        my $is_idle = 0;
-        if ($need_sleep) {
-            $is_idle = 1;
-            my $wake_vec = '';
+                next;
+            }
+
+            $active_js{$js} = 1;
+        }
+
+        my @jss;
+
+        my $on_connect = sub {
+            return Gearman::Util::send_req($_[0], \$presleep_req);
+        };
+
+        foreach my $js (@{$self->{job_servers}}) {
+            my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
+                or next;
+            push @jss, [$js, $jss];
+        }
+
+        $is_idle = 1;
+        my $wake_vec = '';
+
+        foreach my $j (@jss) {
+            my ($js, $jss) = @$j;
+            my $fd = $jss->fileno;
+            vec($wake_vec, $fd, 1) = 1;
+        }
+
+        my $timeout = keys %active_js ? 0 : (10 + rand(2));
+
+
+        # chill for some arbitrary time until we're woken up again
+        my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
+
+        if ($nready) {
             foreach my $j (@jss) {
                 my ($js, $jss) = @$j;
-                unless (Gearman::Util::send_req($jss, \$presleep_req)) {
-                    $self->uncache_sock($js, "write_presleep_error");
-                    next;
-                }
                 my $fd = $jss->fileno;
-                vec($wake_vec, $fd, 1) = 1;
-            }
-
-            # chill for some arbitrary time until we're woken up again
-            my $nready = select($wake_vec, undef, undef, 10);
-            $is_idle = 0 if $nready;
-        }
-
-        return if $stop_if->($is_idle);
+                $active_js{$js} = 1
+                    if vec($wout, $fd, 1);
+            }
+        }
+
+        $is_idle = 0 if keys %active_js;
+
+        return if $stop_if->($is_idle, $last_job_time);
+
+        my $update_since = time - (15 + rand 60);
+
+        while (my ($js, $last_update) = each %last_update_time) {
+            $active_js{$js} = 1 if $last_update < $update_since;
+        }
     }
 
 }
@@ -357,18 +433,32 @@
     my $timeout = shift unless (ref $_[0] eq 'CODE');
     my $subref = shift;
 
-    $func = join "\t", $self->prefix, $func if $self->prefix;
+    my $prefix = $self->prefix;
+    my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
 
     my $req;
     if (defined $timeout) {
-        $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
-        $self->{timeouts}{$func} = $timeout;
+        $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
+        $self->{timeouts}{$ability} = $timeout;
     } else {
-        $req = Gearman::Util::pack_req_command("can_do", $func);
+        $req = Gearman::Util::pack_req_command("can_do", $ability);
     }
 
     $self->_register_all($req);
-    $self->{can}{$func} = $subref;
+    $self->{can}{$ability} = $subref;
+}
+
+sub unregister_function {
+    my Gearman::Worker $self = shift;
+    my $func = shift;
+
+    my $prefix = $self->prefix;
+    my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
+
+    my $req = Gearman::Util::pack_req_command("cant_do", $ability);
+
+    $self->_register_all($req);
+    delete $self->{can}{$ability};
 }
 
 sub _register_all {

Modified: trunk/libgearman-client-perl/t/10-all.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/10-all.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/10-all.t (original)
+++ trunk/libgearman-client-perl/t/10-all.t Wed Jan  6 14:34:39 2010
@@ -8,7 +8,7 @@
 use TestGearman;
 
 if (start_server(PORT)) {
-    plan tests => 32;
+    plan tests => 33;
 } else {
     plan skip_all => "Can't find server to test with";
     exit 0;
@@ -31,7 +31,7 @@
 start_worker(PORT, $NUM_SERVERS);
 start_worker(PORT, $NUM_SERVERS);
 
-my $client = Gearman::Client->new;
+my $client = Gearman::Client->new(exceptions => 1);
 isa_ok($client, 'Gearman::Client');
 $client->job_servers(map { '127.0.0.1:' . (PORT + $_) } 0..$NUM_SERVERS);
 
@@ -70,6 +70,16 @@
 ## Test some failure conditions:
 ## Normal failure (worker returns undef or dies within eval).
 is($client->do_task('fail'), undef, 'Job that failed naturally returned undef');
+
+## the die message is available in the on_fail sub
+my $msg = undef;
+$tasks = $client->new_task_set;
+$tasks->add_task('fail_die', undef, {
+        on_exception => sub { $msg = shift },
+});
+$tasks->wait;
+like($msg, qr/test reason/, 'the die message is available in the on_fail sub');
+
 ## Worker process exits.
 is($client->do_task('fail_exit'), undef,
     'Job that failed via exit returned undef');
@@ -222,7 +232,3 @@
     $status = $client->get_status($handle);
 } until $status->percent == 1;
 
-
-
-
-

Modified: trunk/libgearman-client-perl/t/30-maxqueue.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/30-maxqueue.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/30-maxqueue.t (original)
+++ trunk/libgearman-client-perl/t/30-maxqueue.t Wed Jan  6 14:34:39 2010
@@ -54,8 +54,8 @@
 }
 $tasks->wait;
 
-is($completed, 2, 'number of success'); # One starts immediately and on the queue
-is($failed, 3, 'number of failure'); # All the rest
+ok($completed == 2 || $completed == 1, 'number of success'); # One in the queue, plus one that may start immediately
+ok($failed == 3 || $failed== 4, 'number of failure'); # All the rest
 
 
 

Modified: trunk/libgearman-client-perl/t/40-prefix.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/40-prefix.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/40-prefix.t (original)
+++ trunk/libgearman-client-perl/t/40-prefix.t Wed Jan  6 14:34:39 2010
@@ -4,6 +4,7 @@
 use Gearman::Client;
 use Storable qw( freeze );
 use Test::More;
+use Time::HiRes 'sleep';
 
 use lib 't';
 use TestGearman;
@@ -11,7 +12,7 @@
 
 
 if (start_server(PORT)) {
-    plan tests => 8;
+    plan tests => 9;
 } else {
     plan skip_all => "Can't find server to test with";
     exit 0;
@@ -63,4 +64,17 @@
     is($out{$k}, "$k from prefix_$k", "taskset from client_$k");
 }
 
+## dispatch_background tasks also support prefixing
+my $bg_task = Gearman::Task->new('echo_sleep', \('sleep prefix test'));
+my $handle = $client_a->dispatch_background($bg_task);
 
+## wait for the task to be done
+my $status;
+my $n = 0;
+do {
+    sleep 0.1;
+    $n++;
+    diag "still waiting..." if $n == 12;
+    $status = $client_a->get_status($handle);
+} until $status->percent == 1 or $n == 20;
+is $status->percent, 1, "Background task completed using prefix";

Modified: trunk/libgearman-client-perl/t/worker.pl
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/worker.pl?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/worker.pl (original)
+++ trunk/libgearman-client-perl/t/worker.pl Wed Jan  6 14:34:39 2010
@@ -3,7 +3,7 @@
 
 use lib 'lib';
 use Gearman::Worker;
-use Storable qw( thaw );
+use Storable qw(thaw nfreeze);
 use Getopt::Long qw( GetOptions );
 
 GetOptions(
@@ -25,6 +25,7 @@
 });
 
 $worker->register_function(fail => sub { undef });
+$worker->register_function(fail_die => sub { die 'test reason' });
 $worker->register_function(fail_exit => sub { exit 255 });
 
 $worker->register_function(sleep => sub { sleep $_[0]->arg });
@@ -43,6 +44,13 @@
     join " from ", $_[0]->arg, $prefix;
 });
 
+$worker->register_function(echo_sleep => sub {
+    my($job) = @_;
+    $job->set_status(1, 1);
+    sleep 2; ## allow some time to read the status
+    join " from ", $_[0]->arg, $prefix;
+});
+
 
 $worker->register_function(long => sub {
     my($job) = @_;
@@ -50,9 +58,35 @@
     sleep 2;
     $job->set_status(100, 100);
     sleep 2;
+    return $job->arg;
 });
 
 my $nsig;
 $nsig = kill 'USR1', $notifypid if $notifypid;
 
-$worker->work while 1;
+my $work_exit = 0;
+
+$worker->register_function(work_exit => sub {
+    $work_exit = 1;
+});
+
+my ($is_idle, $last_job_time);
+
+$worker->register_function(check_stop_if => sub {
+    return nfreeze([$is_idle, $last_job_time]);
+});
+
+
+
+my $stop_if = sub {
+    ($is_idle, $last_job_time) = @_;
+
+    if ($work_exit) {
+        $work_exit = 0;
+        return 1;
+    }
+
+    return 0;
+};
+
+$worker->work(stop_if => $stop_if) while (1);




More information about the Pkg-perl-cvs-commits mailing list