r51542 - in /trunk/gearman-server: ./ debian/ lib/Gearman/ lib/Gearman/Server/

jawnsy-guest at users.alioth.debian.org jawnsy-guest at users.alioth.debian.org
Mon Jan 25 15:22:42 UTC 2010


Author: jawnsy-guest
Date: Mon Jan 25 15:22:36 2010
New Revision: 51542

URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=51542
Log:
* New upstream release
* Rename gearman-server.{default,init} to libgearman-server-perl...

Added:
    trunk/gearman-server/MANIFEST.SKIP
      - copied unchanged from r51540, branches/upstream/gearman-server/current/MANIFEST.SKIP
    trunk/gearman-server/debian/libgearman-server-perl.default
      - copied unchanged from r51540, trunk/gearman-server/debian/gearman-server.default
    trunk/gearman-server/debian/libgearman-server-perl.init
      - copied unchanged from r51540, trunk/gearman-server/debian/gearman-server.init
    trunk/gearman-server/debian/libgearman-server-perl.install
    trunk/gearman-server/lib/Gearman/Server/Listener.pm
      - copied unchanged from r51540, branches/upstream/gearman-server/current/lib/Gearman/Server/Listener.pm
Removed:
    trunk/gearman-server/debian/gearman-server.default
    trunk/gearman-server/debian/gearman-server.init
Modified:
    trunk/gearman-server/CHANGES
    trunk/gearman-server/MANIFEST
    trunk/gearman-server/META.yml
    trunk/gearman-server/debian/changelog
    trunk/gearman-server/gearmand
    trunk/gearman-server/lib/Gearman/Server.pm
    trunk/gearman-server/lib/Gearman/Server/Client.pm
    trunk/gearman-server/lib/Gearman/Server/Job.pm

Modified: trunk/gearman-server/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/CHANGES?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/CHANGES (original)
+++ trunk/gearman-server/CHANGES Mon Jan 25 15:22:36 2010
@@ -1,3 +1,47 @@
+1.11 2010-01-17
+
+        * Don't get stuck in the wakeup loop. Dummy.
+
+1.10 2009-10-04
+
+        * Read client input more aggressively. Speed improvement.
+
+        * Add text commands 'jobs' and 'clients' which give information allowing you to trace
+          jobs from clients to workers while they are running.
+
+        * Flush buffers to client on EOF (assume half-closed). This makes things like netcat
+          work better as a way to speak text protocol to the gearmand.
+
+        * Add command-line option to adjust a delay before more workers are woken up.
+          This acts as an anti-starvation mechanism in case of lower wake up counts.
+            -Default is .1 seconds, formerly this option was not needed because all workers
+             were woken up at the time of job submission.
+
+        * Add command-line option to adjust number of workers to wake up per job injected.
+            -Default is 3, formerly was -1 (wake up all as fast as possible)
+
+        * Add command-line option to change the number of sockets accepted at once per
+          listener socket.
+            -Default is now 10, formerly used to be 1.
+
+        * Add exceptions passing support to gearman server classes, using new options
+          support.
+
+        * Add options support to server clients, so they can subscribe to newer protocol
+          features.
+
+        * Change listening socket to be a real Danga::Socket subclass, this allows
+          pausing for a period of time when we run into accept errors. This will
+          fix the problem of gearmand spinning 100% cpu in those cases.
+
+        * Make gearmand a little more vocal about socket accept errors.
+
+        * add fast read concept to server reading from client codepath. This drastically
+          improves performance of jobs over 1k in size.
+
+        * fix in-process client and start_worker method calls to use non-blocking
+          sockets like they should.
+
 1.09 2007-05-09
 
         * make start_worker (for making worker child processes), return

Modified: trunk/gearman-server/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/MANIFEST?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/MANIFEST (original)
+++ trunk/gearman-server/MANIFEST Mon Jan 25 15:22:36 2010
@@ -3,6 +3,8 @@
 lib/Gearman/Server.pm
 lib/Gearman/Server/Client.pm
 lib/Gearman/Server/Job.pm
