[libtheschwartz-perl] 12/43: [svn-upgrade] Integrating new upstream version, libtheschwartz-perl (1.07)

dom at earth.li dom at earth.li
Mon May 9 20:11:00 UTC 2016


This is an automated email from the git hooks/post-receive script.

dom pushed a commit to branch master
in repository libtheschwartz-perl.

commit 997b0e7fb1d0c687a1f7cb28af426b60017f8690
Author: Dominic Hargreaves <dom at earth.li>
Date:   Tue Aug 19 00:01:29 2008 +0000

    [svn-upgrade] Integrating new upstream version, libtheschwartz-perl (1.07)
---
 CHANGES                      |  17 ++++
 MANIFEST                     |  23 +++--
 META.yml                     |   2 +-
 doc/schema-postgres.sql      |  70 +++++++++++++++
 extras/thetop                | 209 +++++++++++++++++++++++++++++++++++++++++++
 lib/TheSchwartz.pm           | 200 +++++++++++++++++++++++++++++++++++++----
 lib/TheSchwartz/FuncMap.pm   |  11 ++-
 lib/TheSchwartz/Worker.pm    |   6 +-
 server/bin/schwartzd         |  59 ++++++++++++
 server/doc/deps.txt          |  25 ++++++
 server/doc/protocol.txt      |  56 ++++++++++++
 server/t/00-start-ping.t     |  16 ++++
 server/t/01-insert-and-get.t |  45 ++++++++++
 server/t/lib/testlib.pl      | 174 +++++++++++++++++++++++++++++++++++
 t/api.t                      |  17 +++-
 t/priority.t                 |  72 +++++++++++++++
 t/scoreboard.t               |  78 ++++++++++++++++
 t/server-time.t              |  20 +++++
 18 files changed, 1073 insertions(+), 27 deletions(-)

diff --git a/CHANGES b/CHANGES
index 5161eb1..4ee5403 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,20 @@
+1.07 (2008-07-31)
+    - bchoate: Updates to support optional prioritization of jobs.
+    - ykerherve: Croak with a nice message id a driver cannot be
+      found for a handle
+
+1.06 (2007-09-07)
+    - Code to allow a 'top' like view of runnin schwartz workers.
+    - include postgres schema in docs.  from Michael Zedeler
+      <michael at zedeler.dk>  Currently not tested in regression
+      tests, though, so not "officially" supported yet.
+    - start of work on gearman-based schwartz server.
+
+1.05
+
+    - Set TheSchwartz::Job::insert_time to current server time when
+      inserting a new job.
+
 1.04 (2007-05-22)
 
     - no code changes, just packaging/dep/test fixes, as pointed out
diff --git a/MANIFEST b/MANIFEST
index d69416c..b1d3798 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -1,10 +1,16 @@
-bin/schwartzmon
 CHANGES
+MANIFEST			This list of files
+MANIFEST.SKIP
+META.yml
+Makefile.PL
+bin/schwartzmon
 doc/http-mappings.txt
 doc/notes.txt
+doc/schema-postgres.sql
 doc/schema.sql
-extras/check_schwartz
 extras/TheSchwartz.spec
+extras/check_schwartz
+extras/thetop
 inc/Module/AutoInstall.pm
 inc/Module/Install.pm
 inc/Module/Install/AutoInstall.pm
@@ -24,10 +30,12 @@ lib/TheSchwartz/FuncMap.pm
 lib/TheSchwartz/Job.pm
 lib/TheSchwartz/JobHandle.pm
 lib/TheSchwartz/Worker.pm
-Makefile.PL
-MANIFEST			This list of files
-MANIFEST.SKIP
-META.yml
+server/bin/schwartzd
+server/doc/deps.txt
+server/doc/protocol.txt
+server/t/00-start-ping.t
+server/t/01-insert-and-get.t
+server/t/lib/testlib.pl
 t/05-job-ctor.t
 t/api.t
 t/cleanup.t
@@ -45,8 +53,11 @@ t/lib/db-common.pl
 t/parallel-workers.t
 t/pod-coverage.t
 t/pod.t
+t/priority.t
 t/replace-with.t
 t/retry-delay.t
 t/schema-sqlite.sql
+t/scoreboard.t
+t/server-time.t
 t/unique.t
 t/work-before-funcids-exist.t
diff --git a/META.yml b/META.yml
index 4ec1fdd..55ad529 100644
--- a/META.yml
+++ b/META.yml
@@ -15,4 +15,4 @@ requires:
   Data::ObjectDriver: 0.04
   Digest::MD5: 0
   Storable: 0
