r51542 - in /trunk/gearman-server: ./ debian/ lib/Gearman/ lib/Gearman/Server/
jawnsy-guest at users.alioth.debian.org
jawnsy-guest at users.alioth.debian.org
Mon Jan 25 15:22:42 UTC 2010
Author: jawnsy-guest
Date: Mon Jan 25 15:22:36 2010
New Revision: 51542
URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=51542
Log:
* New upstream release
* Rename gearman-server.{default,init} to libgearman-server-perl...
Added:
trunk/gearman-server/MANIFEST.SKIP
- copied unchanged from r51540, branches/upstream/gearman-server/current/MANIFEST.SKIP
trunk/gearman-server/debian/libgearman-server-perl.default
- copied unchanged from r51540, trunk/gearman-server/debian/gearman-server.default
trunk/gearman-server/debian/libgearman-server-perl.init
- copied unchanged from r51540, trunk/gearman-server/debian/gearman-server.init
trunk/gearman-server/debian/libgearman-server-perl.install
trunk/gearman-server/lib/Gearman/Server/Listener.pm
- copied unchanged from r51540, branches/upstream/gearman-server/current/lib/Gearman/Server/Listener.pm
Removed:
trunk/gearman-server/debian/gearman-server.default
trunk/gearman-server/debian/gearman-server.init
Modified:
trunk/gearman-server/CHANGES
trunk/gearman-server/MANIFEST
trunk/gearman-server/META.yml
trunk/gearman-server/debian/changelog
trunk/gearman-server/gearmand
trunk/gearman-server/lib/Gearman/Server.pm
trunk/gearman-server/lib/Gearman/Server/Client.pm
trunk/gearman-server/lib/Gearman/Server/Job.pm
Modified: trunk/gearman-server/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/CHANGES?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/CHANGES (original)
+++ trunk/gearman-server/CHANGES Mon Jan 25 15:22:36 2010
@@ -1,3 +1,47 @@
+1.11 2010-01-17
+
+ * Don't get stuck in the wakeup loop. Dummy.
+
+1.10 2009-10-04
+
+ * Read client input more aggressively. Speed improvement.
+
+ * Add text commands 'jobs' and 'clients' which give information allowing you to trace
+ jobs from clients to workers while they are running.
+
+ * Flush buffers to client on EOF (assume half-closed). This makes things like netcat
+ work better as a way to speak text protocol to the gearmand.
+
+ * Add command-line option to adjust a delay before more workers are woken up.
+ This acts as an anti-starvation mechanism in case of lower wake up counts.
+ -Default is .1 seconds, formerly this option was not needed because all workers
+ were woken up at the time of job submission.
+
+ * Add command-line option to adjust number of workers to wake up per job injected.
+ -Default is 3, formerly was -1 (wake up all as fast as possible)
+
+ * Add command-line option to change the number of sockets accepted at once per
+ listener socket.
+ -Default is now 10, formerly used to be 1.
+
+ * Add exceptions passing support to gearman server classes, using new options
+ support.
+
+ * Add options support to server clients, so they can subscribe to newer protocol
+ features.
+
+ * Change listening socket to be a real Danga::Socket subclass, this allows
+ pausing for a period of time when we run into accept errors. This will
+ fix the problem of gearmand spinning 100% cpu in those cases.
+
+ * Make gearmand a little more vocal about socket accept errors.
+
+ * add fast read concept to server reading from client codepath. This drastically
+ improves performance of jobs over 1k in size.
+
+ * fix in-process client and start_worker method calls to use non-blocking
+ sockets like they should.
+
1.09 2007-05-09
* make start_worker (for making worker child processes), return
Modified: trunk/gearman-server/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/MANIFEST?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/MANIFEST (original)
+++ trunk/gearman-server/MANIFEST Mon Jan 25 15:22:36 2010
@@ -3,6 +3,8 @@
lib/Gearman/Server.pm
lib/Gearman/Server/Client.pm
lib/Gearman/Server/Job.pm
+lib/Gearman/Server/Listener.pm
Makefile.PL
MANIFEST This list of files
-META.yml Module meta-data (added by MakeMaker)
+MANIFEST.SKIP
+META.yml Module meta-data (added by MakeMaker)
Modified: trunk/gearman-server/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/META.yml?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/META.yml (original)
+++ trunk/gearman-server/META.yml Mon Jan 25 15:22:36 2010
@@ -1,7 +1,7 @@
# http://module-build.sourceforge.net/META-spec.html
#XXXXXXX This is a prototype!!! It will change in the future!!! XXXXX#
name: Gearman-Server
-version: 1.09
+version: 1.11
version_from: lib/Gearman/Server.pm
installdirs: site
requires:
@@ -9,4 +9,4 @@
Gearman::Util: 0
distribution_type: module
-generated_by: ExtUtils::MakeMaker version 6.17
+generated_by: ExtUtils::MakeMaker version 6.30
Modified: trunk/gearman-server/debian/changelog
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/debian/changelog?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/debian/changelog (original)
+++ trunk/gearman-server/debian/changelog Mon Jan 25 15:22:36 2010
@@ -1,6 +1,7 @@
-gearman-server (1.09-2) UNRELEASED; urgency=low
+gearman-server (1.11-1) UNRELEASED; urgency=low
* Take over package (Closes: #549362)
+ * New upstream release
* Rewrite control description
* Use new short debhelper 7 rules format
* Standards-Version 3.8.3 (no changes)
@@ -8,8 +9,9 @@
* Rename binary package to libgearman-server-perl; the dummy
package can be removed after upgrade to squeeze
* Add a watch file
+ * Rename gearman-server.{default,init} to libgearman-server-perl...
- -- Jonathan Yu <jawnsy at cpan.org> Mon, 25 Jan 2010 10:27:14 -0500
+ -- Jonathan Yu <jawnsy at cpan.org> Mon, 25 Jan 2010 10:31:28 -0500
gearman-server (1.09-1) unstable; urgency=low
Added: trunk/gearman-server/debian/libgearman-server-perl.install
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/debian/libgearman-server-perl.install?rev=51542&op=file
==============================================================================
--- trunk/gearman-server/debian/libgearman-server-perl.install (added)
+++ trunk/gearman-server/debian/libgearman-server-perl.install Mon Jan 25 15:22:36 2010
@@ -1,0 +1,1 @@
+debian/tmp/usr/*
Modified: trunk/gearman-server/gearmand
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/gearmand?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/gearmand (original)
+++ trunk/gearman-server/gearmand Mon Jan 25 15:22:36 2010
@@ -34,6 +34,33 @@
=item --debug=1
Enable debugging (currently the only debug output is when a client or worker connects).
+
+=item --accept=10
+
+Number of new connections to accept each time we see a listening socket ready. This doesn't usually
+need to be tuned by anyone, however in dire circumstances you may need to do it quickly.
+
+=item --wakeup=3
+
+Number of workers to wake up per job inserted into the queue.
+
+Zero (0) is a perfectly acceptable answer, and can be used if you don't care much about job latency.
+This would bank on the base idea of a worker checking in with the server every so often.
+
+Negative One (-1) indicates that all sleeping workers should be woken up.
+
+All other negative numbers will cause the server to throw an exception and not start.
+
+=item --wakeup-delay=
+
+Time interval before waking up more workers (the value specified by --wakeup) when jobs are still in
+the queue.
+
+Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on --wakeup, but
+is more cooperative in gearmand's multitasking model.
+
+Negative One (-1) means that this event won't happen, so only the initial workers will be woken up to
+handle jobs in the queue.
=back
@@ -91,6 +118,9 @@
$nokeepalive,
$notify_pid,
$opt_pidfile,
+ $accept,
+ $wakeup,
+ $wakeup_delay,
);
my $conf_port = 7003;
@@ -98,7 +128,10 @@
'd|daemonize' => \$daemonize,
'p|port=i' => \$conf_port,
'debug=i' => \$DEBUG,
- 'pidfile=s' => \$opt_pidfile,
+ 'pidfile=s' => \$opt_pidfile,
+ 'accept=i' => \$accept,
+ 'wakeup=i' => \$wakeup,
+ 'wakeup-delay=f' => \$wakeup_delay,
'notifypid|n=i' => \$notify_pid, # for test suite only.
);
@@ -110,8 +143,11 @@
$SIG{'PIPE'} = "IGNORE"; # handled manually
-my $server = Gearman::Server->new;
-my $ssock = $server->create_listening_sock($conf_port);
+my $server = Gearman::Server->new(
+ wakeup => $wakeup,
+ wakeup_delay => $wakeup_delay,
+ );
+my $ssock = $server->create_listening_sock($conf_port, accept_per_loop => $accept);
if ($opt_pidfile) {
open my $fh, '>', $opt_pidfile or die "Could not open $opt_pidfile: $!";
Modified: trunk/gearman-server/lib/Gearman/Server.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server.pm Mon Jan 25 15:22:36 2010
@@ -1,23 +1,74 @@
package Gearman::Server;
+
+=head1 NAME
+
+Gearman::Server - function call "router" and load balancer
+
+=head1 DESCRIPTION
+
+You run a Gearman server (or more likely, many of them for both
+high-availability and load balancing), then have workers (using
+L<Gearman::Worker> from the Gearman module, or libraries for other
+languages) register their ability to do certain functions to all of
+them, and then clients (using L<Gearman::Client>,
+L<Gearman::Client::Async>, etc) request work to be done from one of
+the Gearman servers.
+
+The servers connect them, routing function call requests to the
+appropriate workers, multiplexing responses to duplicate requests as
+requested, etc.
+
+More than likely, you want to use the provided L<gearmand> wrapper
+script, and not use Gearman::Server directly.
+
+=cut
+
use strict;
use Gearman::Server::Client;
+use Gearman::Server::Listener;
use Gearman::Server::Job;
-use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
+use Socket qw(IPPROTO_TCP SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
use Carp qw(croak);
use Sys::Hostname ();
+use IO::Handle ();
use fields (
'client_map', # fd -> Client
'sleepers', # func -> { "Client=HASH(0xdeadbeef)" => Client }
+ 'sleepers_list', # func -> [ Client, ... ], ...
'job_queue', # job_name -> [Job, Job*] (key only exists if non-empty)
'job_of_handle', # handle -> Job
'max_queue', # func -> configured max jobqueue size
'job_of_uniq', # func -> uniq -> Job
'handle_ct', # atomic counter
'handle_base', # atomic counter
+ 'listeners', # arrayref of listener objects
+ 'wakeup', # number of workers to wake
+ 'wakeup_delay', # seconds to wait before waking more workers
+ 'wakeup_timers', # func -> timer, timer to be canceled or adjusted when job grab/inject is called
);
-our $VERSION = "1.09";
+our $VERSION = "1.11";
+
+=head1 METHODS
+
+=head2 new
+
+ $server_object = Gearman::Server->new( %options )
+
+Creates and returns a new Gearman::Server object, which attaches itself to the Danga::Socket event loop. The server will begin operating when the Danga::Socket runloop is started. This means you need to start up the runloop before anything will happen.
+
+Options:
+
+=over
+
+=item port
+
+Specify a port which you would like the Gearman::Server to listen on for TCP connections (not necessary, but useful)
+
+=back
+
+=cut
sub new {
my ($class, %opts) = @_;
@@ -25,15 +76,37 @@
$self->{client_map} = {};
$self->{sleepers} = {};
+ $self->{sleepers_list} = {};
$self->{job_queue} = {};
$self->{job_of_handle} = {};
$self->{max_queue} = {};
$self->{job_of_uniq} = {};
+ $self->{listeners} = [];
+ $self->{wakeup} = 3;
+ $self->{wakeup_delay} = .1;
+ $self->{wakeup_timers} = {};
$self->{handle_ct} = 0;
$self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";
my $port = delete $opts{port};
+
+ my $wakeup = delete $opts{wakeup};
+
+ if (defined $wakeup) {
+ die "Invalid value passed in wakeup option"
+ if $wakeup < 0 && $wakeup != -1;
+ $self->{wakeup} = $wakeup;
+ }
+
+ my $wakeup_delay = delete $opts{wakeup_delay};
+
+ if (defined $wakeup_delay) {
+ die "Invalid value passed in wakeup_delay option"
+ if $wakeup_delay < 0 && $wakeup_delay != -1;
+ $self->{wakeup_delay} = $wakeup_delay;
+ }
+
croak("Unknown options") if %opts;
$self->create_listening_sock($port);
@@ -45,36 +118,34 @@
#warn "$msg\n";
}
+=head2 create_listening_sock
+
+ $server_object->create_listening_sock( $portnum )
+
+Add a TCP port listener for incoming Gearman worker and client connections.
+
+=cut
+
sub create_listening_sock {
- my ($self, $portnum) = @_;
+ my ($self, $portnum, %opts) = @_;
+
+ my $accept_per_loop = delete $opts{accept_per_loop};
+
+ warn "Extra options passed into create_listening_sock: " . join(', ', keys %opts) . "\n"
+ if keys %opts;
+
my $ssock = IO::Socket::INET->new(LocalPort => $portnum,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
- Listen => 10 )
+ Listen => 1024 )
or die "Error creating socket: $@\n";
- $self->setup_listening_sock($ssock);
+ my $listeners = $self->{listeners};
+ push @$listeners, Gearman::Server::Listener->new($ssock, $self, accept_per_loop => $accept_per_loop);
+
return $ssock;
-}
-
-sub setup_listening_sock {
- my ($self, $ssock) = @_;
-
- # make sure provided listening socket is non-blocking
- IO::Handle::blocking($ssock, 0);
- Danga::Socket->AddOtherFds(fileno($ssock) => sub {
- my $csock = $ssock->accept
- or return;
-
- $self->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));
-
- IO::Handle::blocking($csock, 0);
- setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
-
- $self->new_client($csock);
- });
}
sub new_client {
@@ -107,6 +178,9 @@
$csock->autoflush(1);
$psock->autoflush(1);
+ IO::Handle::blocking($csock, 0);
+ IO::Handle::blocking($psock, 0);
+
my $client = Gearman::Server::Client->new($csock, $self);
my ($package, $file, $line) = caller;
@@ -116,6 +190,16 @@
return $psock;
}
+
+=head2 start_worker
+
+ $pid = $server_object->start_worker( $prog )
+
+ ($pid, $client) = $server_object->start_worker( $prog )
+
+Forks and starts a worker process named by C<$prog> and returns the pid (or pid and client object).
+
+=cut
sub start_worker {
my ($self, $prog) = @_;
@@ -149,6 +233,8 @@
}
close($psock);
+
+ IO::Handle::blocking($csock, 0);
my $sock = $csock;
my $client = Gearman::Server::Client->new($sock, $self);
@@ -184,21 +270,62 @@
sub wake_up_sleepers {
my ($self, $func) = @_;
+
+ if (my $existing_timer = delete($self->{wakeup_timers}->{$func})) {
+ $existing_timer->cancel();
+ }
+
+ return unless $self->_wake_up_some($func);
+
+ my $delay = $self->{wakeup_delay};
+
+ # -1 means don't setup a timer. 0 actually means go as fast as we can, cooperatively.
+ return if $delay == -1;
+
+ # If we're only going to wakeup 0 workers anyways, don't set up a timer.
+ return if $self->{wakeup} == 0;
+
+ my $timer = Danga::Socket->AddTimer($delay, sub {
+ # Be sure to not wake up more sleepers if we have no jobs in the queue.
+ # I know the object definition above says I can trust the func element to determine
+ # if there are items in the list, but I'm just gonna be safe, rather than sorry.
+ return unless @{$self->{job_queue}{$func} || []};
+ $self->wake_up_sleepers($func)
+ });
+ $self->{wakeup_timers}->{$func} = $timer;
+}
+
+# Returns true when there are still more workers to wake up
+# False if there are no sleepers
+sub _wake_up_some {
+ my ($self, $func) = @_;
my $sleepmap = $self->{sleepers}{$func} or return;
-
- foreach my $addr (keys %$sleepmap) {
- my Gearman::Server::Client $c = $sleepmap->{$addr};
+ my $sleeporder = $self->{sleepers_list}{$func} or return;
+
+ # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC
+
+ my $max = $self->{wakeup};
+
+ while (@$sleeporder) {
+ my Gearman::Server::Client $c = shift @$sleeporder;
next if $c->{closed} || ! $c->{sleeping};
+ if ($max-- <= 0) {
+ unshift @$sleeporder, $c;
+ return 1;
+ }
+ delete $sleepmap->{"$c"};
$c->res_packet("noop");
$c->{sleeping} = 0;
}
delete $self->{sleepers}{$func};
+ delete $self->{sleepers_list}{$func};
return;
}
sub on_client_sleep {
- my ($self, $cl) = @_;
+ my $self = shift;
+ my Gearman::Server::Client $cl = shift;
foreach my $cd (@{$cl->{can_do_list}}) {
# immediately wake the sleeper up if there are things to be done
@@ -209,7 +336,24 @@
}
my $sleepmap = ($self->{sleepers}{$cd} ||= {});
- $sleepmap->{"$cl"} ||= $cl;
+ my $count = $sleepmap->{"$cl"}++;
+
+ next if $count >= 2;
+
+ my $sleeporder = ($self->{sleepers_list}{$cd} ||= []);
+
+ # The idea here is to keep workers at the head of the list if they are doing work, hopefully
+ # this will allow extra workers that aren't needed to actually go 'idle' safely.
+ my $jobs_done = $cl->{jobs_done_since_sleep};
+
+ if ($jobs_done) {
+ unshift @$sleeporder, $cl;
+ } else {
+ push @$sleeporder, $cl;
+ }
+
+ $cl->{jobs_done_since_sleep} = 0;
+
}
}
@@ -231,6 +375,10 @@
sub note_job_finished {
my Gearman::Server $self = shift;
my Gearman::Server::Job $job = shift;
+
+ if (my Gearman::Server::Client $worker = $job->worker) {
+ $worker->{jobs_done_since_sleep}++;
+ }
if (length($job->{uniq})) {
delete $self->{job_of_uniq}{$job->{func}}{$job->{uniq}};
@@ -278,7 +426,7 @@
while (1) {
$job = shift @{$self->{job_queue}{$func}};
return $empty->() unless $job;
- return $job unless $job->{require_listener};
+ return $job unless $job->require_listener;
foreach my Gearman::Server::Client $c (@{$job->{listeners}}) {
return $job if $c && ! $c->{closed};
@@ -291,27 +439,6 @@
1;
__END__
-=head1 NAME
-
-Gearman::Server - function call "router" and load balancer
-
-=head1 DESCRIPTION
-
-You run a Gearman server (or more likely, many of them for both
-high-availability and load balancing), then have workers (using
-L<Gearman::Worker> from the Gearman module, or libraries for other
-languages) register their ability to do certain functions to all of
-them, and then clients (using L<Gearman::Client>,
-L<Gearman::Client::Async>, etc) request work to be done from one of
-the Gearman servers.
-
-The servers connect them, routing function call requests to the
-appropriate workers, multiplexing responses to duplicate requests as
-requested, etc.
-
-More than likely, you want to use the provided L<gearmand> wrapper
-script, and not use Gearman::Server directly.
-
=head1 SEE ALSO
L<gearmand>
Modified: trunk/gearman-server/lib/Gearman/Server/Client.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server/Client.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server/Client.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server/Client.pm Mon Jan 25 15:22:36 2010
@@ -27,6 +27,8 @@
'can_do', # { $job_name => $timeout } $timeout can be undef indicating no timeout
'can_do_list',
'can_do_iter',
+ 'fast_read',
+ 'fast_buffer',
'read_buf',
'sleeping', # 0/1: they've said they're sleeping and we haven't woken them up
'timer', # Timer for job cancellation
@@ -34,7 +36,13 @@
'client_id', # opaque string, no whitespace. workers give this so checker scripts
# can tell apart the same worker connected to multiple jobservers.
'server', # pointer up to client's server
+ 'options',
+ 'jobs_done_since_sleep',
);
+
+# 60k read buffer default, similar to perlbal's backend read.
+use constant READ_SIZE => 60 * 1024;
+use constant MAX_READ_SIZE => 512 * 1024;
# Class Method:
sub new {
@@ -43,6 +51,8 @@
$self = fields::new($self) unless ref $self;
$self->SUPER::new($sock);
+ $self->{fast_read} = undef; # Number of bytes to read as fast as we can (don't try to process them)
+ $self->{fast_buffer} = []; # Array of buffers used during fast read operation
$self->{read_buf} = '';
$self->{sleeping} = 0;
$self->{can_do} = {};
@@ -51,19 +61,43 @@
$self->{can_do_iter} = 0; # numeric iterator for where we start looking for jobs
$self->{client_id} = "-";
$self->{server} = $server;
+ $self->{options} = {};
+ $self->{jobs_done_since_sleep} = 0;
return $self;
}
+sub option {
+ my Gearman::Server::Client $self = shift;
+ my $option = shift;
+
+ return $self->{options}->{$option};
+}
+
sub close {
my Gearman::Server::Client $self = shift;
- while (my ($handle, $job) = each %{$self->{doing}}) {
+ my $doing = $self->{doing};
+
+ while (my ($handle, $job) = each %$doing) {
my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
$job->relay_to_listeners($msg);
$job->note_finished(0);
}
+ # Clear the doing list, since it may contain a set of jobs which contain
+ # references back to us.
+ %$doing = ();
+
+ # Remove self from sleepers, otherwise it will be leaked if another worker
+ # for the job never connects.
+ my $sleepers = $self->{server}{sleepers};
+ for my $job (@{ $self->{can_do_list} }) {
+ my $sleeping = $sleepers->{$job};
+ delete $sleeping->{$self};
+ delete $sleepers->{$job} unless %$sleeping;
+ }
+
$self->{server}->note_disconnected_client($self);
$self->CMD_reset_abilities;
@@ -75,9 +109,40 @@
sub event_read {
my Gearman::Server::Client $self = shift;
- my $bref = $self->read(1024);
- return $self->close unless defined $bref;
- $self->{read_buf} .= $$bref;
+ my $read_size = $self->{fast_read} || READ_SIZE;
+ my $bref = $self->read($read_size);
+
+ # Delay close till after buffers are written on EOF. If we are unable
+ # to write 'err' or 'hup' will be thrown and we'll close faster.
+ return $self->write(sub { $self->close } ) unless defined $bref;
+
+ if ($self->{fast_read}) {
+ push @{$self->{fast_buffer}}, $$bref;
+ $self->{fast_read} -= length($$bref);
+
+ # If fast_read is still positive, then we need to read more data
+ return if ($self->{fast_read} > 0);
+
+ # Append the whole giant read buffer to our main read buffer
+ $self->{read_buf} .= join('', @{$self->{fast_buffer}});
+
+ # Reset the fast read state for next time.
+ $self->{fast_buffer} = [];
+ $self->{fast_read} = undef;
+ } else {
+ # Exact read size length likely means we have more sitting on the
+ # socket. Buffer up to half a meg in one go.
+ if (length($$bref) == READ_SIZE) {
+ my $limit = int(MAX_READ_SIZE / READ_SIZE);
+ my @crefs = ($$bref);
+ while (my $cref = $self->read(READ_SIZE)) {
+ push(@crefs, $$cref);
+ last if (length($$cref) < READ_SIZE || $limit-- < 1);
+ }
+ $bref = \join('', @crefs);
+ }
+ $self->{read_buf} .= $$bref;
+ }
my $found_cmd;
do {
@@ -87,8 +152,9 @@
if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
my ($cmd, $len) = unpack("NN", $1);
if ($blen < $len + 12) {
- # not here yet.
- $found_cmd = 0;
+ # Start a fast read loop to get all the data we need, less
+ # what we already have in the buffer.
+ $self->{fast_read} = $len + 12 - $blen;
return;
}
@@ -215,6 +281,22 @@
return 1;
}
+sub CMD_work_exception {
+ my Gearman::Server::Client $self = shift;
+ my $ar = shift;
+
+ $$ar =~ s/^(.+?)\0//;
+ my $handle = $1;
+ my $job = $self->{doing}{$handle};
+
+ return $self->error_packet("not_worker") unless $job && $job->worker == $self;
+
+ my $msg = Gearman::Util::pack_res_command("work_exception", join("\0", $handle, $$ar));
+ $job->relay_to_option_listeners($msg, "exceptions");
+
+ return 1;
+}
+
sub CMD_pre_sleep {
my Gearman::Server::Client $self = shift;
$self->{'sleeping'} = 1;
@@ -293,6 +375,22 @@
}
$self->_setup_can_do_list;
+}
+
+sub CMD_option_req {
+ my Gearman::Server::Client $self = shift;
+ my $ar = shift;
+
+ my $success = sub {
+ return $self->res_packet("option_res", $$ar);
+ };
+
+ if ($$ar eq 'exceptions') {
+ $self->{options}->{exceptions} = 1;
+ return $success->();
+ }
+
+ return $self->error_packet("unknown_option");
}
sub CMD_set_client_id {
@@ -407,7 +505,7 @@
$self->$cmd_name(\$blob);
};
return $ret unless $@;
- print "Error: $@\n";
+ warn "Error: $@\n";
return $self->error_packet("server_error", $@);
}
@@ -507,6 +605,99 @@
}
$self->write( ".\n" );
+}
+
+=head2 "jobs"
+
+Output format is zero or more lines of:
+
+ [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n
+
+Followed by a single line of:
+
+ .\n
+
+\t is a literal tab character
+\n is perl's definition of newline (literal \n on linux, something else on win32)
+
+=cut
+
+sub TXTCMD_jobs {
+ my Gearman::Server::Client $self = shift;
+
+ foreach my $job ($self->{server}->jobs) {
+ my $func = $job->func;
+ my $uniq = $job->uniq;
+ my $worker_addr = "-";
+
+ if (my $worker = $job->worker) {
+ $worker_addr = $worker->peer_addr_string;
+ }
+
+ my $listeners = $job->listeners;
+
+ $self->write("$func\t$uniq\t$worker_addr\t$listeners\n");
+ }
+
+ $self->write(".\n");
+}
+
+=head2 "clients"
+
+Output format is zero or more sections of:
+
+=over
+
+One line of:
+
+ [Client Address]\n
+
+Followed by zero or more lines of:
+
+ \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n
+
+=back
+
+Followed by a single line of:
+
+ .\n
+
+\t is a literal tab character
+\n is perl's definition of newline (literal \n on linux, something else on win32)
+
+=cut
+
+sub TXTCMD_clients {
+ my Gearman::Server::Client $self = shift;
+
+ my %jobs_by_client;
+
+ foreach my $job ($self->{server}->jobs) {
+ foreach my $client ($job->listeners) {
+ my $ent = $jobs_by_client{$client} ||= [];
+ push @$ent, $job;
+ }
+ }
+
+ foreach my $client ($self->{server}->clients) {
+ my $client_addr = $client->peer_addr_string;
+ $self->write("$client_addr\n");
+ my $jobs = $jobs_by_client{$client} || [];
+
+ foreach my $job (@$jobs) {
+ my $func = $job->func;
+ my $uniq = $job->uniq;
+ my $worker_addr = "-";
+
+ if (my $worker = $job->worker) {
+ $worker_addr = $worker->peer_addr_string;
+ }
+ $self->write("\t$func\t$uniq\t$worker_addr\n");
+ }
+
+ }
+
+ $self->write(".\n");
}
sub TXTCMD_gladiator {
Modified: trunk/gearman-server/lib/Gearman/Server/Job.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server/Job.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server/Job.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server/Job.pm Mon Jan 25 15:22:36 2010
@@ -1,5 +1,6 @@
package Gearman::Server::Job;
use strict;
+use Scalar::Util;
use Sys::Hostname;
use fields (
@@ -61,9 +62,30 @@
}
}
+sub relay_to_option_listeners {
+ my Gearman::Server::Job $self = shift;
+ my $option = $_[1];
+ foreach my Gearman::Server::Client $c (@{$self->{listeners}}) {
+ next if !$c || $c->{closed};
+ next unless $c->option($option);
+ $c->write($_[0]);
+ }
+
+}
+
sub clear_listeners {
my Gearman::Server::Job $self = shift;
$self->{listeners} = [];
+}
+
+sub listeners {
+ my Gearman::Server::Job $self = shift;
+ return @{$self->{listeners}};
+}
+
+sub uniq {
+ my Gearman::Server::Job $self = shift;
+ return $self->{uniq};
}
sub note_finished {
More information about the Pkg-perl-cvs-commits
mailing list