+lib/Gearman/Server/Listener.pm
 Makefile.PL
 MANIFEST			This list of files
-META.yml                                 Module meta-data (added by MakeMaker)
+MANIFEST.SKIP
+META.yml			Module meta-data (added by MakeMaker)

Modified: trunk/gearman-server/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/META.yml?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/META.yml (original)
+++ trunk/gearman-server/META.yml Mon Jan 25 15:22:36 2010
@@ -1,7 +1,7 @@
 # http://module-build.sourceforge.net/META-spec.html
 #XXXXXXX This is a prototype!!!  It will change in the future!!! XXXXX#
 name:         Gearman-Server
-version:      1.09
+version:      1.11
 version_from: lib/Gearman/Server.pm
 installdirs:  site
 requires:
@@ -9,4 +9,4 @@
     Gearman::Util:                 0
 
 distribution_type: module
-generated_by: ExtUtils::MakeMaker version 6.17
+generated_by: ExtUtils::MakeMaker version 6.30

Modified: trunk/gearman-server/debian/changelog
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/debian/changelog?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/debian/changelog (original)
+++ trunk/gearman-server/debian/changelog Mon Jan 25 15:22:36 2010
@@ -1,6 +1,7 @@
-gearman-server (1.09-2) UNRELEASED; urgency=low
+gearman-server (1.11-1) UNRELEASED; urgency=low
 
   * Take over package (Closes: #549362)
+  * New upstream release
   * Rewrite control description
   * Use new short debhelper 7 rules format
   * Standards-Version 3.8.3 (no changes)
@@ -8,8 +9,9 @@
   * Rename binary package to libgearman-server-perl; the dummy
     package can be removed after upgrade to squeeze
   * Add a watch file
+  * Rename gearman-server.{default,init} to libgearman-server-perl...
 
- -- Jonathan Yu <jawnsy at cpan.org>  Mon, 25 Jan 2010 10:27:14 -0500
+ -- Jonathan Yu <jawnsy at cpan.org>  Mon, 25 Jan 2010 10:31:28 -0500
 
 gearman-server (1.09-1) unstable; urgency=low
 

Added: trunk/gearman-server/debian/libgearman-server-perl.install
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/debian/libgearman-server-perl.install?rev=51542&op=file
==============================================================================
--- trunk/gearman-server/debian/libgearman-server-perl.install (added)
+++ trunk/gearman-server/debian/libgearman-server-perl.install Mon Jan 25 15:22:36 2010
@@ -1,0 +1,1 @@
+debian/tmp/usr/*

Modified: trunk/gearman-server/gearmand
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/gearmand?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/gearmand (original)
+++ trunk/gearman-server/gearmand Mon Jan 25 15:22:36 2010
@@ -34,6 +34,33 @@
 =item --debug=1
 
 Enable debugging (currently the only debug output is when a client or worker connects).
+
+=item --accept=10
+
+Number of new connections to accept each time we see a listening socket ready. This doesn't usually
+need to be tuned by anyone, however in dire circumstances you may need to do it quickly.
+
+=item --wakeup=3
+
+Number of workers to wake up per job inserted into the queue.
+
+Zero (0) is a perfectly acceptable answer, and can be used if you don't care much about job latency.
+This would bank on the base idea of a worker checking in with the server every so often.
+
+Negative One (-1) indicates that all sleeping workers should be woken up.
+
+All other negative numbers will cause the server to throw exception and not start.
+
+=item --wakeup-delay=
+
+Time interval before waking up more workers (the value specified by --wakeup) when jobs are still in
+the queue.
+
+Zero (0) means go as fast as possible, but not all at the same time. Similar to -1 on --wakeup, but
+is more cooperative in gearmand's multitasking model.
+
+Negative One (-1) means that this event won't happe, so only the initial workers will be woken up to
+handle jobs in the queue.
 
 =back
 
@@ -91,6 +118,9 @@
     $nokeepalive,
     $notify_pid,
     $opt_pidfile,
+    $accept,
+    $wakeup,
+    $wakeup_delay,
    );
 my $conf_port = 7003;
 
@@ -98,7 +128,10 @@
                          'd|daemonize'    => \$daemonize,
                          'p|port=i'       => \$conf_port,
                          'debug=i'        => \$DEBUG,
-			 'pidfile=s'      => \$opt_pidfile,
+                         'pidfile=s'      => \$opt_pidfile,
+                         'accept=i'       => \$accept,
+                         'wakeup=i'       => \$wakeup,
+                         'wakeup-delay=f' => \$wakeup_delay,
                          'notifypid|n=i'  => \$notify_pid,  # for test suite only.
                          );
 
@@ -110,8 +143,11 @@
 
 $SIG{'PIPE'} = "IGNORE";  # handled manually
 
-my $server = Gearman::Server->new;
-my $ssock  = $server->create_listening_sock($conf_port);
+my $server = Gearman::Server->new(
+                                  wakeup          => $wakeup,
+                                  wakeup_delay    => $wakeup_delay,
+             );
+my $ssock  = $server->create_listening_sock($conf_port, accept_per_loop => $accept);
 
 if ($opt_pidfile) {
     open my $fh, '>', $opt_pidfile or die "Could not open $opt_pidfile: $!";

Modified: trunk/gearman-server/lib/Gearman/Server.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server.pm Mon Jan 25 15:22:36 2010
@@ -1,23 +1,74 @@
 package Gearman::Server;
+
+=head1 NAME
+
+Gearman::Server - function call "router" and load balancer
+
+=head1 DESCRIPTION
+
+You run a Gearman server (or more likely, many of them for both
+high-availability and load balancing), then have workers (using
+L<Gearman::Worker> from the Gearman module, or libraries for other
+languages) register their ability to do certain functions to all of
+them, and then clients (using L<Gearman::Client>,
+L<Gearman::Client::Async>, etc) request work to be done from one of
+the Gearman servers.
+
+The servers connect them, routing function call requests to the
+appropriate workers, multiplexing responses to duplicate requests as
+requested, etc.
+
+More than likely, you want to use the provided L<gearmand> wrapper
+script, and not use Gearman::Server directly.
+
+=cut
+
 use strict;
 use Gearman::Server::Client;
+use Gearman::Server::Listener;
 use Gearman::Server::Job;
-use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
+use Socket qw(IPPROTO_TCP SOL_SOCKET SOCK_STREAM AF_UNIX SOCK_STREAM PF_UNSPEC);
 use Carp qw(croak);
 use Sys::Hostname ();
+use IO::Handle ();
 
 use fields (
             'client_map',    # fd -> Client
             'sleepers',      # func -> { "Client=HASH(0xdeadbeef)" => Client }
+            'sleepers_list', # func -> [ Client, ... ], ...
             'job_queue',     # job_name -> [Job, Job*]  (key only exists if non-empty)
             'job_of_handle', # handle -> Job
             'max_queue',     # func -> configured max jobqueue size
             'job_of_uniq',   # func -> uniq -> Job
             'handle_ct',     # atomic counter
             'handle_base',   # atomic counter
+            'listeners',     # arrayref of listener objects
+            'wakeup',        # number of workers to wake
+            'wakeup_delay',  # seconds to wait before waking more workers
+            'wakeup_timers', # func -> timer, timer to be canceled or adjusted when job grab/inject is called
             );
 
-our $VERSION = "1.09";
+our $VERSION = "1.11";
+
+=head1 METHODS
+
+=head2 new
+
+  $server_object = Gearman::Server->new( %options )
+
+Creates and returns a new Gearman::Server object, which attaches itself to the Danga::Socket event loop. The server will begin operating when the Danga::Socket runloop is started. This means you need to start up the runloop before anything will happen.
+
+Options:
+
+=over
+
+=item port
+
+Specify a port which you would like the Gearman::Server to listen on for TCP connections (not necessary, but useful)
+
+=back
+
+=cut
 
 sub new {
     my ($class, %opts) = @_;
@@ -25,15 +76,37 @@
 
     $self->{client_map}    = {};
     $self->{sleepers}      = {};
+    $self->{sleepers_list} = {};
     $self->{job_queue}     = {};
     $self->{job_of_handle} = {};
     $self->{max_queue}     = {};
     $self->{job_of_uniq}   = {};
+    $self->{listeners}     = [];
+    $self->{wakeup}        = 3;
+    $self->{wakeup_delay}  = .1;
+    $self->{wakeup_timers} = {};
 
     $self->{handle_ct} = 0;
     $self->{handle_base} = "H:" . Sys::Hostname::hostname() . ":";
 
     my $port = delete $opts{port};
+
+    my $wakeup = delete $opts{wakeup};
+
+    if (defined $wakeup) {
+        die "Invalid value passed in wakeup option"
+            if $wakeup < 0 && $wakeup != -1;
+        $self->{wakeup} = $wakeup;
+    }
+
+    my $wakeup_delay = delete $opts{wakeup_delay};
+
+    if (defined $wakeup_delay) {
+        die "Invalid value passed in wakeup_delay option"
+            if $wakeup_delay < 0 && $wakeup_delay != -1;
+        $self->{wakeup_delay} = $wakeup_delay;
+    }
+
     croak("Unknown options") if %opts;
     $self->create_listening_sock($port);
 
@@ -45,36 +118,34 @@
     #warn "$msg\n";
 }
 
+=head2 create_listening_sock
+
+  $server_object->create_listening_sock( $portnum )
+
+Add a TCP port listener for incoming Gearman worker and client connections.
+
+=cut
+
 sub create_listening_sock {
-    my ($self, $portnum) = @_;
+    my ($self, $portnum, %opts) = @_;
+
+    my $accept_per_loop = delete $opts{accept_per_loop};
+
+    warn "Extra options passed into create_listening_sock: " . join(', ', keys %opts) . "\n"
+        if keys %opts;
+
     my $ssock = IO::Socket::INET->new(LocalPort => $portnum,
                                       Type      => SOCK_STREAM,
                                       Proto     => IPPROTO_TCP,
                                       Blocking  => 0,
                                       Reuse     => 1,
-                                      Listen    => 10 )
+                                      Listen    => 1024 )
         or die "Error creating socket: $@\n";
 
-    $self->setup_listening_sock($ssock);
+    my $listeners = $self->{listeners};
+    push @$listeners, Gearman::Server::Listener->new($ssock, $self, accept_per_loop => $accept_per_loop);
+
     return $ssock;
-}
-
-sub setup_listening_sock {
-    my ($self, $ssock) = @_;
-
-    # make sure provided listening socket is non-blocking
-    IO::Handle::blocking($ssock, 0);
-    Danga::Socket->AddOtherFds(fileno($ssock) => sub {
-        my $csock = $ssock->accept
-            or return;
-
-        $self->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));
-
-        IO::Handle::blocking($csock, 0);
-        setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
-
-        $self->new_client($csock);
-    });
 }
 
 sub new_client {
@@ -107,6 +178,9 @@
     $csock->autoflush(1);
     $psock->autoflush(1);
 
+    IO::Handle::blocking($csock, 0);
+    IO::Handle::blocking($psock, 0);
+
     my $client = Gearman::Server::Client->new($csock, $self);
 
     my ($package, $file, $line) = caller;
@@ -116,6 +190,16 @@
 
     return $psock;
 }
+
+=head2 start_worker
+
+  $pid = $server_object->start_worker( $prog )
+
+  ($pid, $client) = $server_object->start_worker( $prog )
+
+Fork and start a worker process named by C<$prog> and returns the pid (or pid and client object).
+
+=cut
 
 sub start_worker {
     my ($self, $prog) = @_;
@@ -149,6 +233,8 @@
     }
 
     close($psock);
+
+    IO::Handle::blocking($csock, 0);
     my $sock = $csock;
 
     my $client = Gearman::Server::Client->new($sock, $self);
@@ -184,21 +270,62 @@
 
 sub wake_up_sleepers {
     my ($self, $func) = @_;
+
+    if (my $existing_timer = delete($self->{wakeup_timers}->{$func})) {
+        $existing_timer->cancel();
+    }
+
+    return unless $self->_wake_up_some($func);
+
+    my $delay = $self->{wakeup_delay};
+
+    # -1 means don't setup a timer. 0 actually means go as fast as we can, cooperatively.
+    return if $delay == -1;
+
+    # If we're only going to wakeup 0 workers anyways, don't set up a timer.
+    return if $self->{wakeup} == 0;
+
+    my $timer = Danga::Socket->AddTimer($delay, sub {
+        # Be sure to not wake up more sleepers if we have no jobs in the queue.
+        # I know the object definition above says I can trust the func element to determine
+        # if there are items in the list, but I'm just gonna be safe, rather than sorry.
+        return unless @{$self->{job_queue}{$func} || []};
+        $self->wake_up_sleepers($func)
+    });
+    $self->{wakeup_timers}->{$func} = $timer;
+}
+
+# Returns true when there are still more workers to wake up
+# False if there are no sleepers
+sub _wake_up_some {
+    my ($self, $func) = @_;
     my $sleepmap = $self->{sleepers}{$func} or return;
-
-    foreach my $addr (keys %$sleepmap) {
-        my Gearman::Server::Client $c = $sleepmap->{$addr};
+    my $sleeporder = $self->{sleepers_list}{$func} or return;
+
+    # TODO SYNC UP STATE HERE IN CASE TWO LISTS END UP OUT OF SYNC
+
+    my $max = $self->{wakeup};
+
+    while (@$sleeporder) {
+        my Gearman::Server::Client $c = shift @$sleeporder;
         next if $c->{closed} || ! $c->{sleeping};
+        if ($max-- <= 0) {
+            unshift @$sleeporder, $c;
+            return 1;
+        }
+        delete $sleepmap->{"$c"};
         $c->res_packet("noop");
         $c->{sleeping} = 0;
     }
 
     delete $self->{sleepers}{$func};
+    delete $self->{sleepers_list}{$func};
     return;
 }
 
 sub on_client_sleep {
-    my ($self, $cl) = @_;
+    my $self = shift;
+    my Gearman::Server::Client $cl = shift;
 
     foreach my $cd (@{$cl->{can_do_list}}) {
         # immediately wake the sleeper up if there are things to be done
@@ -209,7 +336,24 @@
         }
 
         my $sleepmap = ($self->{sleepers}{$cd} ||= {});
-        $sleepmap->{"$cl"} ||= $cl;
+        my $count = $sleepmap->{"$cl"}++;
+
+        next if $count >= 2;
+
+        my $sleeporder = ($self->{sleepers_list}{$cd} ||= []);
+
+        # The idea here is to keep workers at the head of the list if they are doing work, hopefully
+        # this will allow extra workers that aren't needed to actually go 'idle' safely.
+        my $jobs_done = $cl->{jobs_done_since_sleep};
+
+        if ($jobs_done) {
+            unshift @$sleeporder, $cl;
+        } else {
+            push @$sleeporder, $cl;
+        }
+
+        $cl->{jobs_done_since_sleep} = 0;
+
     }
 }
 
@@ -231,6 +375,10 @@
 sub note_job_finished {
     my Gearman::Server $self = shift;
     my Gearman::Server::Job $job = shift;
+
+    if (my Gearman::Server::Client $worker = $job->worker) {
+        $worker->{jobs_done_since_sleep}++;
+    }
 
     if (length($job->{uniq})) {
         delete $self->{job_of_uniq}{$job->{func}}{$job->{uniq}};
@@ -278,7 +426,7 @@
     while (1) {
         $job = shift @{$self->{job_queue}{$func}};
         return $empty->() unless $job;
-        return $job unless $job->{require_listener};
+        return $job unless $job->require_listener;
 
         foreach my Gearman::Server::Client $c (@{$job->{listeners}}) {
             return $job if $c && ! $c->{closed};
@@ -291,27 +439,6 @@
 1;
 __END__
 
-=head1 NAME
-
-Gearman::Server - function call "router" and load balancer
-
-=head1 DESCRIPTION
-
-You run a Gearman server (or more likely, many of them for both
-high-availability and load balancing), then have workers (using
-L<Gearman::Worker> from the Gearman module, or libraries for other
-languages) register their ability to do certain functions to all of
-them, and then clients (using L<Gearman::Client>,
-L<Gearman::Client::Async>, etc) request work to be done from one of
-the Gearman servers.
-
-The servers connect them, routing function call requests to the
-appropriate workers, multiplexing responses to duplicate requests as
-requested, etc.
-
-More than likely, you want to use the provided L<gearmand> wrapper
-script, and not use Gearman::Server directly.
-
 =head1 SEE ALSO
 
 L<gearmand>

Modified: trunk/gearman-server/lib/Gearman/Server/Client.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server/Client.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server/Client.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server/Client.pm Mon Jan 25 15:22:36 2010
@@ -27,6 +27,8 @@
             'can_do',  # { $job_name => $timeout } $timeout can be undef indicating no timeout
             'can_do_list',
             'can_do_iter',
+            'fast_read',
+            'fast_buffer',
             'read_buf',
             'sleeping',   # 0/1:  they've said they're sleeping and we haven't woken them up
             'timer', # Timer for job cancellation
@@ -34,7 +36,13 @@
             'client_id',  # opaque string, no whitespace.  workers give this so checker scripts
                           # can tell apart the same worker connected to multiple jobservers.
             'server',     # pointer up to client's server
+            'options',
+            'jobs_done_since_sleep',
             );
+
+# 60k read buffer default, similar to perlbal's backend read.
+use constant READ_SIZE => 60 * 1024;
+use constant MAX_READ_SIZE => 512 * 1024;
 
 # Class Method:
 sub new {
@@ -43,6 +51,8 @@
     $self = fields::new($self) unless ref $self;
     $self->SUPER::new($sock);
 
+    $self->{fast_read}   = undef; # Number of bytes to read as fast as we can (don't try to process them)
+    $self->{fast_buffer} = []; # Array of buffers used during fast read operation
     $self->{read_buf}    = '';
     $self->{sleeping}    = 0;
     $self->{can_do}      = {};
@@ -51,19 +61,43 @@
     $self->{can_do_iter} = 0;  # numeric iterator for where we start looking for jobs
     $self->{client_id}   = "-";
     $self->{server}      = $server;
+    $self->{options}     = {};
+    $self->{jobs_done_since_sleep} = 0;
 
     return $self;
 }
 
+sub option {
+    my Gearman::Server::Client $self = shift;
+    my $option = shift;
+
+    return $self->{options}->{$option};
+}
+
 sub close {
     my Gearman::Server::Client $self = shift;
 
-    while (my ($handle, $job) = each %{$self->{doing}}) {
+    my $doing = $self->{doing};
+
+    while (my ($handle, $job) = each %$doing) {
         my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
         $job->relay_to_listeners($msg);
         $job->note_finished(0);
     }
 
+    # Clear the doing list, since it may contain a set of jobs which contain
+    # references back to us.
+    %$doing = ();
+
+    # Remove self from sleepers, otherwise it will be leaked if another worker
+    # for the job never connects.
+    my $sleepers = $self->{server}{sleepers};
+    for my $job (@{ $self->{can_do_list} }) {
+        my $sleeping = $sleepers->{$job};
+        delete $sleeping->{$self};
+        delete $sleepers->{$job} unless %$sleeping;
+    }
+
     $self->{server}->note_disconnected_client($self);
 
     $self->CMD_reset_abilities;
@@ -75,9 +109,40 @@
 sub event_read {
     my Gearman::Server::Client $self = shift;
 
-    my $bref = $self->read(1024);
-    return $self->close unless defined $bref;
-    $self->{read_buf} .= $$bref;
+    my $read_size = $self->{fast_read} || READ_SIZE;
+    my $bref = $self->read($read_size);
+
+    # Delay close till after buffers are written on EOF. If we are unable
+    # to write 'err' or 'hup' will be thrown and we'll close faster.
+    return $self->write(sub { $self->close } ) unless defined $bref;
+
+    if ($self->{fast_read}) {
+        push @{$self->{fast_buffer}}, $$bref;
+        $self->{fast_read} -= length($$bref);
+
+        # If fast_read is still positive, then we need to read more data
+        return if ($self->{fast_read} > 0);
+
+        # Append the whole giant read buffer to our main read buffer
+        $self->{read_buf} .= join('', @{$self->{fast_buffer}});
+
+        # Reset the fast read state for next time.
+        $self->{fast_buffer} = [];
+        $self->{fast_read} = undef;
+    } else {
+        # Exact read size length likely means we have more sitting on the
+        # socket. Buffer up to half a meg in one go.
+        if (length($$bref) == READ_SIZE) {
+            my $limit = int(MAX_READ_SIZE / READ_SIZE);
+            my @crefs = ($$bref);
+            while (my $cref = $self->read(READ_SIZE)) {
+                push(@crefs, $$cref);
+                last if (length($$cref) < READ_SIZE || $limit-- < 1);
+            }
+            $bref = \join('', @crefs);
+        }
+        $self->{read_buf} .= $$bref;
+    }
 
     my $found_cmd;
     do {
@@ -87,8 +152,9 @@
         if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
             my ($cmd, $len) = unpack("NN", $1);
             if ($blen < $len + 12) {
-                # not here yet.
-                $found_cmd = 0;
+                # Start a fast read loop to get all the data we need, less
+                # what we already have in the buffer.
+                $self->{fast_read} = $len + 12 - $blen;
                 return;
             }
 
@@ -215,6 +281,22 @@
     return 1;
 }
 
+sub CMD_work_exception {
+    my Gearman::Server::Client $self = shift;
+    my $ar = shift;
+
+    $$ar =~ s/^(.+?)\0//;
+    my $handle = $1;
+    my $job = $self->{doing}{$handle};
+
+    return $self->error_packet("not_worker") unless $job && $job->worker == $self;
+
+    my $msg = Gearman::Util::pack_res_command("work_exception", join("\0", $handle, $$ar));
+    $job->relay_to_option_listeners($msg, "exceptions");
+
+    return 1;
+}
+
 sub CMD_pre_sleep {
     my Gearman::Server::Client $self = shift;
     $self->{'sleeping'} = 1;
@@ -293,6 +375,22 @@
     }
 
     $self->_setup_can_do_list;
+}
+
+sub CMD_option_req {
+    my Gearman::Server::Client $self = shift;
+    my $ar = shift;
+
+    my $success = sub {
+        return $self->res_packet("option_res", $$ar);
+    };
+
+    if ($$ar eq 'exceptions') {
+        $self->{options}->{exceptions} = 1;
+        return $success->();
+    }
+
+    return $self->error_packet("unknown_option");
 }
 
 sub CMD_set_client_id {
@@ -407,7 +505,7 @@
         $self->$cmd_name(\$blob);
     };
     return $ret unless $@;
-    print "Error: $@\n";
+    warn "Error: $@\n";
     return $self->error_packet("server_error", $@);
 }
 
@@ -507,6 +605,99 @@
     }
 
     $self->write( ".\n" );
+}
+
+=head2 "jobs"
+
+Output format is zero or more lines of:
+
+    [Job function name]\t[Uniq (coalescing) key]\t[Worker address]\t[Number of listeners]\n
+
+Follows by a single line of:
+
+    .\n
+
+\t is a literal tab character
+\n is perl's definition of newline (literal \n on linux, something else on win32)
+
+=cut
+
+sub TXTCMD_jobs {
+    my Gearman::Server::Client $self = shift;
+
+    foreach my $job ($self->{server}->jobs) {
+        my $func = $job->func;
+        my $uniq = $job->uniq;
+        my $worker_addr = "-";
+
+        if (my $worker = $job->worker) {
+            $worker_addr = $worker->peer_addr_string;
+        }
+
+        my $listeners = $job->listeners;
+
+        $self->write("$func\t$uniq\t$worker_addr\t$listeners\n");
+    }
+
+    $self->write(".\n");
+}
+
+=head2 "clients"
+
+Output format is zero or more sections of:
+
+=over
+
+One line of:
+
+    [Client Address]\n
+
+Followed by zero or more lines of:
+
+    \t[Job Function]\t[Uniq (coalescing) key]\t[Worker Address]\n
+
+=back
+
+Follows by a single line of:
+
+    .\n
+
+\t is a literal tab character
+\n is perl's definition of newline (literal \n on linux, something else on win32)
+
+=cut
+
+sub TXTCMD_clients {
+    my Gearman::Server::Client $self = shift;
+
+    my %jobs_by_client;
+
+    foreach my $job ($self->{server}->jobs) {
+        foreach my $client ($job->listeners) {
+            my $ent = $jobs_by_client{$client} ||= [];
+            push @$ent, $job;
+        }
+    }
+
+    foreach my $client ($self->{server}->clients) {
+        my $client_addr = $client->peer_addr_string;
+        $self->write("$client_addr\n");
+        my $jobs = $jobs_by_client{$client} || [];
+
+        foreach my $job (@$jobs) {
+            my $func = $job->func;
+            my $uniq = $job->uniq;
+            my $worker_addr = "-";
+
+            if (my $worker = $job->worker) {
+                $worker_addr = $worker->peer_addr_string;
+            }
+            $self->write("\t$func\t$uniq\t$worker_addr\n");
+        }
+
+    }
+
+    $self->write(".\n");
 }
 
 sub TXTCMD_gladiator {

Modified: trunk/gearman-server/lib/Gearman/Server/Job.pm
URL: http://svn.debian.org/wsvn/pkg-perl/trunk/gearman-server/lib/Gearman/Server/Job.pm?rev=51542&op=diff
==============================================================================
--- trunk/gearman-server/lib/Gearman/Server/Job.pm (original)
+++ trunk/gearman-server/lib/Gearman/Server/Job.pm Mon Jan 25 15:22:36 2010
@@ -1,5 +1,6 @@
 package Gearman::Server::Job;
 use strict;
+use Scalar::Util;
 use Sys::Hostname;
 
 use fields (
@@ -61,9 +62,30 @@
     }
 }
 
+sub relay_to_option_listeners {
+    my Gearman::Server::Job $self = shift;
+    my $option = $_[1];
+    foreach my Gearman::Server::Client $c (@{$self->{listeners}}) {
+        next if !$c || $c->{closed};
+        next unless $c->option($option);
+        $c->write($_[0]);
+    }
+
+}
+
 sub clear_listeners {
     my Gearman::Server::Job $self = shift;
     $self->{listeners} = [];
+}
+
+sub listeners {
+    my Gearman::Server::Job $self = shift;
+    return @{$self->{listeners}};
+}
+
+sub uniq {
+    my Gearman::Server::Job $self = shift;
+    return $self->{uniq};
 }
 
 sub note_finished {




More information about the Pkg-perl-cvs-commits mailing list