-version: 1.04
+version: 1.07
diff --git a/doc/schema-postgres.sql b/doc/schema-postgres.sql
new file mode 100644
index 0000000..c6ad03f
--- /dev/null
+++ b/doc/schema-postgres.sql
@@ -0,0 +1,70 @@
+-- From: Michael Zedeler <michael at zedeler.dk>
+-- Date: July 30, 2007 7:31:55 AM PDT
+-- To: cpan at sixapart.com
+-- Subject: TheSchwartz database schema for postgresql
+--
+-- Hi.
+--
+-- I couldn't find any useful postgresql compatible schema file for  
+-- the tables that TheSchwartz seems to depend on, so I rewrote the  
+-- one supplied in the package.
+--
+-- Here it is. Feel free to include it in the next release.
+--
+-- Regards,
+--
+-- Michael.
+
+
+CREATE TABLE funcmap (
+        funcid SERIAL,
+        funcname       VARCHAR(255) NOT NULL,
+        UNIQUE(funcname)
+);
+
+CREATE TABLE job (
+        jobid           SERIAL,
+        funcid          INT NOT NULL,
+        arg             BYTEA,
+        uniqkey         VARCHAR(255) NULL,
+        insert_time     INTEGER,
+        run_after       INTEGER NOT NULL,
+        grabbed_until   INTEGER NOT NULL,
+        priority        SMALLINT,
+        coalesce        VARCHAR(255),
+        UNIQUE(funcid, uniqkey)
+);
+
+CREATE INDEX job_funcid_runafter ON job (funcid, run_after);
+
+CREATE INDEX job_funcid_coalesce ON job (funcid, coalesce);
+
+CREATE TABLE note (
+        jobid           BIGINT NOT NULL,
+        notekey         VARCHAR(255),
+        PRIMARY KEY (jobid, notekey),
+        value           BYTEA
+);
+
+CREATE TABLE error (
+        error_time      INTEGER NOT NULL,
+        jobid           BIGINT NOT NULL,
+        message         VARCHAR(255) NOT NULL,
+        funcid          INT NOT NULL DEFAULT 0
+);
+
+CREATE INDEX error_funcid_errortime ON error (funcid, error_time);
+CREATE INDEX error_time ON error (error_time);
+CREATE INDEX error_jobid ON error (jobid);
+
+CREATE TABLE exitstatus (
+        jobid           BIGINT PRIMARY KEY NOT NULL,
+        funcid          INT NOT NULL DEFAULT 0,
+        status          SMALLINT,
+        completion_time INTEGER,
+        delete_after    INTEGER
+);
+
+CREATE INDEX exitstatus_funcid ON exitstatus (funcid);
+CREATE INDEX exitstatus_deleteafter ON exitstatus (delete_after);
+
diff --git a/extras/thetop b/extras/thetop
new file mode 100755
index 0000000..275b943
--- /dev/null
+++ b/extras/thetop
@@ -0,0 +1,209 @@
+#!/usr/bin/perl -w
+
+=pod
+
+=head1 NAME
+
+thetop - A 'top' utility for the schwartz
+
+=head1 SYNOPSIS
+
+  thetop [--func FORMAT] [--arg FORMAT] [--sort ARGS] [--delay SECS] [--score-dir DIR]
+
+=head1 DESCRIPTION
+
+
+=cut
+
+#--------------------------------------#
+# Dependencies
+
+use strict;
+
+use Getopt::Long;
+use Term::Cap;
+use POSIX;
+
+#--------------------------------------#
+# Global Variables
+
+use vars qw( $OSPEED );
+
+BEGIN {
+    my $termios = POSIX::Termios->new;
+    $termios->getattr;
+    $OSPEED = $termios->getospeed || 9600;
+};
+
+our $TERM = Term::Cap->Tgetent({OSPEED=>$OSPEED});
+
+#--------------------------------------#
+# Main Program
+
+my ($score_dir, $delay, $func_col, @arg_col, $sort);
+
+GetOptions('score-dir=s' => \$score_dir,
+           'delay|d=s'   => \$delay,
+           'func=s'      => \$func_col,
+           'arg=s'       => \@arg_col,
+           'sort|s=s'    => \$sort,
+          );
+
+# Make sure we know where to find the scoreboard files
+unless ($score_dir) {
+    foreach my $d (qw(/var/run /dev/shm /tmp)) {
+        if (-e "$d/theschwartz") {
+            $score_dir = "$d/theschwartz";
+            last;
+        }
+    }
+
+    die "Can't find scoreboard directory.  Use '--score-dir'\n"
+      unless $score_dir;
+}
+
+# If we got some formatting instructions for the arg column, parse it out
+my %arg_col_by_func;
+if (@arg_col) {
+    foreach my $a (@arg_col) {
+        if ($a =~ /=/) {
+            my ($func, $fmt) = split('=', $a);
+            $arg_col_by_func{$func} = $fmt;
+        } else {
+            $arg_col_by_func{'__ALL__'} = $a;
+        }
+    }
+}
+
+# Make sure to give a reasonable default for delay
+$delay ||= 3;
+
+# Start reporting
+clr_screen();
+while (1) {
+    report($score_dir, $func_col, \%arg_col_by_func, $sort);
+    sleep($delay);
+    clr_screen();
+}
+
+################################################################################
+
+sub report {
+    my ($dir, $func_col, $arg_col_by_func, $sort) = @_;
+
+    # Find the files available
+    opendir(SD, $dir) or die "Can't read directory '$dir': $!\n";
+    my @files = map { $dir."/$_" } readdir(SD);
+    closedir(SD);
+
+    # Grab the data out of them
+    my @data;
+    foreach my $f (@files) {
+        next unless $f =~ /scoreboard\.[0-9]+$/;
+        open(SF, '<', $f) or die "Can't open score file '$f': $!\n";
+        my %dat = map { chomp; split('=') } <SF>;
+        close(SF);
+
+        $dat{arg_array} = [split(',', $dat{arg}||'')];
+        push @data, \%dat;
+    }
+
+    my $num = scalar(@data);
+    my $width = 80-17-$num;
+    printf("Workers: %d total %${width}s\n\n", $num, scalar localtime);
+    printf("% 5s % 20s % 2s % 7s % 41s\n", 'PID', 'FUNC', 'S', 'TIME', 'ARGS');
+    foreach my $d (sort { order_by($sort, $a, $b) } @data) {
+        my $func_str = fmt_func($d, $func_col);
+
+        printf("% 5s % 20s % 2s % 7s % 41s\n",
+               $d->{pid},
+               $func_str,
+               ($d->{done} ? 'S' : 'R'),
+               fmt_time($d),
+               fmt_arg($d, $arg_col_by_func, $func_str),
+              );
+    }
+}
+
+sub order_by {
+    my ($sort, $a, $b) = @_;
+
+    if ($sort) {
+
+    } else {
+        # Default to push running tasks to the top
+        return ($a->{done}||0) <=> ($b->{done}||0) ||
+               ($a->{started}||0) <=> ($b->{started}||0);
+    }
+}
+
+sub fmt_func {
+    my ($d, $fmt) = @_;
+    my $val = $d->{funcname};
+
+    if ($fmt) {
+        if ($fmt eq 'trim') {
+            $val =~ s/^.+:://g;
+        } else {
+            $val =~ /($fmt)/;
+            $val = $1;
+        }
+    }
+
+    return substr($val, 0, 20),
+}
+
+sub fmt_time {
+    my ($d) = @_;
+    my $secs = ($d->{done}||time) - $d->{started};
+
+    if ($secs < 60) {
+        return sprintf("%02d:%02d", 0, $secs);
+    } elsif ($secs < 3600) {
+        my $min = int($secs/60);
+        $secs = $secs%60;
+        return sprintf("%02d:%02d", $min, $secs);
+    } else {
+        my $hr  = int($secs/60/60);
+        my $min = int($secs/60%60);
+        $secs = $secs%60;
+        return sprintf("%d:%02d:%02d", $hr, $min, $secs);
+    }
+}
+
+## Format the arguments by interpreting the args as either a hash or an array
+## and printing out the appropriate element.
+
+sub fmt_arg {
+    my ($d, $arg_col_by_func, $func_str) = @_;
+    my $val = $d->{arg};
+    my $func_orig = $d->{funcname};
+
+    if ($arg_col_by_func) {
+        my $fmt = ($arg_col_by_func{$func_str}  ||
+                   $arg_col_by_func{$func_orig} ||
+                   $arg_col_by_func{'__ALL__'});
+        if ($fmt) {
+            my $arg_array = $d->{arg_array};
+
+            # If its a number treat the args as an array
+            if ($fmt =~ /^[0-9]+$/) {
+                $val = $arg_array->[$fmt];
+            }
+            # otherwise, treat the args as a hash
+            else {
+                # Compensate for odd numbers of args
+                push @$arg_array, undef if scalar(@$arg_array) % 2;
+
+                my %h = @$arg_array;
+                $val = $h{$fmt};
+            }
+        }
+    }
+
+    return substr($val||'', 0, 41),
+}
+
+sub clr_screen {
+    $TERM->Tputs('cl', 1, \*STDOUT);
+}
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index db70422..b0d114f 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,10 +1,10 @@
-# $Id: TheSchwartz.pm 122 2007-05-22 17:11:37Z bradfitz $
+# $Id: TheSchwartz.pm 138 2008-06-16 17:36:19Z ykerherve $
 
 package TheSchwartz;
 use strict;
