[libtheschwartz-perl] 12/43: [svn-upgrade] Integrating new upstream version, libtheschwartz-perl (1.07)
dom at earth.li
dom at earth.li
Mon May 9 20:11:00 UTC 2016
This is an automated email from the git hooks/post-receive script.
dom pushed a commit to branch master
in repository libtheschwartz-perl.
commit 997b0e7fb1d0c687a1f7cb28af426b60017f8690
Author: Dominic Hargreaves <dom at earth.li>
Date: Tue Aug 19 00:01:29 2008 +0000
[svn-upgrade] Integrating new upstream version, libtheschwartz-perl (1.07)
---
CHANGES | 17 ++++
MANIFEST | 23 +++--
META.yml | 2 +-
doc/schema-postgres.sql | 70 +++++++++++++++
extras/thetop | 209 +++++++++++++++++++++++++++++++++++++++++++
lib/TheSchwartz.pm | 200 +++++++++++++++++++++++++++++++++++++----
lib/TheSchwartz/FuncMap.pm | 11 ++-
lib/TheSchwartz/Worker.pm | 6 +-
server/bin/schwartzd | 59 ++++++++++++
server/doc/deps.txt | 25 ++++++
server/doc/protocol.txt | 56 ++++++++++++
server/t/00-start-ping.t | 16 ++++
server/t/01-insert-and-get.t | 45 ++++++++++
server/t/lib/testlib.pl | 174 +++++++++++++++++++++++++++++++++++
t/api.t | 17 +++-
t/priority.t | 72 +++++++++++++++
t/scoreboard.t | 78 ++++++++++++++++
t/server-time.t | 20 +++++
18 files changed, 1073 insertions(+), 27 deletions(-)
diff --git a/CHANGES b/CHANGES
index 5161eb1..4ee5403 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,20 @@
+1.07 (2008-07-31)
+ - bchoate: Updates to support optional prioritization of jobs.
+ - ykerherve: Croak with a nice message if a driver cannot be
+ found for a handle
+
+1.06 (2007-09-07)
+ - Code to allow a 'top' like view of running schwartz workers.
+ - include postgres schema in docs. from Michael Zedeler
+ <michael at zedeler.dk> Currently not tested in regression
+ tests, though, so not "officially" supported yet.
+ - start of work on gearman-based schwartz server.
+
+1.05
+
+ - Set TheSchwartz::Job::insert_time to current server time when
+ inserting a new job.
+
1.04 (2007-05-22)
- no code changes, just packaging/dep/test fixes, as pointed out
diff --git a/MANIFEST b/MANIFEST
index d69416c..b1d3798 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -1,10 +1,16 @@
-bin/schwartzmon
CHANGES
+MANIFEST This list of files
+MANIFEST.SKIP
+META.yml
+Makefile.PL
+bin/schwartzmon
doc/http-mappings.txt
doc/notes.txt
+doc/schema-postgres.sql
doc/schema.sql
-extras/check_schwartz
extras/TheSchwartz.spec
+extras/check_schwartz
+extras/thetop
inc/Module/AutoInstall.pm
inc/Module/Install.pm
inc/Module/Install/AutoInstall.pm
@@ -24,10 +30,12 @@ lib/TheSchwartz/FuncMap.pm
lib/TheSchwartz/Job.pm
lib/TheSchwartz/JobHandle.pm
lib/TheSchwartz/Worker.pm
-Makefile.PL
-MANIFEST This list of files
-MANIFEST.SKIP
-META.yml
+server/bin/schwartzd
+server/doc/deps.txt
+server/doc/protocol.txt
+server/t/00-start-ping.t
+server/t/01-insert-and-get.t
+server/t/lib/testlib.pl
t/05-job-ctor.t
t/api.t
t/cleanup.t
@@ -45,8 +53,11 @@ t/lib/db-common.pl
t/parallel-workers.t
t/pod-coverage.t
t/pod.t
+t/priority.t
t/replace-with.t
t/retry-delay.t
t/schema-sqlite.sql
+t/scoreboard.t
+t/server-time.t
t/unique.t
t/work-before-funcids-exist.t
diff --git a/META.yml b/META.yml
index 4ec1fdd..55ad529 100644
--- a/META.yml
+++ b/META.yml
@@ -15,4 +15,4 @@ requires:
Data::ObjectDriver: 0.04
Digest::MD5: 0
Storable: 0
-version: 1.04
+version: 1.07
diff --git a/doc/schema-postgres.sql b/doc/schema-postgres.sql
new file mode 100644
index 0000000..c6ad03f
--- /dev/null
+++ b/doc/schema-postgres.sql
@@ -0,0 +1,70 @@
+-- From: Michael Zedeler <michael at zedeler.dk>
+-- Date: July 30, 2007 7:31:55 AM PDT
+-- To: cpan at sixapart.com
+-- Subject: TheSchwartz database schema for postgresql
+--
+-- Hi.
+--
+-- I couldn't find any useful postgresql compatible schema file for
+-- the tables that TheSchwartz seems to depend on, so I rewrote the
+-- one supplied in the package.
+--
+-- Here it is. Feel free to include it in the next release.
+--
+-- Regards,
+--
+-- Michael.
+
+
+CREATE TABLE funcmap (
+ funcid SERIAL,
+ funcname VARCHAR(255) NOT NULL,
+ UNIQUE(funcname)
+);
+
+CREATE TABLE job (
+ jobid SERIAL,
+ funcid INT NOT NULL,
+ arg BYTEA,
+ uniqkey VARCHAR(255) NULL,
+ insert_time INTEGER,
+ run_after INTEGER NOT NULL,
+ grabbed_until INTEGER NOT NULL,
+ priority SMALLINT,
+ coalesce VARCHAR(255),
+ UNIQUE(funcid, uniqkey)
+);
+
+CREATE INDEX job_funcid_runafter ON job (funcid, run_after);
+
+CREATE INDEX job_funcid_coalesce ON job (funcid, coalesce);
+
+CREATE TABLE note (
+ jobid BIGINT NOT NULL,
+ notekey VARCHAR(255),
+ PRIMARY KEY (jobid, notekey),
+ value BYTEA
+);
+
+CREATE TABLE error (
+ error_time INTEGER NOT NULL,
+ jobid BIGINT NOT NULL,
+ message VARCHAR(255) NOT NULL,
+ funcid INT NOT NULL DEFAULT 0
+);
+
+CREATE INDEX error_funcid_errortime ON error (funcid, error_time);
+CREATE INDEX error_time ON error (error_time);
+CREATE INDEX error_jobid ON error (jobid);
+
+CREATE TABLE exitstatus (
+ jobid BIGINT PRIMARY KEY NOT NULL,
+ funcid INT NOT NULL DEFAULT 0,
+ status SMALLINT,
+ completion_time INTEGER,
+ delete_after INTEGER
+);
+
+CREATE INDEX exitstatus_funcid ON exitstatus (funcid);
+CREATE INDEX exitstatus_deleteafter ON exitstatus (delete_after);
+
diff --git a/extras/thetop b/extras/thetop
new file mode 100755
index 0000000..275b943
--- /dev/null
+++ b/extras/thetop
@@ -0,0 +1,209 @@
+#!/usr/bin/perl -w
+
+=pod
+
+=head1 NAME
+
+thetop - A 'top' utility for the schwartz
+
+=head1 SYNOPSIS
+
+ thetop [--func FORMAT] [--arg FORMAT] [--sort ARGS] [--delay SECS] [--score-dir DIR]
+
+=head1 DESCRIPTION
+
+
+=cut
+
+#--------------------------------------#
+# Dependencies
+
+use strict;
+
+use Getopt::Long;
+use Term::Cap;
+use POSIX;
+
+#--------------------------------------#
+# Global Variables
+
+use vars qw( $OSPEED );
+
+BEGIN {
+ my $termios = POSIX::Termios->new;
+ $termios->getattr;
+ $OSPEED = $termios->getospeed || 9600;
+};
+
+our $TERM = Term::Cap->Tgetent({OSPEED=>$OSPEED});
+
+#--------------------------------------#
+# Main Program
+
+my ($score_dir, $delay, $func_col, @arg_col, $sort);
+
+GetOptions('score-dir=s' => \$score_dir,
+ 'delay|d=s' => \$delay,
+ 'func=s' => \$func_col,
+ 'arg=s' => \@arg_col,
+ 'sort|s=s' => \$sort,
+ );
+
+# Make sure we know where to find the scoreboard files
+unless ($score_dir) {
+ foreach my $d (qw(/var/run /dev/shm /tmp)) {
+ if (-e "$d/theschwartz") {
+ $score_dir = "$d/theschwartz";
+ last;
+ }
+ }
+
+ die "Can't find scoreboard directory. Use '--score-dir'\n"
+ unless $score_dir;
+}
+
+# If we got some formatting instructions for the arg column, parse it out
+my %arg_col_by_func;
+if (@arg_col) {
+ foreach my $a (@arg_col) {
+ if ($a =~ /=/) {
+ my ($func, $fmt) = split('=', $a);
+ $arg_col_by_func{$func} = $fmt;
+ } else {
+ $arg_col_by_func{'__ALL__'} = $a;
+ }
+ }
+}
+
+# Make sure to give a reasonable default for delay
+$delay ||= 3;
+
+# Start reporting
+clr_screen();
+while (1) {
+ report($score_dir, $func_col, \%arg_col_by_func, $sort);
+ sleep($delay);
+ clr_screen();
+}
+
+################################################################################
+
+sub report {
+ my ($dir, $func_col, $arg_col_by_func, $sort) = @_;
+
+ # Find the files available
+ opendir(SD, $dir) or die "Can't read directory '$dir': $!\n";
+ my @files = map { $dir."/$_" } readdir(SD);
+ closedir(SD);
+
+ # Grab the data out of them
+ my @data;
+ foreach my $f (@files) {
+ next unless $f =~ /scoreboard\.[0-9]+$/;
+ open(SF, '<', $f) or die "Can't open score file '$f': $!\n";
+ my %dat = map { chomp; split('=') } <SF>;
+ close(SF);
+
+ $dat{arg_array} = [split(',', $dat{arg}||'')];
+ push @data, \%dat;
+ }
+
+ my $num = scalar(@data);
+ my $width = 80-17-$num;
+ printf("Workers: %d total %${width}s\n\n", $num, scalar localtime);
+ printf("% 5s % 20s % 2s % 7s % 41s\n", 'PID', 'FUNC', 'S', 'TIME', 'ARGS');
+ foreach my $d (sort { order_by($sort, $a, $b) } @data) {
+ my $func_str = fmt_func($d, $func_col);
+
+ printf("% 5s % 20s % 2s % 7s % 41s\n",
+ $d->{pid},
+ $func_str,
+ ($d->{done} ? 'S' : 'R'),
+ fmt_time($d),
+ fmt_arg($d, $arg_col_by_func, $func_str),
+ );
+ }
+}
+
+sub order_by {
+ my ($sort, $a, $b) = @_;
+
+ if ($sort) {
+
+ } else {
+ # Default to push running tasks to the top
+ return ($a->{done}||0) <=> ($b->{done}||0) ||
+ ($a->{started}||0) <=> ($b->{started}||0);
+ }
+}
+
+sub fmt_func {
+ my ($d, $fmt) = @_;
+ my $val = $d->{funcname};
+
+ if ($fmt) {
+ if ($fmt eq 'trim') {
+ $val =~ s/^.+:://g;
+ } else {
+ $val =~ /($fmt)/;
+ $val = $1;
+ }
+ }
+
+ return substr($val, 0, 20),
+}
+
+sub fmt_time {
+ my ($d) = @_;
+ my $secs = ($d->{done}||time) - $d->{started};
+
+ if ($secs < 60) {
+ return sprintf("%02d:%02d", 0, $secs);
+ } elsif ($secs < 3600) {
+ my $min = int($secs/60);
+ $secs = $secs%60;
+ return sprintf("%02d:%02d", $min, $secs);
+ } else {
+ my $hr = int($secs/60/60);
+ my $min = int($secs/60%60);
+ $secs = $secs%60;
+ return sprintf("%d:%02d:%02d", $hr, $min, $secs);
+ }
+}
+
+## Format the arguments by interpreting the args as either a hash or an array
+## and printing out the appropriate element.
+
+sub fmt_arg {
+ my ($d, $arg_col_by_func, $func_str) = @_;
+ my $val = $d->{arg};
+ my $func_orig = $d->{funcname};
+
+ if ($arg_col_by_func) {
+ my $fmt = ($arg_col_by_func{$func_str} ||
+ $arg_col_by_func{$func_orig} ||
+ $arg_col_by_func{'__ALL__'});
+ if ($fmt) {
+ my $arg_array = $d->{arg_array};
+
+ # If it's a number, treat the args as an array
+ if ($fmt =~ /^[0-9]+$/) {
+ $val = $arg_array->[$fmt];
+ }
+ # otherwise, treat the args as a hash
+ else {
+ # Compensate for odd numbers of args
+ push @$arg_array, undef if scalar(@$arg_array) % 2;
+
+ my %h = @$arg_array;
+ $val = $h{$fmt};
+ }
+ }
+ }
+
+ return substr($val||'', 0, 41),
+}
+
+sub clr_screen {
+ $TERM->Tputs('cl', 1, \*STDOUT);
+}
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index db70422..b0d114f 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,10 +1,10 @@
-# $Id: TheSchwartz.pm 122 2007-05-22 17:11:37Z bradfitz $
+# $Id: TheSchwartz.pm 138 2008-06-16 17:36:19Z ykerherve $
package TheSchwartz;
use strict;
-use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration );
+use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
-our $VERSION = "1.04";
+our $VERSION = "1.07";
use Carp qw( croak );
use Data::ObjectDriver::Errors;
@@ -35,7 +35,9 @@ sub new {
my $databases = delete $args{databases};
$client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
+ $client->set_prioritize(delete $args{prioritize});
$client->set_verbose(delete $args{verbose});
+ $client->set_scoreboard(delete $args{scoreboard});
$client->{driver_cache_expiration} = delete $args{driver_cache_expiration} || 0;
croak "unknown options ", join(', ', keys %args) if keys %args;
@@ -72,7 +74,8 @@ sub driver_for {
if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
$driver = $client->{cached_drivers}{$hashdsn}{driver};
} else {
- my $db = $client->{databases}{$hashdsn};
+ my $db = $client->{databases}{$hashdsn}
+ or croak "Ouch, I don't know about a database whose hash is $hashdsn";
$driver = Data::ObjectDriver::Driver::DBI->new(
dsn => $db->{dsn},
username => $db->{user},
@@ -80,8 +83,8 @@ sub driver_for {
($db->{prefix} ? (prefix => $db->{prefix}) : ()),
);
if ($cache_duration) {
- $client->{cached_drivers}{$hashdsn}{driver} = $driver;
- $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
+ $client->{cached_drivers}{$hashdsn}{driver} = $driver;
+ $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
}
}
return $driver;
@@ -165,12 +168,19 @@ sub list_jobs {
} $driver->search('TheSchwartz::Job' => {
funcid => $funcid,
@options
- }, { limit => $limit });
+ }, { limit => $limit,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ });
} else {
push @jobs, $driver->search('TheSchwartz::Job' => {
funcid => $funcid,
@options
- }, { limit => $limit });
+ }, { limit => $limit,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
}
}
return @jobs;
@@ -213,7 +223,11 @@ sub _find_job_with_coalescing {
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
coalesce => { op => $op, value => $coval },
- }, { limit => $FIND_JOB_BATCH_SIZE });
+ }, { limit => $FIND_JOB_BATCH_SIZE,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
};
if ($@) {
unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -252,7 +266,11 @@ sub find_job_for_workers {
funcid => \@ids,
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
- }, { limit => $FIND_JOB_BATCH_SIZE });
+ }, { limit => $FIND_JOB_BATCH_SIZE,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
};
if ($@) {
unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -268,6 +286,13 @@ sub find_job_for_workers {
}
}
+sub get_server_time {
+ my TheSchwartz $client = shift;
+ my($driver) = @_;
+ my $unixtime_sql = $driver->dbd->sql_for_unixtime;
+ return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
+}
+
sub _grab_a_job {
my TheSchwartz $client = shift;
my $hashdsn = shift;
@@ -286,8 +311,7 @@ sub _grab_a_job {
my $worker_class = $job->funcname;
my $old_grabbed_until = $job->grabbed_until;
- my $unixtime_sql = $driver->dbd->sql_for_unixtime;
- my $server_time = $driver->rw_handle->selectrow_array("SELECT $unixtime_sql")
+ my $server_time = $client->get_server_time($driver)
or die "expected a server time";
$job->grabbed_until($server_time + ($worker_class->grab_for || 1));
@@ -329,6 +353,12 @@ sub insert_job_to_driver {
## on the hashed DSN. Also: this might fail, if the database is dead.
$job->funcid( $client->funcname_to_id($driver, $hashdsn, $job->funcname) );
+ ## This is sub-optimal because of clock skew, but something is
+ ## better than a NULL value. And currently, nothing in TheSchwartz
+ ## code itself uses insert_time. TODO: use server time, but without
+ ## having to do a roundtrip just to get the server time.
+ $job->insert_time(time);
+
## Now, insert the job. This also might fail.
$driver->insert($job);
};
@@ -490,7 +520,8 @@ sub work_once {
my $class = $job ? $job->funcname : undef;
if ($job) {
- $job->debug("TheSchwartz::work_once got job of class '$class'");
+ my $priority = $job->priority ? ", priority " . $job->priority : "";
+ $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
} else {
$client->debug("TheSchwartz::work_once found no jobs");
}
@@ -566,6 +597,118 @@ sub set_verbose {
$client->{verbose} = $logger;
}
+sub scoreboard {
+ my TheSchwartz $client = shift;
+
+ return $client->{scoreboard};
+}
+
+sub set_scoreboard {
+ my TheSchwartz $client = shift;
+ my ($dir) = @_;
+
+ return unless $dir;
+
+ # They want the scoreboard but don't care where it goes
+ if (($dir eq '1') or ($dir eq 'on')) {
+ # Find someplace in tmpfs to save this
+ foreach my $d (qw(/var/run /dev/shm)) {
+ $dir = $d;
+ last if -e $dir;
+ }
+ }
+
+ $dir .= '/theschwartz';
+ unless (-e $dir) {
+ mkdir($dir, 0755) or die "Can't create scoreboard directory '$dir': $!";
+ }
+
+ $client->{scoreboard} = $dir."/scoreboard.$$";
+}
+
+sub start_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ # Don't do anything if (for some reason) we don't have a current job
+ my $job = $client->current_job;
+ return unless $job;
+
+ my $class = $job->funcname;
+
+ open(SB, '>', $scoreboard)
+ or $job->debug("Could not write scoreboard '$scoreboard': $!");
+ print SB join("\n", ("pid=$$",
+ 'funcname='.($class||''),
+ 'started='.($job->grabbed_until-($class->grab_for||1)),
+ 'arg='._serialize_args($job->arg),
+ )
+ ), "\n";
+ close(SB);
+
+ return;
+}
+
+# Quick and dirty serializer. Don't use Data::Dumper because we don't need to
+# recurse indefinitely and we want to truncate the output produced
+sub _serialize_args {
+ my ($args) = @_;
+
+ if (ref $args) {
+ if (ref $args eq 'HASH') {
+ return join ',',
+ map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
+ keys %$args;
+ } elsif (ref $args eq 'ARRAY') {
+ return join ',',
+ map { substr($_||'', 0, 200) }
+ @$args;
+ }
+ } else {
+ return $args;
+ }
+}
+
+sub end_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ my $job = $client->current_job;
+
+ open(SB, '>>', $scoreboard)
+ or $job->debug("Could not append scoreboard '$scoreboard': $!");
+ print SB "done=".time."\n";
+ close(SB);
+
+ return;
+}
+
+sub clean_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ unlink($scoreboard);
+}
+
+sub prioritize {
+ my TheSchwartz $client = shift;
+ return $client->{prioritize};
+}
+
+sub set_prioritize {
+ my TheSchwartz $client = shift;
+ $client->{prioritize} = shift;
+}
+
# current job being worked. so if something dies, work_safely knows which to mark as dead.
sub current_job {
my TheSchwartz $client = shift;
@@ -577,6 +720,15 @@ sub set_current_job {
$client->{current_job} = shift;
}
+DESTROY {
+ foreach my $arg (@_) {
+ # Call 'clean_scoreboard' on TheSchwartz objects
+ if (ref($arg) and $arg->isa('TheSchwartz')) {
+ $arg->clean_scoreboard;
+ }
+ }
+}
+
1;
__END__
@@ -605,14 +757,14 @@ TheSchwartz - reliable job queue
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
-
+
print "Workin' hard or hardly workin'? Hyuk!!\n";
$job->completed();
}
package main;
-
+
my $client = TheSchwartz->new( databases => $DATABASE_INFO );
$client->can_do('MyWorker');
$client->work();
@@ -681,6 +833,12 @@ it is called to log debug messages. If C<verbose> is not a coderef but is some
other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
messages will not be logged.
+=item * C<prioritize>
+
+A value indicating whether to utilize the job 'priority' field when selecting
+jobs to be processed. If unspecified, jobs will always be executed in a
+randomized order.
+
=item * C<driver_cache_expiration>
Optional value to control how long database connections are cached for in seconds.
@@ -765,6 +923,10 @@ Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
Adds the given C<TheSchwartz::Job> objects to one of the client's job
databases. All the given jobs are recorded in I<one> job database.
+=head2 C<$client-E<gt>set_prioritize( $prioritize )>
+
+Set the C<prioritize> value as described in the constructor.
+
=head1 WORKING
The methods of TheSchwartz clients for use in worker processes are:
@@ -790,6 +952,10 @@ Find and perform any jobs C<$client> can do, forever. When no job is available,
the working process will sleep for C<$delay> seconds (or 5, if not specified)
before looking again.
+=head2 C<$client-E<gt>work_on($handle)>
+
+Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
+
=head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
Returns a C<TheSchwartz::Job> for a random job that the client can do. If
@@ -810,6 +976,10 @@ Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
find matching jobs, with all the attendant performance implications for your
job databases.
+=head2 C<$client-E<gt>get_server_time( $driver )>
+
+Given an open driver I<$driver> to a database, gets the current server time from the database.
+
=head1 COPYRIGHT, LICENSE & WARRANTY
This software is Copyright 2007, Six Apart Ltd, cpan at sixapart.com. All
diff --git a/lib/TheSchwartz/FuncMap.pm b/lib/TheSchwartz/FuncMap.pm
index 3179082..65c9ba9 100644
--- a/lib/TheSchwartz/FuncMap.pm
+++ b/lib/TheSchwartz/FuncMap.pm
@@ -1,4 +1,4 @@
-# $Id: FuncMap.pm 42 2006-05-16 05:15:30Z btrott $
+# $Id: FuncMap.pm 136 2008-02-23 01:28:13Z bchoate $
package TheSchwartz::FuncMap;
use strict;
@@ -16,9 +16,16 @@ sub create_or_find {
my $class = shift;
my($driver, $funcname) = @_;
+ ## Attempt to select funcmap record by name. If successful, return
+ ## object, otherwise proceed with insertion and return.
+ my ($map) = $driver->search('TheSchwartz::FuncMap' =>
+ { funcname => $funcname }
+ );
+ return $map if $map;
+
## Attempt to insert a new funcmap row. Since the funcname column is
## UNIQUE, if the row already exists, an exception will be thrown.
- my $map = $class->new;
+ $map = $class->new;
$map->funcname($funcname);
eval { $driver->insert($map) };
diff --git a/lib/TheSchwartz/Worker.pm b/lib/TheSchwartz/Worker.pm
index bbe61ab..2bfeaae 100644
--- a/lib/TheSchwartz/Worker.pm
+++ b/lib/TheSchwartz/Worker.pm
@@ -1,4 +1,4 @@
-# $Id: Worker.pm 106 2006-10-16 21:42:32Z mpaschal $
+# $Id: Worker.pm 134 2007-08-22 22:04:38Z garth $
package TheSchwartz::Worker;
use strict;
@@ -24,6 +24,8 @@ sub work_safely {
$job->debug("Working on $class ...");
$job->set_as_current;
+ $client->start_scoreboard;
+
eval {
$res = $class->work($job);
};
@@ -37,6 +39,8 @@ sub work_safely {
$cjob->failed('Job did not explicitly complete, fail, or get replaced');
}
+ $client->end_scoreboard;
+
# FIXME: this return value is kinda useless/undefined. should we even return anything? any callers? -brad
return $res;
}
diff --git a/server/bin/schwartzd b/server/bin/schwartzd
new file mode 100755
index 0000000..1c34808
--- /dev/null
+++ b/server/bin/schwartzd
@@ -0,0 +1,59 @@
+#!/usr/bin/perl
+
+use strict;
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+use Gearman::Worker;
+use TheSchwartz;
+use JSON::Any;
+
+my $j = JSON::Any->new;
+
+my $ts = TheSchwartz->new(databases => [{
+ dsn => "dbi:mysql:database=t_sch_unnamed",
+ user => "root",
+ pass => "",
+}]);
+
+# FIXME: use embedded gearman server, and workers be child processes
+
+my $worker = Gearman::Worker->new;
+$worker->job_servers('127.0.0.1:7003');
+
+$worker->register_function("insert_job" => handler(\&insert_job));
+$worker->work while 1;
+
+############################################################################
+
+sub handler {
+ my ($code) = @_;
+ return sub {
+ my $job = shift;
+ my $arg = $job->arg;
+ my $jreq = eval { $j->jsonToObj($job->arg) };
+ unless ($jreq) {
+ die "not a valid JSON request";
+ }
+ return $code->($job, $jreq);
+ };
+}
+
+sub insert_job {
+ my ($job, $json) = @_;
+ my $funcname = $json->{funcname} or die "No funcname";
+ my $job = TheSchwartz::Job->new(
+ funcname => $json->{funcname},
+ arg => $json->{arg},
+ uniqkey => $json->{uniqkey},
+ coalesce => $json->{coalesce},
+ );
+ my $h = $ts->insert($job) or
+ die "insert_failure\n";
+ return $h->as_string;
+}
+
diff --git a/server/doc/deps.txt b/server/doc/deps.txt
new file mode 100644
index 0000000..53d5366
--- /dev/null
+++ b/server/doc/deps.txt
@@ -0,0 +1,25 @@
+TheSchwartz (itself)
+Data::ObjectDriver
+ DBI
+ Class::Accessor::Fast (libclass-accessor-perl)
+ Class::Trigger (libclass-trigger-perl)
+ libio-stringy-perl
+ Class::Data::Inheritable
+ List::Util
+ Test::Exception -- build_requires (libtest-exception-perl)
+
+Digest::MD5
+Storable
+
+Gearman::Server
+
+Danga::Socket
+Sys::Syscall
+
+JSON::Any
+JSON
+
+----
+Perl client:
+ Gearman::Client
+ String::CRC32
diff --git a/server/doc/protocol.txt b/server/doc/protocol.txt
new file mode 100644
index 0000000..24134a3
--- /dev/null
+++ b/server/doc/protocol.txt
@@ -0,0 +1,56 @@
+grab_job: {
+ can_do: [@can_do],
+}
+ -> {
+ jobid,
+ job,
+ arg,
+ # failure_count?
+ }
+ (or)
+ -> Nothing
+
+
+insert_job: {
+ job: "foo",
+ arg: "lskdjflksdjflskdf",
+ uniqkey: "blah",
+ run_after: $unix_time,
+ coalesce: "to_foo",
+}
+ -> jobid
+
+
+# atomic insert multiple jobs:
+insert_jobs:
+ [ {...}, {...}, ]
+
+ -> @jobids
+
+mark_completed: {
+ jobid: 5,
+ replace_with: [@jobs], #optional
+}
+ -> { handles => [@handles] }
+
+mark_failed: {
+ jobid: 5,
+ message: "error message",
+ exit_status: 6,
+ retry_in: 80, # optional. if not present, no retry.
+}
+
+get_failure_log: {
+ jobid: 6
+}
+ -> [ {time:2342342,exitstatus ... }, {....}, {...} ]
+
+get_status: {
+ jobid: 6,
+}
+ -> {
+ exitstatus: 0,
+ }
+
+
+
diff --git a/server/t/00-start-ping.t b/server/t/00-start-ping.t
new file mode 100644
index 0000000..32ae203
--- /dev/null
+++ b/server/t/00-start-ping.t
@@ -0,0 +1,16 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+require 't/lib/testlib.pl';
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+
diff --git a/server/t/01-insert-and-get.t b/server/t/01-insert-and-get.t
new file mode 100644
index 0000000..9f934c7
--- /dev/null
+++ b/server/t/01-insert-and-get.t
@@ -0,0 +1,45 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+BEGIN {
+ require 't/lib/testlib.pl';
+}
+use Gearman::Client;
+use Data::Dumper;
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+my $cl = $srv->gearman_client;
+
+my $ret;
+
+# FIXME: test currently requires running gearmand on localhost
+{
+ use IO::Socket::INET;
+ my $sock = IO::Socket::INET->new(PeerAddr => "127.0.0.1:7003");
+ ok($sock, "local gearmand is up for testing")
+ or die "can't continue";
+}
+
+sub do_req {
+ my $req = shift;
+ my $ret = $cl->do_task("insert_job", json($req));
+ return undef unless $ret;
+ return $$ret unless $$ret =~ /^\s*[\[\{]/;
+ return unjson($$ret);
+}
+
+$ret = do_req({
+ funcname => "foo",
+ arg => "fooarg",
+});
+like($ret, qr/^\w+-\d+$/, "got a job handle");
+
diff --git a/server/t/lib/testlib.pl b/server/t/lib/testlib.pl
new file mode 100644
index 0000000..8668800
--- /dev/null
+++ b/server/t/lib/testlib.pl
@@ -0,0 +1,174 @@
+# $Id: db-common.pl 91 2006-08-17 00:39:55Z bradfitz $
+
+use strict;
+use File::Spec;
+use Carp qw(croak);
+use DBI;
+use FindBin;
+use JSON::Any;
+
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+sub json {
+ return JSON::Any->objToJson(shift);
+}
+
+sub unjson {
+ return JSON::Any->json_to_obj(shift);
+}
+
+sub test_client {
+ my %opts = @_;
+ my $dbs = delete $opts{dbs};
+ my $init = delete $opts{init};
+ my $pfx = delete $opts{dbprefix};
+ croak "'dbs' not an ARRAY" unless ref $dbs eq "ARRAY";
+ croak "unknown opts" if %opts;
+ $init = 1 unless defined $init;
+
+ if ($init) {
+ setup_dbs({ prefix => $pfx }, $dbs);
+ }
+
+ return TheSchwartz->new(databases => [
+ map { {
+ dsn => dsn_for($_),
+ user => "root",
+ pass => "",
+ prefix => $pfx,
+ } } @$dbs
+ ]);
+}
+
+package TestDB;
+use strict;
+sub new {
+ my $class = shift;
+ my $name = shift || "unnamed";
+ my $db = TestDB::MySQL->new($name) || TestDB::SQLite->new($name);
+ if ($db) {
+ my $dbh = $db->dbh;
+ my $schema = $db->schema_file;
+ my @sql = _load_sql($schema);
+ for my $sql (@sql) {
+ $db->alter_create(\$sql);
+ $dbh->do($sql);
+ }
+ $dbh->disconnect;
+ return $db;
+ }
+
+ eval {
+ Test::More::plan(skip_all => "MySQL or SQLite not available for testing");
+ };
+ if ($@) {
+ return undef;
+ }
+ exit(0);
+}
+
+sub dbh {
+ my ($self) = @_;
+ return DBI->connect($self->dsn, "root", "", { RaiseError => 1 });
+}
+
+sub alter_create {
+ my $sqlref = shift;
+ # subclasses can override
+}
+
+sub _load_sql {
+ my($file) = @_;
+ open my $fh, $file or die "Can't open $file: $!";
+ my $sql = do { local $/; <$fh> };
+ close $fh;
+ split /;\s*/, $sql;
+}
+
+package TestDB::MySQL;
+use strict;
+use base 'TestDB';
+
+sub new {
+ my ($class, $name) = @_;
+
+ my $dbh = eval { _mysql_dbh() } or return undef;
+ my $self = bless {
+ basename => $name,
+ dbname => "t_sch_$name",
+ root_dbh => $dbh,
+ }, $class;
+
+ $dbh->do("DROP DATABASE IF EXISTS $self->{dbname}");
+ $dbh->do("CREATE DATABASE $self->{dbname}");
+ return $self;
+}
+
+sub dsn {
+ my ($self) = @_;
+ return "DBI:mysql:" . $self->{dbname};
+}
+
+sub _mysql_dbh {
+ return DBI->connect("DBI:mysql:mysql", "root", "", { RaiseError => 1 })
+ or die "Couldn't connect to database";
+}
+
+sub alter_create {
+ my ($self, $sqlref) = @_;
+ $$sqlref .= " ENGINE=INNODB\n";
+}
+
+sub schema_file {
+ return "../doc/schema.sql";
+}
+
+package TestDB::SQLite;
+use strict;
+use base 'TestDB';
+
+sub new {
+ return undef;
+}
+
+package TestServer;
+use strict;
+
+sub new {
+ my ($class, $db) = @_;
+ $db ||= TestDB->new || return undef;
+ my $pid = fork;
+ die "out of memory" unless defined $pid;
+ if ($pid) {
+ return bless {
+ pid => $pid,
+ }, $class;
+ }
+
+ my $bin = "$FindBin::Bin/../bin/schwartzd";
+ die "Not exist: $bin" unless -e $bin;
+ die "Not executable: $bin" unless -x $bin;
+ exec $bin;
+ die "Failed to exec test schwartzd!";
+}
+
+sub gearman_client {
+ my $self = shift;
+ my $cl = Gearman::Client->new;
+ $cl->job_servers('127.0.0.1:7003');
+ return $cl;
+}
+
+sub DESTROY {
+ my $self = shift;
+ if ($self->{pid}) {
+ kill 9, $self->{pid};
+ }
+}
+
+1;
diff --git a/t/api.t b/t/api.t
index bdb5790..f3a77b6 100644
--- a/t/api.t
+++ b/t/api.t
@@ -7,9 +7,9 @@ use warnings;
require 't/lib/db-common.pl';
use TheSchwartz;
-use Test::More tests => 80;
+use Test::More tests => 108;
-run_tests(40, sub {
+run_tests(42, sub {
foreach my $pfx ("", "testprefix_") {
my $client = test_client(dbs => ['ts1'],
@@ -30,6 +30,7 @@ run_tests(40, sub {
my $job = $handle->job;
isa_ok $job, 'TheSchwartz::Job';
is $job->funcname, 'feedmajor', 'handle->job gives us the right job';
+ cmp_ok $job->insert_time, '>', 0, 'insert_time is non-zero';
# getting a handle object back
my $hand2 = $client->handle_from_string($hstr);
@@ -80,6 +81,18 @@ run_tests(40, sub {
ok($job);
$handle = $client->insert($job);
isa_ok $handle, 'TheSchwartz::JobHandle';
+
+ my $same = $client->lookup_job($handle->as_string);
+ ok $same;
+ isa_ok $same, 'TheSchwartz::Job';
+ is $same->handle->as_string, $handle->as_string;
+
+ }
+
+ ## Just test that handles for unknown database croak with an explicit message
+ {
+ eval { $client->lookup_job( ("6a" x 16) ."-666") };
+ ok $@ && unlike($@, qr/No Driver/) && like($@, qr/database.*hash/);
}
# inserting multiple with wrong method fails
diff --git a/t/priority.t b/t/priority.t
new file mode 100644
index 0000000..5713c9d
--- /dev/null
+++ b/t/priority.t
@@ -0,0 +1,72 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 62;
+
+our $record_expected;
+
+run_tests(31, sub {
+ my $client = test_client(dbs => ['ts1']);
+
+ # Define that we want to use priority selection
+ # limit batch size to 1 so we always process jobs in
+ # priority order
+ $client->set_prioritize(1);
+ $TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
+
+ for (1..10) {
+ my $job = TheSchwartz::Job->new(
+ funcname => 'Worker::PriorityTest',
+ arg => { num => $_ },
+ ( $_ == 1 ? () : ( priority => $_ ) ),
+ );
+ my $h = $client->insert($job);
+ ok($h, "inserted job (priority $_)");
+ }
+
+ $client->reset_abilities;
+ $client->can_do("Worker::PriorityTest");
+
+ Worker::PriorityTest->set_client($client);
+
+ for (1..10) {
+ $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+ my $rv = eval { $client->work_once; };
+ ok($rv, "did stuff");
+ }
+ my $rv = eval { $client->work_once; };
+ ok(!$rv, "nothing to do now");
+
+ teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::PriorityTest;
+use base 'TheSchwartz::Worker';
+use Test::More;
+
+use strict;
+my $client;
+sub set_client { $client = $_[1]; }
+
+sub work {
+ my ($class, $job) = @_;
+ my $priority = $job->priority;
+ ok((!defined($main::record_expected) && (!defined($priority)))
+ || ($priority == $main::record_expected), "priority matches expected priority");
+ $job->completed;
+}
+
+sub keep_exit_status_for { 20 } # keep exit status for 20 seconds after on_complete
+
+sub grab_for { 10 }
+
+sub max_retries { 1 }
+
+sub retry_delay { my $class = shift; my $fails = shift; return 2 ** $fails; }
+
diff --git a/t/scoreboard.t b/t/scoreboard.t
new file mode 100644
index 0000000..2cf3948
--- /dev/null
+++ b/t/scoreboard.t
@@ -0,0 +1,78 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use Test::More tests => 20;
+
+use TheSchwartz;
+use File::Spec qw();
+
+run_tests(10, sub {
+ my $pfx = '';
+ my $dbs = ['ts1'];
+
+ setup_dbs({prefix => $pfx}, $dbs);
+
+ my $client = TheSchwartz->new(scoreboard => '/tmp',
+ databases => [
+ map { {
+ dsn => dsn_for($_),
+ user => "root",
+ pass => "",
+ prefix => $pfx,
+ } } @$dbs
+ ]);
+
+ my $sb_file = $client->scoreboard;
+ {
+ (undef, my ($sb_dir, $sb_name)) = File::Spec->splitpath($sb_file);
+ ok(-e $sb_dir, "Looking for dir $sb_dir");
+ }
+
+ {
+ my $handle = $client->insert("Worker::Addition",
+ {numbers => [1, 2]});
+ my $job = Worker::Addition->grab_job($client);
+
+ my $rv = eval { Worker::Addition->work_safely($job); };
+ ok(length($@) == 0, 'Finished job with out error');
+
+ unless (ok(-e $sb_file, "Scoreboard file exists")) {
+ return;
+ }
+
+ open(FH, $sb_file) or die "Can't open '$sb_file': $!\n";
+
+ my %info = map { chomp; /^([^=]+)=(.*)$/ } <FH>;
+ close(FH);
+
+ ok($info{pid} == $$, 'Has our PID');
+ ok($info{funcname} eq 'Worker::Addition', 'Has our funcname');
+ ok($info{started} =~ /\d+/, 'Started time is a number');
+ ok($info{started} <= time, 'Started time is in the past');
+ ok($info{arg} =~ /^numbers=ARRAY/, 'Has right args');
+ ok($info{done} =~ /\d+/, 'Job has done time');
+ }
+
+ {
+ $client->DESTROY;
+ ok(! -e $sb_file, 'Scoreboard file goes away when worker finishes');
+ }
+
+ teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::Addition;
+use base 'TheSchwartz::Worker';
+
+sub work {
+ my ($class, $job) = @_;
+
+ # ....
+}
+
+1;
diff --git a/t/server-time.t b/t/server-time.t
new file mode 100644
index 0000000..f619bf3
--- /dev/null
+++ b/t/server-time.t
@@ -0,0 +1,20 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 4;
+
+run_tests(4, sub {
+ my $client = test_client(dbs => ['ts1']);
+
+ my $driver = $client->driver_for( ($client->shuffled_databases)[0] );
+ isa_ok $driver, 'Data::ObjectDriver::Driver::DBI';
+
+ cmp_ok $client->get_server_time($driver), '>', 0, 'got server time';
+
+ teardown_dbs('ts1');
+});
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git
More information about the Pkg-perl-cvs-commits
mailing list