[libtheschwartz-perl] 13/43: new upstream
dom at earth.li
dom at earth.li
Mon May 9 20:11:00 UTC 2016
This is an automated email from the git hooks/post-receive script.
dom pushed a commit to branch master
in repository libtheschwartz-perl.
commit d5105deef5e5cff3cab5b10e1c9c6f427ef79d48
Author: Dominic Hargreaves <dom at earth.li>
Date: Tue Aug 19 00:04:44 2008 +0000
new upstream
---
CHANGES | 17 ++++
MANIFEST | 23 +++--
META.yml | 2 +-
debian/changelog | 7 +-
doc/schema-postgres.sql | 70 +++++++++++++++
extras/thetop | 209 +++++++++++++++++++++++++++++++++++++++++++
lib/TheSchwartz.pm | 16 ++--
server/bin/schwartzd | 59 ++++++++++++
server/doc/deps.txt | 25 ++++++
server/doc/protocol.txt | 56 ++++++++++++
server/t/00-start-ping.t | 16 ++++
server/t/01-insert-and-get.t | 45 ++++++++++
server/t/lib/testlib.pl | 174 +++++++++++++++++++++++++++++++++++
t/api.t | 17 +++-
t/priority.t | 72 +++++++++++++++
t/scoreboard.t | 78 ++++++++++++++++
t/server-time.t | 20 +++++
17 files changed, 885 insertions(+), 21 deletions(-)
diff --git a/CHANGES b/CHANGES
index 5161eb1..4ee5403 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,20 @@
+1.07 (2008-07-31)
+ - bchoate: Updates to support optional prioritization of jobs.
+ - ykerherve: Croak with a nice message if a driver cannot be
+ found for a handle
+
+1.06 (2007-09-07)
+ - Code to allow a 'top' like view of running schwartz workers.
+ - include postgres schema in docs. from Michael Zedeler
+ <michael at zedeler.dk> Currently not tested in regression
+ tests, though, so not "officially" supported yet.
+ - start of work on gearman-based schwartz server.
+
+1.05
+
+ - Set TheSchwartz::Job::insert_time to current server time when
+ inserting a new job.
+
1.04 (2007-05-22)
- no code changes, just packaging/dep/test fixes, as pointed out
diff --git a/MANIFEST b/MANIFEST
index d69416c..b1d3798 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -1,10 +1,16 @@
-bin/schwartzmon
CHANGES
+MANIFEST This list of files
+MANIFEST.SKIP
+META.yml
+Makefile.PL
+bin/schwartzmon
doc/http-mappings.txt
doc/notes.txt
+doc/schema-postgres.sql
doc/schema.sql
-extras/check_schwartz
extras/TheSchwartz.spec
+extras/check_schwartz
+extras/thetop
inc/Module/AutoInstall.pm
inc/Module/Install.pm
inc/Module/Install/AutoInstall.pm
@@ -24,10 +30,12 @@ lib/TheSchwartz/FuncMap.pm
lib/TheSchwartz/Job.pm
lib/TheSchwartz/JobHandle.pm
lib/TheSchwartz/Worker.pm
-Makefile.PL
-MANIFEST This list of files
-MANIFEST.SKIP
-META.yml
+server/bin/schwartzd
+server/doc/deps.txt
+server/doc/protocol.txt
+server/t/00-start-ping.t
+server/t/01-insert-and-get.t
+server/t/lib/testlib.pl
t/05-job-ctor.t
t/api.t
t/cleanup.t
@@ -45,8 +53,11 @@ t/lib/db-common.pl
t/parallel-workers.t
t/pod-coverage.t
t/pod.t
+t/priority.t
t/replace-with.t
t/retry-delay.t
t/schema-sqlite.sql
+t/scoreboard.t
+t/server-time.t
t/unique.t
t/work-before-funcids-exist.t
diff --git a/META.yml b/META.yml
index 4ec1fdd..55ad529 100644
--- a/META.yml
+++ b/META.yml
@@ -15,4 +15,4 @@ requires:
Data::ObjectDriver: 0.04
Digest::MD5: 0
Storable: 0
-version: 1.04
+version: 1.07
diff --git a/debian/changelog b/debian/changelog
index be49760..d2335d5 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,9 +1,8 @@
-libtheschwartz-perl (1.04-2~test.1) unstable; urgency=low
+libtheschwartz-perl (1.07-1) UNRELEASED; urgency=low
- * Include versions of lib/* from upstream SVN at 136
- * Temporarily disable tests pod-coverage.t and unique.t
+ * New upstream release
- -- Dominic Hargreaves <dom at earth.li> Sun, 2 Mar 2008 22:28:26 +0000
+ -- Dominic Hargreaves <dom at earth.li> Tue, 19 Aug 2008 01:04:35 +0100
libtheschwartz-perl (1.04-1) unstable; urgency=low
diff --git a/doc/schema-postgres.sql b/doc/schema-postgres.sql
new file mode 100644
index 0000000..c6ad03f
--- /dev/null
+++ b/doc/schema-postgres.sql
@@ -0,0 +1,70 @@
+-- From: Michael Zedeler <michael at zedeler.dk>
+-- Date: July 30, 2007 7:31:55 AM PDT
+-- To: cpan at sixapart.com
+-- Subject: TheSchwartz database schema for postgresql
+--
+-- Hi.
+--
+-- I couldn't find any useful postgresql compatible schema file for
+-- the tables that TheSchwartz seems to depend on, so I rewrote the
+-- one supplied in the package.
+--
+-- Here it is. Feel free to include it in the next release.
+--
+-- Regards,
+--
+-- Michael.
+
+
+CREATE TABLE funcmap (
+ funcid SERIAL,
+ funcname VARCHAR(255) NOT NULL,
+ UNIQUE(funcname)
+);
+
+CREATE TABLE job (
+ jobid SERIAL,
+ funcid INT NOT NULL,
+ arg BYTEA,
+ uniqkey VARCHAR(255) NULL,
+ insert_time INTEGER,
+ run_after INTEGER NOT NULL,
+ grabbed_until INTEGER NOT NULL,
+ priority SMALLINT,
+ coalesce VARCHAR(255),
+ UNIQUE(funcid, uniqkey)
+);
+
+CREATE INDEX job_funcid_runafter ON job (funcid, run_after);
+
+CREATE INDEX job_funcid_coalesce ON job (funcid, coalesce);
+
+CREATE TABLE note (
+ jobid BIGINT NOT NULL,
+ notekey VARCHAR(255),
+ PRIMARY KEY (jobid, notekey),
+ value BYTEA
+);
+
+CREATE TABLE error (
+ error_time INTEGER NOT NULL,
+ jobid BIGINT NOT NULL,
+ message VARCHAR(255) NOT NULL,
+ funcid INT NOT NULL DEFAULT 0
+);
+
+CREATE INDEX error_funcid_errortime ON error (funcid, error_time);
+CREATE INDEX error_time ON error (error_time);
+CREATE INDEX error_jobid ON error (jobid);
+
+CREATE TABLE exitstatus (
+ jobid BIGINT PRIMARY KEY NOT NULL,
+ funcid INT NOT NULL DEFAULT 0,
+ status SMALLINT,
+ completion_time INTEGER,
+ delete_after INTEGER
+);
+
+CREATE INDEX exitstatus_funcid ON exitstatus (funcid);
+CREATE INDEX exitstatus_deleteafter ON exitstatus (delete_after);
+
diff --git a/extras/thetop b/extras/thetop
new file mode 100755
index 0000000..275b943
--- /dev/null
+++ b/extras/thetop
@@ -0,0 +1,209 @@
+#!/usr/bin/perl -w
+
+=pod
+
+=head1 NAME
+
+thetop - A 'top' utility for the schwartz
+
+=head1 SYNOPSIS
+
+ thetop [--func FORMAT] [--arg FORMAT] [--sort ARGS] [--delay SECS] [--score-dir DIR]
+
+=head1 DESCRIPTION
+
+
+=cut
+
+#--------------------------------------#
+# Dependencies
+
+use strict;
+
+use Getopt::Long;
+use Term::Cap;
+use POSIX;
+
+#--------------------------------------#
+# Global Variables
+
+use vars qw( $OSPEED );   # package global: terminal output speed handed to Term::Cap below
+
+BEGIN {
+ my $termios = POSIX::Termios->new;
+ $termios->getattr;
+ $OSPEED = $termios->getospeed || 9600;   # fall back to 9600 when no speed is reported
+};
+
+our $TERM = Term::Cap->Tgetent({OSPEED=>$OSPEED});   # termcap entry used by clr_screen()
+
+#--------------------------------------#
+# Main Program
+
+my ($score_dir, $delay, $func_col, @arg_col, $sort);
+
+GetOptions('score-dir=s' => \$score_dir,
+           'delay|d=s'   => \$delay,
+           'func=s'      => \$func_col,
+           'arg=s'       => \@arg_col,
+           'sort|s=s'    => \$sort,
+           ) or die "Usage: thetop [--func FORMAT] [--arg FORMAT] [--sort ARGS] [--delay SECS] [--score-dir DIR]\n";
+
+# Make sure we know where to find the scoreboard files
+unless ($score_dir) {
+    foreach my $d (qw(/var/run /dev/shm /tmp)) {
+        if (-d "$d/theschwartz") {      # must be a directory: report() opendir()s it
+            $score_dir = "$d/theschwartz";
+            last;
+        }
+    }
+
+    die "Can't find scoreboard directory. Use '--score-dir'\n"
+        unless $score_dir;
+}
+
+# Parse the formatting instructions for the arg column, if any were given.
+# "FUNC=FORMAT" applies to one function; a bare FORMAT applies to all of them.
+my %arg_col_by_func;
+foreach my $spec (@arg_col) {
+    if ($spec =~ /=/) {
+        # Split on the first '=' only, so FORMAT itself may contain '='
+        my ($func, $fmt) = split(/=/, $spec, 2);
+        $arg_col_by_func{$func} = $fmt;
+    } else {
+        $arg_col_by_func{'__ALL__'} = $spec;
+    }
+}
+
+# Default the delay; reject non-numeric values too, since sleep() would return at once (busy loop)
+$delay = 3 unless defined $delay && $delay =~ /^[0-9]+$/ && $delay > 0;
+
+# Start reporting
+clr_screen();
+while (1) {
+    report($score_dir, $func_col, \%arg_col_by_func, $sort);
+    sleep($delay);
+    clr_screen();
+}
+
+################################################################################
+
+sub report {
+    my ($dir, $func_col, $arg_col_by_func, $sort) = @_;
+
+    # Find the files available
+    opendir(my $dh, $dir) or die "Can't read directory '$dir': $!\n";
+    my @files = map { $dir."/$_" } readdir($dh);
+    closedir($dh);
+
+    # Grab the data out of them
+    my @data;
+    foreach my $f (@files) {
+        next unless $f =~ /scoreboard\.[0-9]+$/;
+        # A worker may exit (removing its file) between readdir and open: skip it
+        open(my $sf, '<', $f) or do { next if $!{ENOENT}; die "Can't open score file '$f': $!\n" };
+        # Split on the FIRST '=' only; values such as "arg=numbers=ARRAY(...)" contain '='
+        my %dat = map { chomp; /^([^=]+)=(.*)$/ } <$sf>;
+        close($sf);
+        $dat{arg_array} = [split(',', $dat{arg}||'')];
+        push @data, \%dat;
+    }
+
+    my $num = scalar(@data);
+    my $width = 80-17-$num;
+    printf("Workers: %d total %${width}s\n\n", $num, scalar localtime);
+    printf("% 5s % 20s % 2s % 7s % 41s\n", 'PID', 'FUNC', 'S', 'TIME', 'ARGS');
+    foreach my $d (sort { order_by($sort, $a, $b) } @data) {
+        my $func_str = fmt_func($d, $func_col);
+        printf("% 5s % 20s % 2s % 7s % 41s\n",
+               $d->{pid},
+               $func_str,
+               ($d->{done} ? 'S' : 'R'),      # S = finished (has a 'done' stamp), R = running
+               fmt_time($d),
+               fmt_arg($d, $arg_col_by_func, $func_str),
+               );
+    }
+}
+
+sub order_by {
+    my ($sort, $left, $right) = @_;
+
+    # NOTE: orderings requested through $sort (--sort) are not implemented.
+    # That branch used to be empty, so the comparator handed sort() a
+    # meaningless value; every request now gets the default order.
+
+    # Default: running tasks (no 'done' stamp) first, then earliest start.
+    return ($left->{done}||0) <=> ($right->{done}||0)
+        || ($left->{started}||0) <=> ($right->{started}||0);
+}
+
+sub fmt_func {
+    my ($d, $fmt) = @_;
+    my $val = $d->{funcname};
+    $val = '' unless defined $val;       # scoreboard entry without a funcname line
+
+    if ($fmt) {
+        if ($fmt eq 'trim') {
+            $val =~ s/^.+:://g;          # keep only the last package component
+        } elsif ($val =~ /($fmt)/) {
+            $val = $1;                   # show just the part matched by the pattern
+        }                                # no match: keep the full name (never a stale $1)
+    }
+
+    return substr($val, 0, 20);          # FUNC column is 20 characters wide
+}
+
+sub fmt_time {
+    my ($d) = @_;
+    my $elapsed = ($d->{done}||time) - $d->{started};
+
+    # Under a minute: zero-padded "00:SS"
+    return sprintf("%02d:%02d", 0, $elapsed) if $elapsed < 60;
+
+    # Break the elapsed seconds into hours / minutes / seconds
+    my $h = int($elapsed/60/60);
+    my $m = int($elapsed/60%60);
+    my $s = $elapsed%60;
+
+    # Under an hour: "MM:SS"; otherwise "H:MM:SS"
+    return sprintf("%02d:%02d", $m, $s) if $elapsed < 3600;
+
+    return sprintf("%d:%02d:%02d", $h, $m, $s);
+}
+
+## Format the arguments by interpreting the args as either a hash or an array
+## and printing out the appropriate element.
+
+sub fmt_arg {
+    my ($d, $arg_col_by_func, $func_str) = @_;
+    my $val = $d->{arg};
+    my $func_orig = $d->{funcname};
+
+    if ($arg_col_by_func) {
+        # Consult the hash ref we were passed, most specific key first; use
+        # defined/length (not truth) so a format of "0" selects element 0.
+        my ($fmt) = grep { defined($_) && length($_) }
+                    map  { defined($_) ? $arg_col_by_func->{$_} : undef }
+                    ($func_str, $func_orig, '__ALL__');
+        if (defined $fmt) {
+            my @args = @{ $d->{arg_array} || [] };   # copy: the stored array is left untouched
+            # If it's a number, treat the args as an array
+            if ($fmt =~ /^[0-9]+$/) {
+                $val = $args[$fmt];
+            }
+            # otherwise, treat the args as a hash
+            else {
+                # Compensate for odd numbers of args
+                push @args, undef if @args % 2;
+                my %h = @args;
+                $val = $h{$fmt};
+            }
+        }
+    }
+
+    return substr(defined $val ? $val : '', 0, 41);   # ARGS column is 41 characters wide
+}
+
+sub clr_screen {
+    return $TERM->Tputs('cl', 1, \*STDOUT);   # send the termcap 'cl' capability to STDOUT
+}
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index a8076ef..b0d114f 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,10 +1,10 @@
-# $Id: TheSchwartz.pm 136 2008-02-23 01:28:13Z bchoate $
+# $Id: TheSchwartz.pm 138 2008-06-16 17:36:19Z ykerherve $
package TheSchwartz;
use strict;
use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
-our $VERSION = "1.04";
+our $VERSION = "1.07";
use Carp qw( croak );
use Data::ObjectDriver::Errors;
@@ -74,7 +74,8 @@ sub driver_for {
if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
$driver = $client->{cached_drivers}{$hashdsn}{driver};
} else {
- my $db = $client->{databases}{$hashdsn};
+ my $db = $client->{databases}{$hashdsn}
+ or croak "Ouch, I don't know about a database whose hash is $hashdsn";
$driver = Data::ObjectDriver::Driver::DBI->new(
dsn => $db->{dsn},
username => $db->{user},
@@ -922,6 +923,10 @@ Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
Adds the given C<TheSchwartz::Job> objects to one of the client's job
databases. All the given jobs are recorded in I<one> job database.
+=head2 C<$client-E<gt>set_prioritize( $prioritize )>
+
+Set the C<prioritize> value as described in the constructor.
+
=head1 WORKING
The methods of TheSchwartz clients for use in worker processes are:
@@ -936,11 +941,6 @@ given ability.
Find and perform one job C<$client> can do.
-=head2 C<$client-E<gt>work_on($handle_id)>
-
-Finds a C<TheSchwartz::Job> corresponding to the given handle ID and returns the
-result of L</work_once> on it, or, if the handle is not found, returns false.
-
=head2 C<$client-E<gt>work_until_done()>
Find and perform jobs C<$client> can do until no more such jobs are found in
diff --git a/server/bin/schwartzd b/server/bin/schwartzd
new file mode 100755
index 0000000..1c34808
--- /dev/null
+++ b/server/bin/schwartzd
@@ -0,0 +1,59 @@
+#!/usr/bin/perl
+
+use strict;
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+use Gearman::Worker;
+use TheSchwartz;
+use JSON::Any;
+
+my $j = JSON::Any->new;
+
+my $ts = TheSchwartz->new(databases => [{
+ dsn => "dbi:mysql:database=t_sch_unnamed",
+ user => "root",
+ pass => "",
+}]);
+
+# FIXME: use embedded gearman server, and workers be child processes
+
+my $worker = Gearman::Worker->new;
+$worker->job_servers('127.0.0.1:7003');
+
+$worker->register_function("insert_job" => handler(\&insert_job));
+$worker->work while 1;
+
+############################################################################
+
+sub handler {
+    my ($code) = @_;
+
+    # Wrap $code so it receives the Gearman job plus its decoded JSON argument.
+    return sub {
+        my $job = shift;
+        my $arg = $job->arg;                        # raw payload, fetched once
+        my $jreq = eval { $j->jsonToObj($arg) }     # decoding errors are trapped...
+            or die "not a valid JSON request";      # ...and reported with one uniform message
+        return $code->($job, $jreq);
+    };
+}
+
+sub insert_job {
+    my ($job, $json) = @_;
+    my $funcname = $json->{funcname} or die "No funcname";
+    # Named $new_job so it no longer shadows the Gearman $job parameter.
+    my $new_job = TheSchwartz::Job->new(
+        funcname => $funcname,
+        arg      => $json->{arg},
+        uniqkey  => $json->{uniqkey},
+        coalesce => $json->{coalesce},
+    );
+    my $h = $ts->insert($new_job) or die "insert_failure\n";
+    return $h->as_string;   # the job handle in string form is the reply
+}
+
diff --git a/server/doc/deps.txt b/server/doc/deps.txt
new file mode 100644
index 0000000..53d5366
--- /dev/null
+++ b/server/doc/deps.txt
@@ -0,0 +1,25 @@
+TheSchwartz (itself)
+Data::ObjectDriver
+ DBI
+ Class::Accessor::Fast (libclass-accessor-perl)
+ Class::Trigger (libclass-trigger-perl)
+ libio-stringy-perl
+ Class::Data::Inheritable
+ List::Util
+ Test::Exception -- build_requires (libtest-exception-perl)
+
+Digest::MD5
+Storable
+
+Gearman::Server
+
+Danga::Socket
+Sys::Syscall
+
+JSON::Any
+JSON
+
+----
+Perl client:
+ Gearman::Client
+ String::CRC32
diff --git a/server/doc/protocol.txt b/server/doc/protocol.txt
new file mode 100644
index 0000000..24134a3
--- /dev/null
+++ b/server/doc/protocol.txt
@@ -0,0 +1,56 @@
+grab_job: {
+ can_do: [@can_do],
+}
+ -> {
+ jobid,
+ job,
+ arg,
+ # failure_count?
+ }
+ (or)
+ -> Nothing
+
+
+insert_job: {
+ job: "foo",
+ arg: "lskdjflksdjflskdf",
+ uniqkey: "blah",
+ run_after: $unix_time,
+ coalesce: "to_foo",
+}
+ -> jobid
+
+
+# atomic insert multiple jobs:
+insert_jobs:
+ [ {...}, {...}, ]
+
+ -> @jobids
+
+mark_completed: {
+ jobid: 5,
+ replace_with: [@jobs], #optional
+}
+ -> { handles => [@handles] }
+
+mark_failed: {
+ jobid: 5,
+ message: "error message",
+ exit_status: 6,
+ retry_in: 80, # optional. if not present, no retry.
+}
+
+get_failure_log: {
+ jobid: 6
+}
+ -> [ {time:2342342,exitstatus ... }, {....}, {...} ]
+
+get_status: {
+ jobid: 6,
+}
+ -> {
+ exitstatus: 0,
+ }
+
+
+
diff --git a/server/t/00-start-ping.t b/server/t/00-start-ping.t
new file mode 100644
index 0000000..32ae203
--- /dev/null
+++ b/server/t/00-start-ping.t
@@ -0,0 +1,16 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+require 't/lib/testlib.pl';
+
+my $db = TestDB->new;   # may declare skip_all itself, so it has to run before plan()
+plan tests => 2;        # two ok() calls below
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+
diff --git a/server/t/01-insert-and-get.t b/server/t/01-insert-and-get.t
new file mode 100644
index 0000000..9f934c7
--- /dev/null
+++ b/server/t/01-insert-and-get.t
@@ -0,0 +1,45 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+BEGIN {
+ require 't/lib/testlib.pl';
+}
+use Gearman::Client;
+use Data::Dumper;
+
+my $db = TestDB->new;   # may declare skip_all itself, so it has to run before plan()
+plan tests => 4;        # database, server, gearmand socket, job handle
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+my $cl = $srv->gearman_client;
+
+my $ret;
+
+# FIXME: test currently requires running gearmand on localhost
+{
+    use IO::Socket::INET;
+    my $sock = IO::Socket::INET->new(PeerAddr => "127.0.0.1:7003");
+    ok($sock, "local gearmand is up for testing")
+        or die "can't continue";
+}
+
+sub do_req {
+    my ($req) = @_;
+    my $reply = $cl->do_task("insert_job", json($req));   # dereferenced below, so a scalar ref when true
+    return undef unless $reply;
+    # A reply that looks like a JSON array/object is decoded; anything else is returned as-is.
+    return $$reply =~ /^\s*[\[\{]/ ? unjson($$reply) : $$reply;
+}
+
+$ret = do_req({
+ funcname => "foo",
+ arg => "fooarg",
+});
+like($ret, qr/^\w+-\d+$/, "got a job handle");
+
diff --git a/server/t/lib/testlib.pl b/server/t/lib/testlib.pl
new file mode 100644
index 0000000..8668800
--- /dev/null
+++ b/server/t/lib/testlib.pl
@@ -0,0 +1,174 @@
+# $Id: db-common.pl 91 2006-08-17 00:39:55Z bradfitz $
+
+use strict;
+use File::Spec;
+use Carp qw(croak);
+use DBI;
+use FindBin;
+use JSON::Any;
+
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+sub json {
+    return JSON::Any->objToJson($_[0]);   # serialise the given Perl structure to JSON text
+}
+
+sub unjson {
+    return JSON::Any->jsonToObj($_[0]);   # decoder name schwartzd also uses (was json_to_obj, not a JSON::Any method)
+}
+
+sub test_client {
+    my %args = @_;
+    my ($dbs, $init, $pfx) = delete @args{qw(dbs init dbprefix)};
+    croak "'dbs' not an ARRAY" unless ref $dbs eq "ARRAY";
+    croak "unknown opts" if %args;
+
+    # setup_dbs() runs unless the caller passes a defined, false 'init'.
+    $init = 1 unless defined $init;
+    setup_dbs({ prefix => $pfx }, $dbs) if $init;
+
+    # One connection description per requested database name.
+    my @databases;
+    for my $name (@$dbs) {
+        push @databases, {
+            dsn    => dsn_for($name),
+            user   => "root",
+            pass   => "",
+            prefix => $pfx,
+        };
+    }
+    return TheSchwartz->new(databases => \@databases);
+}
+
+package TestDB;
+use strict;
+sub new {
+ my $class = shift;
+ my $name = shift || "unnamed";   # name handed to the backend constructors
+ my $db = TestDB::MySQL->new($name) || TestDB::SQLite->new($name);   # first backend that constructs wins
+ if ($db) {
+ my $dbh = $db->dbh;
+ my $schema = $db->schema_file;
+ my @sql = _load_sql($schema);
+ for my $sql (@sql) {
+ $db->alter_create(\$sql);   # backend hook; may rewrite the statement in place
+ $dbh->do($sql);
+ }
+ $dbh->disconnect;
+ return $db;
+ }
+ # No backend available: try to skip the whole test file.
+ eval {
+ Test::More::plan(skip_all => "MySQL or SQLite not available for testing");
+ };
+ if ($@) {
+ return undef;   # plan() died, so report failure to the caller instead
+ }
+ exit(0);
+}
+
+sub dbh {
+    my $self = shift;
+    return DBI->connect($self->dsn, "root", "", { RaiseError => 1 });   # a new connection on every call
+}
+
+sub alter_create {
+    my ($self, $sqlref) = @_;   # called as a method: invocant first, then the SQL ref
+    return;                     # no-op by default; subclasses can override
+}
+
+sub _load_sql {
+    my($file) = @_;
+    open my $fh, '<', $file or die "Can't open $file: $!";   # 3-arg open: the name is never parsed for a mode
+    my $sql = do { local $/; <$fh> };                        # slurp the whole schema file
+    close $fh;
+    return split /;\s*/, $sql;                               # one list element per statement
+}
+
+package TestDB::MySQL;
+use strict;
+use base 'TestDB';
+
+sub new {
+    my ($class, $name) = @_;
+
+    # No usable MySQL connection means no object; the caller sees undef.
+    my $root = eval { _mysql_dbh() } or return undef;
+    my $dbname = "t_sch_$name";
+
+    # Always start from a freshly created, empty database.
+    $root->do("DROP DATABASE IF EXISTS $dbname");
+    $root->do("CREATE DATABASE $dbname");
+    return bless {
+        basename => $name, dbname => $dbname, root_dbh => $root,
+    }, $class;
+}
+
+sub dsn {
+    my $self = shift;
+    return "DBI:mysql:$self->{dbname}";   # DSN naming this object's database
+}
+
+sub _mysql_dbh {
+    return DBI->connect("DBI:mysql:mysql", "root", "", { RaiseError => 1 })
+        || die "Couldn't connect to database";   # '||' binds inside the return; 'or' left the die unreachable
+}
+
+sub alter_create {
+    my $sqlref = $_[1];                         # $_[0] is the invocant
+    $$sqlref = $$sqlref . " ENGINE=INNODB\n";   # append the engine clause to the caller's statement
+}
+
+sub schema_file {
+    "../doc/schema.sql";   # relative path; _load_sql opens it from the current working directory
+}
+
+package TestDB::SQLite;
+use strict;
+use base 'TestDB';
+
+sub new {
+ return undef;   # stub: never yields a backend, so TestDB->new cannot actually fall back to SQLite
+}
+
+package TestServer;
+use strict;
+
+sub new {
+    my ($class, $db) = @_;
+    $db ||= TestDB->new || return undef;
+    my $pid = fork;
+    die "fork failed: $!" unless defined $pid;   # was mislabelled "out of memory"
+    if ($pid) {
+        # Parent: remember the child so DESTROY can stop it.
+        return bless { pid => $pid }, $class;
+    }
+    # Child: replace this process with the schwartzd daemon.
+
+    my $bin = "$FindBin::Bin/../bin/schwartzd";
+    die "Not exist: $bin" unless -e $bin;
+    die "Not executable: $bin" unless -x $bin;
+    exec { $bin } $bin;                          # indirect-object form never goes through a shell
+    die "Failed to exec test schwartzd: $!";
+}
+
+sub gearman_client {
+    my ($self) = @_;
+    my $client = Gearman::Client->new;
+    $client->job_servers('127.0.0.1:7003');   # fixed local gearmand address
+    return $client;
+}
+
+sub DESTROY {
+    my $self = shift;
+    my $pid = $self->{pid} or return;
+    local ($?, $!);                       # cleanup must not clobber the test script's exit status
+    kill 9, $pid and waitpid($pid, 0);    # reap the killed child so it does not linger as a zombie
+}
+
+1;
diff --git a/t/api.t b/t/api.t
index bdb5790..f3a77b6 100644
--- a/t/api.t
+++ b/t/api.t
@@ -7,9 +7,9 @@ use warnings;
require 't/lib/db-common.pl';
use TheSchwartz;
-use Test::More tests => 80;
+use Test::More tests => 108;
-run_tests(40, sub {
+run_tests(42, sub {
foreach my $pfx ("", "testprefix_") {
my $client = test_client(dbs => ['ts1'],
@@ -30,6 +30,7 @@ run_tests(40, sub {
my $job = $handle->job;
isa_ok $job, 'TheSchwartz::Job';
is $job->funcname, 'feedmajor', 'handle->job gives us the right job';
+ cmp_ok $job->insert_time, '>', 0, 'insert_time is non-zero';
# getting a handle object back
my $hand2 = $client->handle_from_string($hstr);
@@ -80,6 +81,18 @@ run_tests(40, sub {
ok($job);
$handle = $client->insert($job);
isa_ok $handle, 'TheSchwartz::JobHandle';
+
+ my $same = $client->lookup_job($handle->as_string);
+ ok $same;
+ isa_ok $same, 'TheSchwartz::Job';
+ is $same->handle->as_string, $handle->as_string;
+
+ }
+
+ ## Just test that handles for unknown database croak with an explicit message
+ {
+ eval { $client->lookup_job( ("6a" x 16) ."-666") };
+ ok $@ && unlike($@, qr/No Driver/) && like($@, qr/database.*hash/);
}
# inserting multiple with wrong method fails
diff --git a/t/priority.t b/t/priority.t
new file mode 100644
index 0000000..5713c9d
--- /dev/null
+++ b/t/priority.t
@@ -0,0 +1,72 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 62;
+
+our $record_expected;
+
+run_tests(31, sub {
+ my $client = test_client(dbs => ['ts1']);
+
+ # Define that we want to use priority selection
+ # limit batch size to 1 so we always process jobs in
+ # priority order
+ $client->set_prioritize(1);
+ $TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
+
+ for (1..10) {
+ my $job = TheSchwartz::Job->new(
+ funcname => 'Worker::PriorityTest',
+ arg => { num => $_ },
+ ( $_ == 1 ? () : ( priority => $_ ) ),
+ );
+ my $h = $client->insert($job);
+ ok($h, "inserted job (priority $_)");
+ }
+
+ $client->reset_abilities;
+ $client->can_do("Worker::PriorityTest");
+
+ Worker::PriorityTest->set_client($client);
+
+ for (1..10) {
+ $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+ my $rv = eval { $client->work_once; };
+ ok($rv, "did stuff");
+ }
+ my $rv = eval { $client->work_once; };
+ ok(!$rv, "nothing to do now");
+
+ teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::PriorityTest;
+use base 'TheSchwartz::Worker';
+use Test::More;
+
+use strict;
+my $client;
+sub set_client { $client = $_[1]; }
+
+sub work {
+    my ($class, $job) = @_;
+    my $got = $job->priority;
+    my $same = (!defined($main::record_expected) && (!defined($got)))   # both undef...
+        || ($got == $main::record_expected);                            # ...or numerically equal
+    ok($same, "priority matches expected priority");
+    $job->completed;
+}
+
+sub keep_exit_status_for { 20 } # keep exit status for 20 seconds after on_complete
+
+sub grab_for { 10 }
+
+sub max_retries { 1 }
+
+sub retry_delay { my $class = shift; my $fails = shift; return 2 ** $fails; }
+
diff --git a/t/scoreboard.t b/t/scoreboard.t
new file mode 100644
index 0000000..2cf3948
--- /dev/null
+++ b/t/scoreboard.t
@@ -0,0 +1,78 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use Test::More tests => 20;
+
+use TheSchwartz;
+use File::Spec qw();
+
+run_tests(10, sub {
+ my $pfx = '';
+ my $dbs = ['ts1'];
+
+ setup_dbs({prefix => $pfx}, $dbs);
+
+ my $client = TheSchwartz->new(scoreboard => '/tmp',
+ databases => [
+ map { {
+ dsn => dsn_for($_),
+ user => "root",
+ pass => "",
+ prefix => $pfx,
+ } } @$dbs
+ ]);
+
+ my $sb_file = $client->scoreboard;
+ {
+ (undef, my ($sb_dir, $sb_name)) = File::Spec->splitpath($sb_file);
+ ok(-e $sb_dir, "Looking for dir $sb_dir");
+ }
+
+ {
+ my $handle = $client->insert("Worker::Addition",
+ {numbers => [1, 2]});
+ my $job = Worker::Addition->grab_job($client);
+
+ my $rv = eval { Worker::Addition->work_safely($job); };
+ ok(length($@) == 0, 'Finished job with out error');
+
+ unless (ok(-e $sb_file, "Scoreboard file exists")) {
+ return;
+ }
+
+ open(FH, $sb_file) or die "Can't open '$sb_file': $!\n";
+
+ my %info = map { chomp; /^([^=]+)=(.*)$/ } <FH>;
+ close(FH);
+
+ ok($info{pid} == $$, 'Has our PID');
+ ok($info{funcname} eq 'Worker::Addition', 'Has our funcname');
+ ok($info{started} =~ /\d+/, 'Started time is a number');
+ ok($info{started} <= time, 'Started time is in the past');
+ ok($info{arg} =~ /^numbers=ARRAY/, 'Has right args');
+ ok($info{done} =~ /\d+/, 'Job has done time');
+ }
+
+ {
+ $client->DESTROY;
+ ok(! -e $sb_file, 'Scoreboard file goes away when worker finishes');
+ }
+
+ teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::Addition;
+use base 'TheSchwartz::Worker';
+
+sub work {
+ my ($class, $job) = @_;
+
+ # ....
+}
+
+1;
diff --git a/t/server-time.t b/t/server-time.t
new file mode 100644
index 0000000..f619bf3
--- /dev/null
+++ b/t/server-time.t
@@ -0,0 +1,20 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 4;
+
+run_tests(4, sub {
+ my $client = test_client(dbs => ['ts1']);
+
+ my $driver = $client->driver_for( ($client->shuffled_databases)[0] );
+ isa_ok $driver, 'Data::ObjectDriver::Driver::DBI';
+
+ cmp_ok $client->get_server_time($driver), '>', 0, 'got server time';
+
+ teardown_dbs('ts1');
+});
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git
More information about the Pkg-perl-cvs-commits
mailing list