-use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration );
+use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
 
-our $VERSION = "1.04";
+our $VERSION = "1.07";
 
 use Carp qw( croak );
 use Data::ObjectDriver::Errors;
@@ -35,7 +35,9 @@ sub new {
     my $databases = delete $args{databases};
 
     $client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
+    $client->set_prioritize(delete $args{prioritize});
     $client->set_verbose(delete $args{verbose});
+    $client->set_scoreboard(delete $args{scoreboard});
     $client->{driver_cache_expiration} = delete $args{driver_cache_expiration} || 0;
     croak "unknown options ", join(', ', keys %args) if keys %args;
 
@@ -72,7 +74,8 @@ sub driver_for {
     if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
         $driver = $client->{cached_drivers}{$hashdsn}{driver};
     } else {
-        my $db = $client->{databases}{$hashdsn};
+        my $db = $client->{databases}{$hashdsn}
+            or croak "Ouch, I don't know about a database whose hash is $hashdsn";
         $driver = Data::ObjectDriver::Driver::DBI->new(
                 dsn      => $db->{dsn},
                 username => $db->{user},
@@ -80,8 +83,8 @@ sub driver_for {
                 ($db->{prefix} ? (prefix   => $db->{prefix}) : ()),
         );
         if ($cache_duration) {
-            $client->{cached_drivers}{$hashdsn}{driver} = $driver;            
-            $client->{cached_drivers}{$hashdsn}{create_ts} = $t;                        
+            $client->{cached_drivers}{$hashdsn}{driver} = $driver;
+            $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
         }
     }
     return $driver;
@@ -165,12 +168,19 @@ sub list_jobs {
             } $driver->search('TheSchwartz::Job' => {
                 funcid        => $funcid,
                 @options
-                }, { limit => $limit });
+                }, { limit => $limit,
+                    ( $client->prioritize ? ( sort => 'priority',
+                    direction => 'descend' ) : () )
+                });
         } else {
             push @jobs, $driver->search('TheSchwartz::Job' => {
                 funcid        => $funcid,
                 @options
-                }, { limit => $limit });
+                }, { limit => $limit,
+                    ( $client->prioritize ? ( sort => 'priority',
+                        direction => 'descend' ) : () )
+                }
+            );
         }
     }
     return @jobs;
