r50363 - in /trunk/libgearman-client-perl: ./ debian/ lib/Gearman/ t/
jawnsy-guest at users.alioth.debian.org
jawnsy-guest at users.alioth.debian.org
Wed Jan 6 14:34:45 UTC 2010
Author: jawnsy-guest
Date: Wed Jan 6 14:34:39 2010
New Revision: 50363
URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=50363
Log:
New upstream release
Added:
trunk/libgearman-client-perl/t/51-large_args.t
- copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/51-large_args.t
trunk/libgearman-client-perl/t/60-stop-if.t
- copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/60-stop-if.t
trunk/libgearman-client-perl/t/65-responseparser.t
- copied unchanged from r50362, branches/upstream/libgearman-client-perl/current/t/65-responseparser.t
Modified:
trunk/libgearman-client-perl/CHANGES
trunk/libgearman-client-perl/MANIFEST
trunk/libgearman-client-perl/META.yml
trunk/libgearman-client-perl/debian/changelog
trunk/libgearman-client-perl/lib/Gearman/Client.pm
trunk/libgearman-client-perl/lib/Gearman/Objects.pm
trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm
trunk/libgearman-client-perl/lib/Gearman/Task.pm
trunk/libgearman-client-perl/lib/Gearman/Taskset.pm
trunk/libgearman-client-perl/lib/Gearman/Util.pm
trunk/libgearman-client-perl/lib/Gearman/Worker.pm
trunk/libgearman-client-perl/t/10-all.t
trunk/libgearman-client-perl/t/30-maxqueue.t
trunk/libgearman-client-perl/t/40-prefix.t
trunk/libgearman-client-perl/t/worker.pl
Modified: trunk/libgearman-client-perl/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/CHANGES?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/CHANGES (original)
+++ trunk/libgearman-client-perl/CHANGES Wed Jan 6 14:34:39 2010
@@ -1,3 +1,42 @@
+1.10 (2009-10-04)
+
+ -- Make workers wake up periodically for a particular server to make sure they aren't
+ stale connections. This happened naturally (although at relatively low interval) in
+ previous releases.
+
+ -- Help prevent leaks when Gearman::Task->add_hook is used.
+
+ -- Change worker to only 'wake up' against a gearmand that has woken it up, this prevents
+ a constant flood of pre-sleep command from arriving at an otherwise silent gearmand.
+
+ -- Fix issue where prefixes were double-prepended on function names on worker
+ upon reconnect.
+
+ -- Fix issue where the other response parser code in Util would silently truncate
+ argument strings longer than 20*the socket buffer size.
+
+ -- Fix issue where the ResponseParser class would not correctly handle messages with
+ zero-length bodies.
+
+ -- Make the Gearman::Task class autoload Storable and fail gracefully when it's not
+ loadable.
+
+ -- Initial fold-in of exceptions support in the gearman client, makes an option
+ to the gearman server to enable it, and is disabled by default. Workers will
+ will attempt to throw exceptions anytime Storable is available.
+
+ -- fix jobs of > 32kilobytes in size so they work properly (workers would disconnect
+ when a job greater than 32kb would arrive.)
+
+ -- expose the time that the last job was processed in the stop_if hook of worker.
+ Since a jobserver wakes up all workers in the case of a new job to be processed
+ the concept of is_idle doesn't actually reflect if a worker has procssed jobs,
+ rather it indicates whether the jobserver has been silent for 10 seconds.
+
+ -- change server polling order in workers to start at a random point in the list
+ during every worker pass. So we drain jobs from all servers rather than
+ working on each of them in order.
+
1.09 (2007-06-29)
-- document the license and copyright
Modified: trunk/libgearman-client-perl/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/MANIFEST?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/MANIFEST (original)
+++ trunk/libgearman-client-perl/MANIFEST Wed Jan 6 14:34:39 2010
@@ -19,7 +19,10 @@
t/30-maxqueue.t
t/40-prefix.t
t/50-wait_timeout.t
+t/51-large_args.t
+t/60-stop-if.t
t/lib/GearTestLib.pm
t/TestGearman.pm
t/worker.pl
+t/65-responseparser.t
TODO
Modified: trunk/libgearman-client-perl/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/META.yml?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/META.yml (original)
+++ trunk/libgearman-client-perl/META.yml Wed Jan 6 14:34:39 2010
@@ -1,11 +1,11 @@
# http://module-build.sourceforge.net/META-spec.html
#XXXXXXX This is a prototype!!! It will change in the future!!! XXXXX#
name: Gearman
-version: 1.09
+version: 1.10
version_from: lib/Gearman/Client.pm
installdirs: site
requires:
String::CRC32: 0
distribution_type: module
-generated_by: ExtUtils::MakeMaker version 6.17
+generated_by: ExtUtils::MakeMaker version 6.30
Modified: trunk/libgearman-client-perl/debian/changelog
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/debian/changelog?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/debian/changelog (original)
+++ trunk/libgearman-client-perl/debian/changelog Wed Jan 6 14:34:39 2010
@@ -1,6 +1,7 @@
-libgearman-client-perl (1.09-1.1) UNRELEASED; urgency=low
+libgearman-client-perl (1.10-1) UNRELEASED; urgency=low
* Take over package (Closes: #549364)
+ * New upstream release
* Use new short debhelper 7 rules format
* No longer install README and HACKING
* Update watch file to match upstream
@@ -9,7 +10,7 @@
* Slight rewrite of the control description
* Refresh copyright information to DEP5 format
- -- Jonathan Yu <jawnsy at cpan.org> Tue, 05 Jan 2010 17:46:49 -0500
+ -- Jonathan Yu <jawnsy at cpan.org> Wed, 06 Jan 2010 09:36:17 -0500
libgearman-client-perl (1.09-1) unstable; urgency=low
Modified: trunk/libgearman-client-perl/lib/Gearman/Client.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Client.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Client.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Client.pm Wed Jan 6 14:34:39 2010
@@ -1,11 +1,9 @@
#!/usr/bin/perl
-#TODO: timeout isn't supported by this client API yet.
-
package Gearman::Client;
our $VERSION;
-$VERSION = '1.09';
+$VERSION = '1.10';
use strict;
use IO::Socket::INET;
@@ -26,11 +24,15 @@
$self->{sock_cache} = {};
$self->{hooks} = {};
$self->{prefix} = '';
+ $self->{exceptions} = 0;
$self->debug($opts{debug}) if $opts{debug};
$self->set_job_servers(@{ $opts{job_servers} })
if $opts{job_servers};
+
+ $self->{exceptions} = delete $opts{exceptions}
+ if exists $opts{exceptions};
$self->prefix($opts{prefix}) if $opts{prefix};
@@ -98,7 +100,7 @@
my $ts = $self->new_task_set;
$ts->add_task($task);
- $ts->wait;
+ $ts->wait(timeout => $task->timeout);
return $did_err ? undef : $ret;
@@ -113,12 +115,13 @@
my ($jst, $jss) = $self->_get_random_js_sock;
return 0 unless $jss;
- my $req = $task->pack_submit_packet("background");
+ my $req = $task->pack_submit_packet($self, "background");
my $len = length($req);
my $rv = $jss->write($req, $len);
my $err;
my $res = Gearman::Util::read_res_packet($jss, \$err);
+ $self->_put_js_sock($jst, $jss);
return 0 unless $res && $res->{type} eq "job_created";
return "$jst//${$res->{blobref}}";
}
@@ -175,6 +178,28 @@
return Gearman::JobStatus->new(@args);
}
+sub _option_request {
+ my Gearman::Client $self = shift;
+ my $sock = shift;
+ my $option = shift;
+
+ my $req = Gearman::Util::pack_req_command("option_req",
+ $option);
+ my $len = length($req);
+ my $rv = $sock->write($req, $len);
+
+ my $err;
+ my $res = Gearman::Util::read_res_packet($sock, \$err);
+
+ return unless $res;
+
+ return 0 if $res->{type} eq "error";
+ return 1 if $res->{type} eq "option_res";
+
+ warn "Got unknown response to option request: $res->{type}\n";
+ return;
+}
+
# returns a socket from the cache. it should be returned to the
# cache with _put_js_sock. the hostport isn't verified. the caller
# should verify that $hostport is in the set of jobservers.
@@ -192,6 +217,14 @@
setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
$sock->autoflush(1);
+
+ # If exceptions support is to be requested, and the request fails, disable
+ # exceptions for this client.
+ if ($self->{exceptions} && ! $self->_option_request($sock, 'exceptions')) {
+ warn "Exceptions support denied by server, disabling.\n";
+ $self->{exceptions} = 0;
+ }
+
return $sock;
}
Modified: trunk/libgearman-client-perl/lib/Gearman/Objects.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Objects.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Objects.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Objects.pm Wed Jan 6 14:34:39 2010
@@ -13,6 +13,7 @@
'hooks', # hookname -> coderef
'prefix',
'debug',
+ 'exceptions',
);
package Gearman::Taskset;
@@ -41,11 +42,13 @@
'uniq',
'on_complete',
'on_fail',
+ 'on_exception',
'on_retry',
'on_status',
'on_post_hooks', # used internally, when other hooks are done running, prior to cleanup
'retry_count',
'timeout',
+ 'try_timeout',
'high_priority',
# from server:
Modified: trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/ResponseParser.pm Wed Jan 6 14:34:39 2010
@@ -88,6 +88,11 @@
$self->reset;
}
}
+
+ if (defined($self->{pkt}) && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len}) {
+ $self->on_packet($self->{pkt}, $self);
+ $self->reset;
+ }
}
# don't override:
Modified: trunk/libgearman-client-perl/lib/Gearman/Task.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Task.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Task.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Task.pm Wed Jan 6 14:34:39 2010
@@ -7,6 +7,20 @@
use Gearman::Taskset;
use Gearman::Util;
+BEGIN {
+ my $storable = eval { require Storable; 1 }
+ if !defined &RECEIVE_EXCEPTIONS || RECEIVE_EXCEPTIONS();
+
+ $storable ||= 0;
+
+ if (defined &RECEIVE_EXCEPTIONS) {
+ die "Exceptions support requires Storable: $@";
+ } else {
+ eval "sub RECEIVE_EXCEPTIONS () { $storable }";
+ die "Couldn't define RECEIVE_EXCEPTIONS: $@\n" if $@;
+ }
+}
+
# constructor, given: ($func, $argref, $opts);
sub new {
my $class = shift;
@@ -22,8 +36,8 @@
my $opts = shift || {};
for my $k (qw( uniq
- on_complete on_fail on_retry on_status
- retry_count timeout high_priority
+ on_complete on_exception on_fail on_retry on_status
+ retry_count timeout high_priority try_timeout
)) {
$self->{$k} = delete $opts->{$k};
}
@@ -111,6 +125,7 @@
sub pack_submit_packet {
my Gearman::Task $task = shift;
+ my Gearman::Client $client = shift;
my $is_background = shift;
my $mode = $is_background ?
@@ -121,7 +136,7 @@
my $func = $task->{func};
- if (my $prefix = $task->{taskset} && $task->{taskset}->client && $task->{taskset}->client->prefix) {
+ if (my $prefix = $client && $client->prefix) {
$func = join "\t", $prefix, $task->{func};
}
@@ -134,6 +149,7 @@
sub fail {
my Gearman::Task $task = shift;
+ my $reason = shift;
return if $task->{is_finished};
# try to retry, if we can
@@ -144,7 +160,7 @@
return $task->{taskset}->add_task($task);
}
- $task->final_fail;
+ $task->final_fail($reason);
}
sub final_fail {
@@ -156,11 +172,22 @@
$task->run_hook('final_fail', $task);
- $task->{on_fail}->() if $task->{on_fail};
- $task->{on_post_hooks}->() if $task->{on_post_hooks};
+ $task->{on_fail}->($reason) if $task->{on_fail};
+ $task->{on_post_hooks}->() if $task->{on_post_hooks};
$task->wipe;
return undef;
+}
+
+sub exception {
+ my Gearman::Task $task = shift;
+
+ return unless RECEIVE_EXCEPTIONS;
+
+ my $exception_ref = shift;
+ my $exception = Storable::thaw($$exception_ref);
+ $task->{on_exception}->($$exception) if $task->{on_exception};
+ return;
}
sub complete {
@@ -202,7 +229,7 @@
sub wipe {
my Gearman::Task $task = shift;
- foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status)) {
+ foreach my $f (qw(on_post_hooks on_complete on_fail on_retry on_status hooks)) {
$task->{$f} = undef;
}
}
@@ -210,6 +237,12 @@
sub func {
my Gearman::Task $task = shift;
return $task->{func};
+}
+
+sub timeout {
+ my Gearman::Task $task = shift;
+ return $task->{timeout} unless @_;
+ return $task->{timeout} = shift;
}
1;
__END__
@@ -304,6 +337,12 @@
called. Defaults to 0, which means never. Bypasses any retry_count
remaining.
+=item * try_timeout
+
+Automatically fail, calling your on_retry callback (or on_fail if out of
+retries), after this many seconds have elapsed. Defaults to 0, which means
+never.
+
=back
=head2 $task->is_finished
Modified: trunk/libgearman-client-perl/lib/Gearman/Taskset.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Taskset.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Taskset.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Taskset.pm Wed Jan 6 14:34:39 2010
@@ -20,6 +20,7 @@
$self->{client} = $client;
$self->{loaned_sock} = {};
$self->{cancelled} = 0;
+ $self->{hooks} = {};
return $self;
}
@@ -113,7 +114,7 @@
my $timeout;
if (exists $opts{timeout}) {
$timeout = delete $opts{timeout};
- $timeout += Time::HiRes::time();
+ $timeout += Time::HiRes::time() if defined $timeout;
}
Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
@@ -196,7 +197,7 @@
$ts->run_hook('add_task', $ts, $task);
- my $req = $task->pack_submit_packet;
+ my $req = $task->pack_submit_packet($ts->client);
my $len = length($req);
my $rv = $task->{jssock}->syswrite($req, $len);
die "Wrote $rv but expected to write $len" unless $rv == $len;
@@ -329,6 +330,21 @@
return 1;
}
+ if ($res->{type} eq "work_exception") {
+ ${ $res->{'blobref'} } =~ s/^(.+?)\0//
+ or die "Bogus work_exception from server";
+ my $shandle = $1;
+ my $task_list = $ts->{waiting}{$shandle} or
+ die "Uhhhh: got work_exception for unknown handle: $shandle\n";
+
+ my Gearman::Task $task = $task_list->[0] or
+ die "Uhhhh: task_list is empty on work_exception for handle $shandle\n";
+
+ $task->exception($res->{'blobref'});
+
+ return 1;
+ }
+
if ($res->{type} eq "work_status") {
my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
Modified: trunk/libgearman-client-perl/lib/Gearman/Util.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Util.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Util.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Util.pm Wed Jan 6 14:34:39 2010
@@ -6,7 +6,7 @@
# O: out of job server
# W: worker
# C: client of job server
-# J : jobserver
+# J: jobserver
our %cmd = (
1 => [ 'I', "can_do" ], # from W: [FUNC]
23 => [ 'I', "can_do_timeout" ], # from W: FUNC[0]TIMEOUT
@@ -14,6 +14,9 @@
3 => [ 'I', "reset_abilities" ], # from W: ---
22 => [ 'I', "set_client_id" ], # W->J: [RANDOM_STRING_NO_WHITESPACE]
4 => [ 'I', "pre_sleep" ], # from W: ---
+
+ 26 => [ 'I', "option_req" ], # C->J: [OPT]
+ 27 => [ 'O', "option_res" ], # J->C: [OPT]
6 => [ 'O', "noop" ], # J->W ---
7 => [ 'I', "submit_job" ], # C->J FUNC[0]UNIQ[0]ARGS
@@ -28,6 +31,7 @@
12 => [ 'IO', "work_status" ], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
13 => [ 'IO', "work_complete" ], # W->J/C: HANDLE[0]RES
14 => [ 'IO', "work_fail" ], # W->J/C: HANDLE
+ 25 => [ 'IO', "work_exception" ], # W->J/C: HANDLE[0]EXCEPTION
15 => [ 'I', "get_status" ], # C->J: HANDLE
20 => [ 'O', "status_res" ], # C->J: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
@@ -100,8 +104,20 @@
return $err->("malformed_magic") unless $magic eq "\0RES";
if ($len) {
- $rv = sysread($sock, $buf, $len);
- return $err->("short_body") unless $rv == $len;
+ # Start off trying to read the whole buffer. Store the bits in an array
+ # one element for each read, then do a big join at the end. This minimizes
+ # the number of memory allocations we have to do.
+ my $readlen = $len;
+ my $lim = 20 + int( $len / 2**10 );
+ my @buffers;
+ for (my $i = 0; $readlen > 0 && $i < $lim; $i++) {
+ my $rv = sysread($sock, $buffers[$i], $readlen);
+ return $err->("short_body") unless $rv > 0;
+ last unless $rv > 0;
+ $readlen -= $rv;
+ }
+ $buf = join('', @buffers);
+ return $err->("short_body") unless length($buf) == $len;
}
$type = $cmd{$type};
Modified: trunk/libgearman-client-perl/lib/Gearman/Worker.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/lib/Gearman/Worker.pm?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/lib/Gearman/Worker.pm (original)
+++ trunk/libgearman-client-perl/lib/Gearman/Worker.pm Wed Jan 6 14:34:39 2010
@@ -69,13 +69,27 @@
'last_connect_fail', # host:port -> unixtime
'down_since', # host:port -> unixtime
'connecting', # host:port -> unixtime connect started at
- 'can', # func -> subref
- 'timeouts', # func -> timeouts
+ 'can', # ability -> subref (ability is func with optional prefix)
+ 'timeouts', # ability -> timeouts
'client_id', # random identifer string, no whitespace
'parent_pipe', # bool/obj: if we're a child process of a gearman server,
# this is socket to our parent process. also means parent
# sock can never disconnect or timeout, etc..
);
+
+BEGIN {
+ my $storable = eval { require Storable; 1 }
+ if !defined &THROW_EXCEPTIONS || THROW_EXCEPTIONS();
+
+ $storable ||= 0;
+
+ if (defined &THROW_EXCEPTIONS) {
+ die "Exceptions support requires Storable: $@";
+ } else {
+ eval "sub THROW_EXCEPTIONS () { $storable }";
+ die "Couldn't define THROW_EXCEPTIONS: $@\n" if $@;
+ }
+}
sub new {
my ($class, %opts) = @_;
@@ -90,7 +104,7 @@
$self->{can} = {};
$self->{timeouts} = {};
$self->{client_id} = join("", map { chr(int(rand(26)) + 97) } (1..30));
- $self->{prefix} = '';
+ $self->{prefix} = undef;
$self->debug($opts{debug}) if $opts{debug};
@@ -113,6 +127,10 @@
sub _get_js_sock {
my Gearman::Worker $self = shift;
my $ipport = shift;
+ my %opts = @_;
+
+ my $on_connect = delete $opts{on_connect};
+ # Someday should warn when called with extra opts.
warn "getting job server socket: $ipport" if $self->debug;
@@ -153,7 +171,7 @@
$self->{sock_cache}{$ipport} = $sock;
- unless ($self->_on_connect($sock)) {
+ unless ($self->_on_connect($sock) && $on_connect && $on_connect->($sock)) {
delete $self->{sock_cache}{$ipport};
return undef;
}
@@ -171,9 +189,9 @@
return undef unless Gearman::Util::send_req($sock, \$cid_req);
# get this socket's state caught-up
- foreach my $func (keys %{$self->{can}}) {
- my $timeout = $self->{timeouts}->{$func};
- unless ($self->_set_ability($sock, $func, $timeout)) {
+ foreach my $ability (keys %{$self->{can}}) {
+ my $timeout = $self->{timeouts}->{$ability};
+ unless ($self->_set_ability($sock, $ability, $timeout)) {
return undef;
}
}
@@ -183,15 +201,13 @@
sub _set_ability {
my Gearman::Worker $self = shift;
- my ($sock, $func, $timeout) = @_;
-
- $func = join "\t", $self->prefix, $func if $self->prefix;
+ my ($sock, $ability, $timeout) = @_;
my $req;
if (defined $timeout) {
- $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
+ $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
} else {
- $req = Gearman::Util::pack_req_command("can_do", $func);
+ $req = Gearman::Util::pack_req_command("can_do", $ability);
}
return Gearman::Util::send_req($sock, \$req);
}
@@ -236,14 +252,32 @@
my $grab_req = Gearman::Util::pack_req_command("grab_job");
my $presleep_req = Gearman::Util::pack_req_command("pre_sleep");
- my %fd_map;
+
+ my $last_job_time;
+
+ # "Active" job servers are servers that have woken us up and should be
+ # queried to see if they have jobs for us to handle. On our first pass
+ # in the loop we contact all servers.
+ my %active_js = map { $_ => 1 } @{$self->{job_servers}};
+
+ # ( js => last_update_time, ... )
+ my %last_update_time;
while (1) {
-
- my @jss;
- my $need_sleep = 1;
-
- foreach my $js (@{ $self->{job_servers} }) {
+ # "Jobby" job servers are the set of server which we will contact
+ # on this pass through the loop, because we need to clear and use
+ # the "Active" set to plan for our next pass through the loop.
+ my @jobby_js = keys %active_js;
+
+ %active_js = ();
+
+ my $js_count = @jobby_js;
+ my $js_offset = int(rand($js_count));
+ my $is_idle = 0;
+
+ for (my $i = 0; $i < $js_count; $i++) {
+ my $js_index = ($i + $js_offset) % $js_count;
+ my $js = $jobby_js[$js_index];
my $jss = $self->_get_js_sock($js)
or next;
@@ -259,6 +293,7 @@
exit(0);
}
$self->uncache_sock($js, "grab_job_timeout");
+ delete $last_update_time{$js};
next;
}
@@ -269,6 +304,7 @@
my $timeout = $self->{parent_pipe} ? 5 : 0.50;
unless (Gearman::Util::wait_for_readability($jss->fileno, $timeout)) {
$self->uncache_sock($js, "grab_job_timeout");
+ delete $last_update_time{$js};
next;
}
@@ -278,13 +314,17 @@
$res = Gearman::Util::read_res_packet($jss, \$err);
unless ($res) {
$self->uncache_sock($js, "read_res_error");
+ delete $last_update_time{$js};
next;
}
} while ($res->{type} eq "noop");
- push @jss, [$js, $jss];
-
if ($res->{type} eq "no_job") {
+ unless (Gearman::Util::send_req($jss, \$presleep_req)) {
+ delete $last_update_time{$js};
+ $self->uncache_sock($js, "write_presleep_error");
+ }
+ $last_update_time{$js} = time;
next;
}
@@ -297,20 +337,28 @@
die $msg;
}
- $need_sleep = 0;
-
${ $res->{'blobref'} } =~ s/^(.+?)\0(.+?)\0//
or die "Uh, regexp on job_assign failed";
- my ($handle, $func) = ($1, $2);
- my $job = Gearman::Job->new($func, $res->{'blobref'}, $handle, $jss);
+ my ($handle, $ability) = ($1, $2);
+ my $job = Gearman::Job->new($ability, $res->{'blobref'}, $handle, $jss);
my $jobhandle = "$js//" . $job->handle;
$start_cb->($jobhandle) if $start_cb;
- my $handler = $self->{can}{$func};
+ my $handler = $self->{can}{$ability};
my $ret = eval { $handler->($job); };
- my $err = $@ || '';
- warn "Job '$func' died: $err" if $err;
+ my $err = $@;
+ warn "Job '$ability' died: $err" if $err;
+
+ $last_update_time{$js} = $last_job_time = time();
+
+ if (THROW_EXCEPTIONS && $err) {
+ my $exception_req = Gearman::Util::pack_req_command("work_exception", join("\0", $handle, Storable::nfreeze(\$err)));
+ unless (Gearman::Util::send_req($jss, \$exception_req)) {
+ $self->uncache_sock($js, "write_res_error");
+ next;
+ }
+ }
my $work_req;
if (defined $ret) {
@@ -324,29 +372,57 @@
unless (Gearman::Util::send_req($jss, \$work_req)) {
$self->uncache_sock($js, "write_res_error");
- }
- }
-
- my $is_idle = 0;
- if ($need_sleep) {
- $is_idle = 1;
- my $wake_vec = '';
+ next;
+ }
+
+ $active_js{$js} = 1;
+ }
+
+ my @jss;
+
+ my $on_connect = sub {
+ return Gearman::Util::send_req($_[0], \$presleep_req);
+ };
+
+ foreach my $js (@{$self->{job_servers}}) {
+ my $jss = $self->_get_js_sock($js, on_connect => $on_connect)
+ or next;
+ push @jss, [$js, $jss];
+ }
+
+ $is_idle = 1;
+ my $wake_vec = '';
+
+ foreach my $j (@jss) {
+ my ($js, $jss) = @$j;
+ my $fd = $jss->fileno;
+ vec($wake_vec, $fd, 1) = 1;
+ }
+
+ my $timeout = keys %active_js ? 0 : (10 + rand(2));
+
+
+ # chill for some arbitrary time until we're woken up again
+ my $nready = select(my $wout = $wake_vec, undef, undef, $timeout);
+
+ if ($nready) {
foreach my $j (@jss) {
my ($js, $jss) = @$j;
- unless (Gearman::Util::send_req($jss, \$presleep_req)) {
- $self->uncache_sock($js, "write_presleep_error");
- next;
- }
my $fd = $jss->fileno;
- vec($wake_vec, $fd, 1) = 1;
- }
-
- # chill for some arbitrary time until we're woken up again
- my $nready = select($wake_vec, undef, undef, 10);
- $is_idle = 0 if $nready;
- }
-
- return if $stop_if->($is_idle);
+ $active_js{$js} = 1
+ if vec($wout, $fd, 1);
+ }
+ }
+
+ $is_idle = 0 if keys %active_js;
+
+ return if $stop_if->($is_idle, $last_job_time);
+
+ my $update_since = time - (15 + rand 60);
+
+ while (my ($js, $last_update) = each %last_update_time) {
+ $active_js{$js} = 1 if $last_update < $update_since;
+ }
}
}
@@ -357,18 +433,32 @@
my $timeout = shift unless (ref $_[0] eq 'CODE');
my $subref = shift;
- $func = join "\t", $self->prefix, $func if $self->prefix;
+ my $prefix = $self->prefix;
+ my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
my $req;
if (defined $timeout) {
- $req = Gearman::Util::pack_req_command("can_do_timeout", "$func\0$timeout");
- $self->{timeouts}{$func} = $timeout;
+ $req = Gearman::Util::pack_req_command("can_do_timeout", "$ability\0$timeout");
+ $self->{timeouts}{$ability} = $timeout;
} else {
- $req = Gearman::Util::pack_req_command("can_do", $func);
+ $req = Gearman::Util::pack_req_command("can_do", $ability);
}
$self->_register_all($req);
- $self->{can}{$func} = $subref;
+ $self->{can}{$ability} = $subref;
+}
+
+sub unregister_function {
+ my Gearman::Worker $self = shift;
+ my $func = shift;
+
+ my $prefix = $self->prefix;
+ my $ability = defined($prefix) ? "$prefix\t$func" : "$func";
+
+ my $req = Gearman::Util::pack_req_command("cant_do", $ability);
+
+ $self->_register_all($req);
+ delete $self->{can}{$ability};
}
sub _register_all {
Modified: trunk/libgearman-client-perl/t/10-all.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/10-all.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/10-all.t (original)
+++ trunk/libgearman-client-perl/t/10-all.t Wed Jan 6 14:34:39 2010
@@ -8,7 +8,7 @@
use TestGearman;
if (start_server(PORT)) {
- plan tests => 32;
+ plan tests => 33;
} else {
plan skip_all => "Can't find server to test with";
exit 0;
@@ -31,7 +31,7 @@
start_worker(PORT, $NUM_SERVERS);
start_worker(PORT, $NUM_SERVERS);
-my $client = Gearman::Client->new;
+my $client = Gearman::Client->new(exceptions => 1);
isa_ok($client, 'Gearman::Client');
$client->job_servers(map { '127.0.0.1:' . (PORT + $_) } 0..$NUM_SERVERS);
@@ -70,6 +70,16 @@
## Test some failure conditions:
## Normal failure (worker returns undef or dies within eval).
is($client->do_task('fail'), undef, 'Job that failed naturally returned undef');
+
+## the die message is available in the on_fail sub
+my $msg = undef;
+$tasks = $client->new_task_set;
+$tasks->add_task('fail_die', undef, {
+ on_exception => sub { $msg = shift },
+});
+$tasks->wait;
+like($msg, qr/test reason/, 'the die message is available in the on_fail sub');
+
## Worker process exits.
is($client->do_task('fail_exit'), undef,
'Job that failed via exit returned undef');
@@ -222,7 +232,3 @@
$status = $client->get_status($handle);
} until $status->percent == 1;
-
-
-
-
Modified: trunk/libgearman-client-perl/t/30-maxqueue.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/30-maxqueue.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/30-maxqueue.t (original)
+++ trunk/libgearman-client-perl/t/30-maxqueue.t Wed Jan 6 14:34:39 2010
@@ -54,8 +54,8 @@
}
$tasks->wait;
-is($completed, 2, 'number of success'); # One starts immediately and on the queue
-is($failed, 3, 'number of failure'); # All the rest
+ok($completed == 2 || $completed == 1, 'number of success'); # One in the queue, plus one that may start immediately
+ok($failed == 3 || $failed== 4, 'number of failure'); # All the rest
Modified: trunk/libgearman-client-perl/t/40-prefix.t
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/40-prefix.t?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/40-prefix.t (original)
+++ trunk/libgearman-client-perl/t/40-prefix.t Wed Jan 6 14:34:39 2010
@@ -4,6 +4,7 @@
use Gearman::Client;
use Storable qw( freeze );
use Test::More;
+use Time::HiRes 'sleep';
use lib 't';
use TestGearman;
@@ -11,7 +12,7 @@
if (start_server(PORT)) {
- plan tests => 8;
+ plan tests => 9;
} else {
plan skip_all => "Can't find server to test with";
exit 0;
@@ -63,4 +64,17 @@
is($out{$k}, "$k from prefix_$k", "taskset from client_$k");
}
+## dispatch_background tasks also support prefixing
+my $bg_task = Gearman::Task->new('echo_sleep', \('sleep prefix test'));
+my $handle = $client_a->dispatch_background($bg_task);
+## wait for the task to be done
+my $status;
+my $n = 0;
+do {
+ sleep 0.1;
+ $n++;
+ diag "still waiting..." if $n == 12;
+ $status = $client_a->get_status($handle);
+} until $status->percent == 1 or $n == 20;
+is $status->percent, 1, "Background task completed using prefix";
Modified: trunk/libgearman-client-perl/t/worker.pl
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/libgearman-client-perl/t/worker.pl?rev=50363&op=diff
==============================================================================
--- trunk/libgearman-client-perl/t/worker.pl (original)
+++ trunk/libgearman-client-perl/t/worker.pl Wed Jan 6 14:34:39 2010
@@ -3,7 +3,7 @@
use lib 'lib';
use Gearman::Worker;
-use Storable qw( thaw );
+use Storable qw(thaw nfreeze);
use Getopt::Long qw( GetOptions );
GetOptions(
@@ -25,6 +25,7 @@
});
$worker->register_function(fail => sub { undef });
+$worker->register_function(fail_die => sub { die 'test reason' });
$worker->register_function(fail_exit => sub { exit 255 });
$worker->register_function(sleep => sub { sleep $_[0]->arg });
@@ -43,6 +44,13 @@
join " from ", $_[0]->arg, $prefix;
});
+$worker->register_function(echo_sleep => sub {
+ my($job) = @_;
+ $job->set_status(1, 1);
+ sleep 2; ## allow some time to read the status
+ join " from ", $_[0]->arg, $prefix;
+});
+
$worker->register_function(long => sub {
my($job) = @_;
@@ -50,9 +58,35 @@
sleep 2;
$job->set_status(100, 100);
sleep 2;
+ return $job->arg;
});
my $nsig;
$nsig = kill 'USR1', $notifypid if $notifypid;
-$worker->work while 1;
+my $work_exit = 0;
+
+$worker->register_function(work_exit => sub {
+ $work_exit = 1;
+});
+
+my ($is_idle, $last_job_time);
+
+$worker->register_function(check_stop_if => sub {
+ return nfreeze([$is_idle, $last_job_time]);
+});
+
+
+
+my $stop_if = sub {
+ ($is_idle, $last_job_time) = @_;
+
+ if ($work_exit) {
+ $work_exit = 0;
+ return 1;
+ }
+
+ return 0;
+};
+
+$worker->work(stop_if => $stop_if) while (1);
More information about the Pkg-perl-cvs-commits
mailing list