@@ -213,7 +223,11 @@ sub _find_job_with_coalescing {
                     run_after     => \ "<= $unixtime",
                     grabbed_until => \ "<= $unixtime",
                     coalesce      => { op => $op, value => $coval },
-                }, { limit => $FIND_JOB_BATCH_SIZE });
+                }, { limit => $FIND_JOB_BATCH_SIZE,
+                    ( $client->prioritize ? ( sort => 'priority',
+                        direction => 'descend' ) : () )
+                }
+            );
         };
         if ($@) {
             unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -252,7 +266,11 @@ sub find_job_for_workers {
                     funcid        => \@ids,
                     run_after     => \ "<= $unixtime",
                     grabbed_until => \ "<= $unixtime",
-                }, { limit => $FIND_JOB_BATCH_SIZE });
+                }, { limit => $FIND_JOB_BATCH_SIZE,
+                    ( $client->prioritize ? ( sort => 'priority',
+                    direction => 'descend' ) : () )
+                }
+            );
         };
         if ($@) {
             unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -268,6 +286,13 @@ sub find_job_for_workers {
     }
 }
 
+sub get_server_time {
+    my TheSchwartz $client = shift;
+    my($driver) = @_;
+    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
+    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
+}
+
 sub _grab_a_job {
     my TheSchwartz $client = shift;
     my $hashdsn = shift;
@@ -286,8 +311,7 @@ sub _grab_a_job {
         my $worker_class = $job->funcname;
         my $old_grabbed_until = $job->grabbed_until;
 
-        my $unixtime_sql = $driver->dbd->sql_for_unixtime;
-        my $server_time = $driver->rw_handle->selectrow_array("SELECT $unixtime_sql")
+        my $server_time = $client->get_server_time($driver)
             or die "expected a server time";
 
         $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
@@ -329,6 +353,12 @@ sub insert_job_to_driver {
         ## on the hashed DSN. Also: this might fail, if the database is dead.
         $job->funcid( $client->funcname_to_id($driver, $hashdsn, $job->funcname) );
 
+        ## This is sub-optimal because of clock skew, but something is
+        ## better than a NULL value. And currently, nothing in TheSchwartz
+        ## code itself uses insert_time. TODO: use server time, but without
+        ## having to do a roundtrip just to get the server time.
+        $job->insert_time(time);
+
         ## Now, insert the job. This also might fail.
         $driver->insert($job);
     };
@@ -490,7 +520,8 @@ sub work_once {
 
     my $class = $job ? $job->funcname : undef;
     if ($job) {
-        $job->debug("TheSchwartz::work_once got job of class '$class'");
+        my $priority = $job->priority ? ", priority " . $job->priority : "";
+        $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
     } else {
         $client->debug("TheSchwartz::work_once found no jobs");
     }
@@ -566,6 +597,118 @@ sub set_verbose {
     $client->{verbose} = $logger;
 }
 
+sub scoreboard {
+    my TheSchwartz $client = shift;
+
+    return $client->{scoreboard};
+}
+
+sub set_scoreboard {
+    my TheSchwartz $client = shift;
+    my ($dir) = @_;
+
+    return unless $dir;
+
+    # They want the scoreboard but don't care where it goes
+    if (($dir eq '1') or ($dir eq 'on')) {
+        # Find someplace in tmpfs to save this
+        foreach my $d (qw(/var/run /dev/shm)) {
+            $dir = $d;
+            last if -e $dir;
+        }
+    }
+
+    $dir .= '/theschwartz';
+    unless (-e $dir) {
+        mkdir($dir, 0755) or die "Can't create scoreboard directory '$dir': $!";
+    }
+
+    $client->{scoreboard} = $dir."/scoreboard.$$";
+}
+
+sub start_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    # Don't do anything of (for some reason) we don't have a current job
+    my $job = $client->current_job;
+    return unless $job;
+
+    my $class = $job->funcname;
+
+    open(SB, '>', $scoreboard)
+      or $job->debug("Could not write scoreboard '$scoreboard': $!");
+    print SB join("\n", ("pid=$$",
+                         'funcname='.($class||''),
+                         'started='.($job->grabbed_until-($class->grab_for||1)),
+                         'arg='._serialize_args($job->arg),
+                        )
+                 ), "\n";
+    close(SB);
+
+    return;
+}
+
+# Quick and dirty serializer.  Don't use Data::Dumper because we don't need to
+# recurse indefinitely and we want to truncate the output produced
+sub _serialize_args {
+    my ($args) = @_;
+
+    if (ref $args) {
+        if (ref $args eq 'HASH') {
+            return join ',',
+                   map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
+                   keys %$args;
+        } elsif (ref $args eq 'ARRAY') {
+            return join ',',
+                   map { substr($_||'', 0, 200) }
+                   @$args;
+        }
+    } else {
+        return $args;
+    }
+}
+
+sub end_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    my $job = $client->current_job;
+
+    open(SB, '>>', $scoreboard)
+      or $job->debug("Could not append scoreboard '$scoreboard': $!");
+    print SB "done=".time."\n";
+    close(SB);
+
+    return;
+}
+
+sub clean_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    unlink($scoreboard);
+}
+
+sub prioritize {
+    my TheSchwartz $client = shift;
+    return $client->{prioritize};
+}
+
+sub set_prioritize {
+    my TheSchwartz $client = shift;
+    $client->{prioritize} = shift;
+}
+
 # current job being worked.  so if something dies, work_safely knows which to mark as dead.
 sub current_job {
     my TheSchwartz $client = shift;
@@ -577,6 +720,15 @@ sub set_current_job {
     $client->{current_job} = shift;
 }
 
+DESTROY {
+    foreach my $arg (@_) {
+        # Call 'clean_scoreboard' on TheSchwartz objects
+        if (ref($arg) and $arg->isa('TheSchwartz')) {
+            $arg->clean_scoreboard;
+        }
+    }
+}
+
 1;
 
 __END__
@@ -605,14 +757,14 @@ TheSchwartz - reliable job queue
     sub work {
         my $class = shift;
         my TheSchwartz::Job $job = shift;
-        
+
         print "Workin' hard or hardly workin'? Hyuk!!\n";
 
         $job->completed();
     }
 
     package main;
-    
+
     my $client = TheSchwartz->new( databases => $DATABASE_INFO );
     $client->can_do('MyWorker');
     $client->work();
@@ -681,6 +833,12 @@ it is called to log debug messages. If C<verbose> is not a coderef but is some
 other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
 messages will not be logged.
 
+=item * C<prioritize>
+
+A value indicating whether to utilize the job 'priority' field when selecting
+jobs to be processed. If unspecified, jobs will always be executed in a
+randomized order.
+
 =item * C<driver_cache_expiration>
 
 Optional value to control how long database connections are cached for in seconds.
@@ -765,6 +923,10 @@ Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
 Adds the given C<TheSchwartz::Job> objects to one of the client's job
 databases. All the given jobs are recorded in I<one> job database.
 
+=head2 C<$client-E<gt>set_prioritize( $prioritize )>
+
+Set the C<prioritize> value as described in the constructor.
+
 =head1 WORKING
 
 The methods of TheSchwartz clients for use in worker processes are:
@@ -790,6 +952,10 @@ Find and perform any jobs C<$client> can do, forever. When no job is available,
 the working process will sleep for C<$delay> seconds (or 5, if not specified)
 before looking again.
 
+=head2 C<$client-E<gt>work_on($handle)>
+
+Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
+
 =head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
 
 Returns a C<TheSchwartz::Job> for a random job that the client can do. If
@@ -810,6 +976,10 @@ Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
 find matching jobs, with all the attendant performance implications for your
 job databases.
 
+=head2 C<$client-E<gt>get_server_time( $driver )>
+
+Given an open driver I<$driver> to a database, gets the current server time from the database.
+
 =head1 COPYRIGHT, LICENSE & WARRANTY
 
 This software is Copyright 2007, Six Apart Ltd, cpan at sixapart.com. All
diff --git a/lib/TheSchwartz/FuncMap.pm b/lib/TheSchwartz/FuncMap.pm
index 3179082..65c9ba9 100644
--- a/lib/TheSchwartz/FuncMap.pm
+++ b/lib/TheSchwartz/FuncMap.pm
@@ -1,4 +1,4 @@
-# $Id: FuncMap.pm 42 2006-05-16 05:15:30Z btrott $
+# $Id: FuncMap.pm 136 2008-02-23 01:28:13Z bchoate $
 
 package TheSchwartz::FuncMap;
 use strict;
@@ -16,9 +16,16 @@ sub create_or_find {
     my $class = shift;
     my($driver, $funcname) = @_;
 
+    ## Attempt to select funcmap record by name. If successful, return
+    ## object, otherwise proceed with insertion and return.
+    my ($map) = $driver->search('TheSchwartz::FuncMap' =>
+            { funcname => $funcname }
+        );
+    return $map if $map;
+
     ## Attempt to insert a new funcmap row. Since the funcname column is
     ## UNIQUE, if the row already exists, an exception will be thrown.
-    my $map = $class->new;
+    $map = $class->new;
     $map->funcname($funcname);
     eval { $driver->insert($map) };
 
diff --git a/lib/TheSchwartz/Worker.pm b/lib/TheSchwartz/Worker.pm
index bbe61ab..2bfeaae 100644
--- a/lib/TheSchwartz/Worker.pm
+++ b/lib/TheSchwartz/Worker.pm
@@ -1,4 +1,4 @@
-# $Id: Worker.pm 106 2006-10-16 21:42:32Z mpaschal $
+# $Id: Worker.pm 134 2007-08-22 22:04:38Z garth $
 
 package TheSchwartz::Worker;
 use strict;
@@ -24,6 +24,8 @@ sub work_safely {
 
     $job->debug("Working on $class ...");
     $job->set_as_current;
+    $client->start_scoreboard;
+
     eval {
         $res = $class->work($job);
     };
@@ -37,6 +39,8 @@ sub work_safely {
         $cjob->failed('Job did not explicitly complete, fail, or get replaced');
     }
 
+    $client->end_scoreboard;
+
     # FIXME: this return value is kinda useless/undefined.  should we even return anything?  any callers? -brad
     return $res;
 }
diff --git a/server/bin/schwartzd b/server/bin/schwartzd
new file mode 100755
index 0000000..1c34808
--- /dev/null
+++ b/server/bin/schwartzd
@@ -0,0 +1,59 @@
+#!/usr/bin/perl
+
+use strict;
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+use Gearman::Worker;
+use TheSchwartz;
+use JSON::Any;
+
+my $j = JSON::Any->new;
+
+my $ts = TheSchwartz->new(databases => [{
+    dsn => "dbi:mysql:database=t_sch_unnamed",
+    user => "root",
+    pass => "",
+}]);
+
+# FIXME: use embedded gearman server, and workers be child processes
+
+my $worker = Gearman::Worker->new;
+$worker->job_servers('127.0.0.1:7003');
+
+$worker->register_function("insert_job" => handler(\&insert_job));
+$worker->work while 1;
+
+############################################################################
+
+sub handler {
+    my ($code) = @_;
+    return sub {
+	my $job = shift;
+	my $arg = $job->arg;
+	my $jreq = eval { $j->jsonToObj($job->arg) };
+	unless ($jreq) {
+	    die "not a valid JSON request";
+	}
+	return $code->($job, $jreq);
+    };
+}
+
+sub insert_job {
+    my ($job, $json) = @_;
+    my $funcname = $json->{funcname} or die "No funcname";
+    my $job = TheSchwartz::Job->new(
+				    funcname => $json->{funcname},
+				    arg      => $json->{arg},
+				    uniqkey  => $json->{uniqkey},
+				    coalesce => $json->{coalesce},
+				    );
+    my $h = $ts->insert($job) or
+        die "insert_failure\n";
+    return $h->as_string;
+}
+
diff --git a/server/doc/deps.txt b/server/doc/deps.txt
new file mode 100644
index 0000000..53d5366
--- /dev/null
+++ b/server/doc/deps.txt
@@ -0,0 +1,25 @@
+TheSchwartz (itself)
+Data::ObjectDriver
+  DBI
+  Class::Accessor::Fast (libclass-accessor-perl)
+  Class::Trigger (libclass-trigger-perl)
+     libio-stringy-perl
+     Class::Data::Inheritable
+  List::Util
+  Test::Exception -- build_requires (libtest-exception-perl)
+
+Digest::MD5
+Storable
+
+Gearman::Server
+
+Danga::Socket
+Sys::Syscall
+
+JSON::Any
+JSON
+
+----
+Perl client:
+   Gearman::Client
+   String::CRC32
diff --git a/server/doc/protocol.txt b/server/doc/protocol.txt
new file mode 100644
index 0000000..24134a3
--- /dev/null
+++ b/server/doc/protocol.txt
@@ -0,0 +1,56 @@
+grab_job:  {
+   can_do: [@can_do],
+}
+  -> {
+     jobid,
+     job,
+     arg,
+     # failure_count?
+  }
+  (or)
+  -> Nothing
+
+
+insert_job: {
+  job: "foo",
+  arg: "lskdjflksdjflskdf",
+  uniqkey: "blah",
+  run_after: $unix_time,
+  coalesce: "to_foo",
+}
+  -> jobid
+
+
+# atomic insert multiple jobs:
+insert_jobs:
+   [ {...}, {...}, ]
+
+   -> @jobids
+
+mark_completed: {
+   jobid: 5,
+   replace_with: [@jobs],    #optional
+}
+   -> { handles => [@handles] }
+
+mark_failed: {
+   jobid: 5,
+   message: "error message",
+   exit_status: 6,
+   retry_in: 80,   # optional.  if not present, no retry.
+}
+
+get_failure_log: {
+   jobid: 6
+}
+  -> [ {time:2342342,exitstatus ... }, {....}, {...} ]
+
+get_status: {
+   jobid: 6,
+}
+  -> {
+       exitstatus: 0,
+     }
+
+
+
diff --git a/server/t/00-start-ping.t b/server/t/00-start-ping.t
new file mode 100644
index 0000000..32ae203
--- /dev/null
+++ b/server/t/00-start-ping.t
@@ -0,0 +1,16 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+require 't/lib/testlib.pl';
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+
diff --git a/server/t/01-insert-and-get.t b/server/t/01-insert-and-get.t
new file mode 100644
index 0000000..9f934c7
--- /dev/null
+++ b/server/t/01-insert-and-get.t
@@ -0,0 +1,45 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+BEGIN {
+    require 't/lib/testlib.pl';
+}
+use Gearman::Client;
+use Data::Dumper;
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+my $cl = $srv->gearman_client;
+
+my $ret;
+
+# FIXME: test currently requires running gearmand on localhost
+{
+    use IO::Socket::INET;
+    my $sock = IO::Socket::INET->new(PeerAddr => "127.0.0.1:7003");
+    ok($sock, "local gearmand is up for testing")
+        or die "can't continue";
+}
+
+sub do_req {
+    my $req = shift;
+    my $ret = $cl->do_task("insert_job", json($req));
+    return undef unless $ret;
+    return $$ret unless $$ret =~ /^\s*[\[\{]/;
+    return unjson($$ret);
+}
+
+$ret = do_req({
+    funcname => "foo",
+    arg => "fooarg",
+});
+like($ret, qr/^\w+-\d+$/, "got a job handle");
+
diff --git a/server/t/lib/testlib.pl b/server/t/lib/testlib.pl
new file mode 100644
index 0000000..8668800
--- /dev/null
+++ b/server/t/lib/testlib.pl
@@ -0,0 +1,174 @@
+# $Id: db-common.pl 91 2006-08-17 00:39:55Z bradfitz $
+
+use strict;
+use File::Spec;
+use Carp qw(croak);
+use DBI;
+use FindBin;
+use JSON::Any;
+
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+sub json {
+    return JSON::Any->objToJson(shift);
+}
+
+sub unjson {
+    return JSON::Any->json_to_obj(shift);
+}
+
+sub test_client {
+    my %opts = @_;
+    my $dbs     = delete $opts{dbs};
+    my $init    = delete $opts{init};
+    my $pfx     = delete $opts{dbprefix};
+    croak "'dbs' not an ARRAY" unless ref $dbs eq "ARRAY";
+    croak "unknown opts" if %opts;
+    $init = 1 unless defined $init;
+
+    if ($init) {
+        setup_dbs({ prefix => $pfx }, $dbs);
+    }
+
+    return TheSchwartz->new(databases => [
+                                          map { {
+                                              dsn  => dsn_for($_),
+                                              user => "root",
+                                              pass => "",
+                                              prefix => $pfx,
+                                          } } @$dbs
+                                          ]);
+}
+
+package TestDB;
+use strict;
+sub new {
+    my $class = shift;
+    my $name = shift || "unnamed";
+    my $db = TestDB::MySQL->new($name) || TestDB::SQLite->new($name);
+    if ($db) {
+	my $dbh = $db->dbh;
+	my $schema = $db->schema_file;
+        my @sql = _load_sql($schema);
+        for my $sql (@sql) {
+	    $db->alter_create(\$sql);
+            $dbh->do($sql);
+        }
+        $dbh->disconnect;
+	return $db;
+    }
+
+    eval {
+	Test::More::plan(skip_all => "MySQL or SQLite not available for testing");
+    };
+    if ($@) {
+	return undef;
+    }
+    exit(0);
+}
+
+sub dbh {
+    my ($self) = @_;
+    return DBI->connect($self->dsn, "root", "", { RaiseError => 1 });
+}
+
+sub alter_create {
+    my $sqlref = shift;
+    # subclasses can override
+}
+
+sub _load_sql {
+    my($file) = @_;
+    open my $fh, $file or die "Can't open $file: $!";
+    my $sql = do { local $/; <$fh> };
+    close $fh;
+    split /;\s*/, $sql;
+}
+
+package TestDB::MySQL;
+use strict;
+use base 'TestDB';
+
+sub new {
+    my ($class, $name) = @_;
+
+    my $dbh  = eval { _mysql_dbh() } or return undef;
+    my $self = bless {
+	basename  => $name,
+	dbname    => "t_sch_$name",
+	root_dbh  => $dbh,
+    }, $class;
+
+    $dbh->do("DROP DATABASE IF EXISTS $self->{dbname}");
+    $dbh->do("CREATE DATABASE $self->{dbname}");
+    return $self;
+}
+
+sub dsn {
+    my ($self) = @_;
+    return "DBI:mysql:" . $self->{dbname};
+}
+
+sub _mysql_dbh {
+    return DBI->connect("DBI:mysql:mysql", "root", "", { RaiseError => 1 })
+        or die "Couldn't connect to database";
+}
+
+sub alter_create {
+    my ($self, $sqlref) = @_;
+    $$sqlref .= " ENGINE=INNODB\n";
+}
+
+sub schema_file {
+    return "../doc/schema.sql";
+}
+
+package TestDB::SQLite;
+use strict;
+use base 'TestDB';
+
+sub new {
+    return undef;
+}
+
+package TestServer;
+use strict;
+
+sub new {
+    my ($class, $db) = @_;
+    $db ||= TestDB->new || return undef;
+    my $pid = fork;
+    die "out of memory" unless defined $pid;
+    if ($pid) {
+	return bless {
+	    pid => $pid,
+	}, $class;
+    }
+
+    my $bin = "$FindBin::Bin/../bin/schwartzd";
+    die "Not exist: $bin" unless -e $bin;
+    die "Not executable: $bin" unless -x $bin;
+    exec $bin;
+    die "Failed to exec test schwartzd!";
+}
+
+sub gearman_client {
+    my $self = shift;
+    my $cl = Gearman::Client->new;
+    $cl->job_servers('127.0.0.1:7003');
+    return $cl;
+}
+
+sub DESTROY {
+    my $self = shift;
+    if ($self->{pid}) {
+	kill 9, $self->{pid};
+    }
+}
+
+1;
diff --git a/t/api.t b/t/api.t
index bdb5790..f3a77b6 100644
--- a/t/api.t
+++ b/t/api.t
@@ -7,9 +7,9 @@ use warnings;
 require 't/lib/db-common.pl';
 
 use TheSchwartz;
-use Test::More tests => 80;
+use Test::More tests => 108;
 
-run_tests(40, sub {
+run_tests(42, sub {
     foreach my $pfx ("", "testprefix_") {
 
         my $client = test_client(dbs      => ['ts1'],
@@ -30,6 +30,7 @@ run_tests(40, sub {
         my $job = $handle->job;
         isa_ok $job, 'TheSchwartz::Job';
         is $job->funcname, 'feedmajor', 'handle->job gives us the right job';
+        cmp_ok $job->insert_time, '>', 0, 'insert_time is non-zero';
 
         # getting a handle object back
         my $hand2 = $client->handle_from_string($hstr);
@@ -80,6 +81,18 @@ run_tests(40, sub {
             ok($job);
             $handle = $client->insert($job);
             isa_ok $handle, 'TheSchwartz::JobHandle';
+            
+            my $same = $client->lookup_job($handle->as_string);
+            ok $same;
+            isa_ok $same, 'TheSchwartz::Job';
+            is $same->handle->as_string, $handle->as_string;
+            
+        }
+        
+        ## Just test that handles for unknown database croak with an explicit message
+        {
+            eval { $client->lookup_job( ("6a" x 16) ."-666") };
+            ok $@ && unlike($@, qr/No Driver/) && like($@, qr/database.*hash/);
         }
 
         # inserting multiple with wrong method fails
diff --git a/t/priority.t b/t/priority.t
new file mode 100644
index 0000000..5713c9d
--- /dev/null
+++ b/t/priority.t
@@ -0,0 +1,72 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 62;
+
+our $record_expected;
+
+run_tests(31, sub {
+    my $client = test_client(dbs => ['ts1']);
+
+    # Define that we want to use priority selection
+    # limit batch size to 1 so we always process jobs in
+    # priority order
+    $client->set_prioritize(1);
+    $TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
+
+    for (1..10) {
+        my $job = TheSchwartz::Job->new(
+                                        funcname => 'Worker::PriorityTest',
+                                        arg      => { num => $_ },
+                                        ( $_ == 1 ? () : ( priority => $_ ) ),
+                                        );
+        my $h = $client->insert($job);
+        ok($h, "inserted job (priority $_)");
+    }
+
+    $client->reset_abilities;
+    $client->can_do("Worker::PriorityTest");
+
+    Worker::PriorityTest->set_client($client);
+
+    for (1..10) {
+        $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+        my $rv = eval { $client->work_once; };
+        ok($rv, "did stuff");
+    }
+    my $rv = eval { $client->work_once; };
+    ok(!$rv, "nothing to do now");
+
+    teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::PriorityTest;
+use base 'TheSchwartz::Worker';
+use Test::More;
+
+use strict;
+my $client;
+sub set_client { $client = $_[1]; }
+
+sub work {
+    my ($class, $job) = @_;
+    my $priority = $job->priority;
+    ok((!defined($main::record_expected) && (!defined($priority)))
+        || ($priority == $main::record_expected), "priority matches expected priority");
+    $job->completed;
+}
+
+sub keep_exit_status_for { 20 }  # keep exit status for 20 seconds after on_complete
+
+sub grab_for { 10 }
+
+sub max_retries { 1 }
+
+sub retry_delay { my $class = shift; my $fails = shift; return 2 ** $fails; }
+
diff --git a/t/scoreboard.t b/t/scoreboard.t
new file mode 100644
index 0000000..2cf3948
--- /dev/null
+++ b/t/scoreboard.t
@@ -0,0 +1,78 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use Test::More tests => 20;
+
+use TheSchwartz;
+use File::Spec qw();
+
+run_tests(10, sub {
+    my $pfx = '';
+    my $dbs = ['ts1'];
+
+    setup_dbs({prefix => $pfx}, $dbs);
+
+    my $client = TheSchwartz->new(scoreboard => '/tmp',
+                                  databases => [
+                                          map { {
+                                              dsn  => dsn_for($_),
+                                              user => "root",
+                                              pass => "",
+                                              prefix => $pfx,
+                                          } } @$dbs
+                                          ]);
+
+    my $sb_file = $client->scoreboard;
+    {
+        (undef, my ($sb_dir, $sb_name))  = File::Spec->splitpath($sb_file);
+        ok(-e $sb_dir, "Looking for dir $sb_dir");
+    }
+
+    {
+        my $handle = $client->insert("Worker::Addition",
+                                     {numbers => [1, 2]});
+        my $job    = Worker::Addition->grab_job($client);
+
+        my $rv = eval { Worker::Addition->work_safely($job); };
+        ok(length($@) == 0, 'Finished job with out error');
+
+        unless (ok(-e $sb_file, "Scoreboard file exists")) {
+            return;
+        }
+
+        open(FH, $sb_file) or die "Can't open '$sb_file': $!\n";
+
+        my %info = map { chomp; /^([^=]+)=(.*)$/ } <FH>;
+        close(FH);
+
+        ok($info{pid} == $$, 'Has our PID');
+        ok($info{funcname} eq 'Worker::Addition', 'Has our funcname');
+        ok($info{started} =~ /\d+/, 'Started time is a number');
+        ok($info{started} <= time, 'Started time is in the past');
+        ok($info{arg} =~ /^numbers=ARRAY/, 'Has right args');
+        ok($info{done} =~ /\d+/, 'Job has done time');
+    }
+
+    {
+        $client->DESTROY;
+        ok(! -e $sb_file, 'Scoreboard file goes away when worker finishes');
+    }
+
+    teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::Addition;
+use base 'TheSchwartz::Worker';
+
+sub work {
+    my ($class, $job) = @_;
+
+    # ....
+}
+
+1;
diff --git a/t/server-time.t b/t/server-time.t
new file mode 100644
index 0000000..f619bf3
--- /dev/null
+++ b/t/server-time.t
@@ -0,0 +1,20 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 4;
+
+run_tests(4, sub {
+    my $client = test_client(dbs => ['ts1']);
+
+    my $driver = $client->driver_for( ($client->shuffled_databases)[0] );
+    isa_ok $driver, 'Data::ObjectDriver::Driver::DBI';
+
+    cmp_ok $client->get_server_time($driver), '>', 0, 'got server time';
+
+    teardown_dbs('ts1');
+});

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git



More information about the Pkg-perl-cvs-commits mailing list