[libtheschwartz-perl] 13/43: new upstream

dom at earth.li dom at earth.li
Mon May 9 20:11:00 UTC 2016


This is an automated email from the git hooks/post-receive script.

dom pushed a commit to branch master
in repository libtheschwartz-perl.

commit d5105deef5e5cff3cab5b10e1c9c6f427ef79d48
Author: Dominic Hargreaves <dom at earth.li>
Date:   Tue Aug 19 00:04:44 2008 +0000

    new upstream
---
 CHANGES                      |  17 ++++
 MANIFEST                     |  23 +++--
 META.yml                     |   2 +-
 debian/changelog             |   7 +-
 doc/schema-postgres.sql      |  70 +++++++++++++++
 extras/thetop                | 209 +++++++++++++++++++++++++++++++++++++++++++
 lib/TheSchwartz.pm           |  16 ++--
 server/bin/schwartzd         |  59 ++++++++++++
 server/doc/deps.txt          |  25 ++++++
 server/doc/protocol.txt      |  56 ++++++++++++
 server/t/00-start-ping.t     |  16 ++++
 server/t/01-insert-and-get.t |  45 ++++++++++
 server/t/lib/testlib.pl      | 174 +++++++++++++++++++++++++++++++++++
 t/api.t                      |  17 +++-
 t/priority.t                 |  72 +++++++++++++++
 t/scoreboard.t               |  78 ++++++++++++++++
 t/server-time.t              |  20 +++++
 17 files changed, 885 insertions(+), 21 deletions(-)

diff --git a/CHANGES b/CHANGES
index 5161eb1..4ee5403 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,20 @@
+1.07 (2008-07-31)
+    - bchoate: Updates to support optional prioritization of jobs.
+    - ykerherve: Croak with a nice message if a driver cannot be
+      found for a handle
+
+1.06 (2007-09-07)
+    - Code to allow a 'top' like view of running schwartz workers.
+    - include postgres schema in docs.  from Michael Zedeler
+      <michael at zedeler.dk>  Currently not tested in regression
+      tests, though, so not "officially" supported yet.
+    - start of work on gearman-based schwartz server.
+
+1.05
+
+    - Set TheSchwartz::Job::insert_time to current server time when
+      inserting a new job.
+
 1.04 (2007-05-22)
 
     - no code changes, just packaging/dep/test fixes, as pointed out
diff --git a/MANIFEST b/MANIFEST
index d69416c..b1d3798 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -1,10 +1,16 @@
-bin/schwartzmon
 CHANGES
+MANIFEST			This list of files
+MANIFEST.SKIP
+META.yml
+Makefile.PL
+bin/schwartzmon
 doc/http-mappings.txt
 doc/notes.txt
+doc/schema-postgres.sql
 doc/schema.sql
-extras/check_schwartz
 extras/TheSchwartz.spec
+extras/check_schwartz
+extras/thetop
 inc/Module/AutoInstall.pm
 inc/Module/Install.pm
 inc/Module/Install/AutoInstall.pm
@@ -24,10 +30,12 @@ lib/TheSchwartz/FuncMap.pm
 lib/TheSchwartz/Job.pm
 lib/TheSchwartz/JobHandle.pm
 lib/TheSchwartz/Worker.pm
-Makefile.PL
-MANIFEST			This list of files
-MANIFEST.SKIP
-META.yml
+server/bin/schwartzd
+server/doc/deps.txt
+server/doc/protocol.txt
+server/t/00-start-ping.t
+server/t/01-insert-and-get.t
+server/t/lib/testlib.pl
 t/05-job-ctor.t
 t/api.t
 t/cleanup.t
@@ -45,8 +53,11 @@ t/lib/db-common.pl
 t/parallel-workers.t
 t/pod-coverage.t
 t/pod.t
+t/priority.t
 t/replace-with.t
 t/retry-delay.t
 t/schema-sqlite.sql
+t/scoreboard.t
+t/server-time.t
 t/unique.t
 t/work-before-funcids-exist.t
diff --git a/META.yml b/META.yml
index 4ec1fdd..55ad529 100644
--- a/META.yml
+++ b/META.yml
@@ -15,4 +15,4 @@ requires:
   Data::ObjectDriver: 0.04
   Digest::MD5: 0
   Storable: 0
-version: 1.04
+version: 1.07
diff --git a/debian/changelog b/debian/changelog
index be49760..d2335d5 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,9 +1,8 @@
-libtheschwartz-perl (1.04-2~test.1) unstable; urgency=low
+libtheschwartz-perl (1.07-1) UNRELEASED; urgency=low
 
-  * Include versions of lib/* from upstream SVN at 136
-  * Temporarily disable tests pod-coverage.t and unique.t
+  * New upstream release
 
- -- Dominic Hargreaves <dom at earth.li>  Sun,  2 Mar 2008 22:28:26 +0000
+ -- Dominic Hargreaves <dom at earth.li>  Tue, 19 Aug 2008 01:04:35 +0100
 
 libtheschwartz-perl (1.04-1) unstable; urgency=low
 
diff --git a/doc/schema-postgres.sql b/doc/schema-postgres.sql
new file mode 100644
index 0000000..c6ad03f
--- /dev/null
+++ b/doc/schema-postgres.sql
@@ -0,0 +1,70 @@
+-- From: Michael Zedeler <michael at zedeler.dk>
+-- Date: July 30, 2007 7:31:55 AM PDT
+-- To: cpan at sixapart.com
+-- Subject: TheSchwartz database schema for postgresql
+--
+-- Hi.
+--
+-- I couldn't find any useful postgresql compatible schema file for  
+-- the tables that TheSchwartz seems to depend on, so I rewrote the  
+-- one supplied in the package.
+--
+-- Here it is. Feel free to include it in the next release.
+--
+-- Regards,
+--
+-- Michael.
+
+
+CREATE TABLE funcmap (
+        funcid SERIAL,
+        funcname       VARCHAR(255) NOT NULL,
+        UNIQUE(funcname)
+);
+
+CREATE TABLE job (
+        jobid           SERIAL,
+        funcid          INT NOT NULL,
+        arg             BYTEA,
+        uniqkey         VARCHAR(255) NULL,
+        insert_time     INTEGER,
+        run_after       INTEGER NOT NULL,
+        grabbed_until   INTEGER NOT NULL,
+        priority        SMALLINT,
+        coalesce        VARCHAR(255),
+        UNIQUE(funcid, uniqkey)
+);
+
+CREATE INDEX job_funcid_runafter ON job (funcid, run_after);
+
+CREATE INDEX job_funcid_coalesce ON job (funcid, coalesce);
+
+CREATE TABLE note (
+        jobid           BIGINT NOT NULL,
+        notekey         VARCHAR(255),
+        PRIMARY KEY (jobid, notekey),
+        value           BYTEA
+);
+
+CREATE TABLE error (
+        error_time      INTEGER NOT NULL,
+        jobid           BIGINT NOT NULL,
+        message         VARCHAR(255) NOT NULL,
+        funcid          INT NOT NULL DEFAULT 0
+);
+
+CREATE INDEX error_funcid_errortime ON error (funcid, error_time);
+CREATE INDEX error_time ON error (error_time);
+CREATE INDEX error_jobid ON error (jobid);
+
+CREATE TABLE exitstatus (
+        jobid           BIGINT PRIMARY KEY NOT NULL,
+        funcid          INT NOT NULL DEFAULT 0,
+        status          SMALLINT,
+        completion_time INTEGER,
+        delete_after    INTEGER
+);
+
+CREATE INDEX exitstatus_funcid ON exitstatus (funcid);
+CREATE INDEX exitstatus_deleteafter ON exitstatus (delete_after);
+
diff --git a/extras/thetop b/extras/thetop
new file mode 100755
index 0000000..275b943
--- /dev/null
+++ b/extras/thetop
@@ -0,0 +1,209 @@
+#!/usr/bin/perl -w
+
+=pod
+
+=head1 NAME
+
+thetop - A 'top' utility for the schwartz
+
+=head1 SYNOPSIS
+
+  thetop [--func FORMAT] [--arg FORMAT] [--sort ARGS] [--delay SECS] [--score-dir DIR]
+
+=head1 DESCRIPTION
+
+
+=cut
+
+#--------------------------------------#
+# Dependencies
+
+use strict;
+
+use Getopt::Long;
+use Term::Cap;
+use POSIX;
+
+#--------------------------------------#
+# Global Variables
+
+use vars qw( $OSPEED );
+
+BEGIN {
+    my $termios = POSIX::Termios->new;
+    $termios->getattr;
+    $OSPEED = $termios->getospeed || 9600;
+};
+
+our $TERM = Term::Cap->Tgetent({OSPEED=>$OSPEED});
+
+#--------------------------------------#
+# Main Program
+
+my ($score_dir, $delay, $func_col, @arg_col, $sort);
+
+GetOptions('score-dir=s' => \$score_dir,
+           'delay|d=s'   => \$delay,
+           'func=s'      => \$func_col,
+           'arg=s'       => \@arg_col,
+           'sort|s=s'    => \$sort,
+          );
+
+# Make sure we know where to find the scoreboard files
+unless ($score_dir) {
+    foreach my $d (qw(/var/run /dev/shm /tmp)) {
+        if (-e "$d/theschwartz") {
+            $score_dir = "$d/theschwartz";
+            last;
+        }
+    }
+
+    die "Can't find scoreboard directory.  Use '--score-dir'\n"
+      unless $score_dir;
+}
+
+# If we got some formatting instructions for the arg column, parse it out
+my %arg_col_by_func;
+if (@arg_col) {
+    foreach my $a (@arg_col) {
+        if ($a =~ /=/) {
+            my ($func, $fmt) = split('=', $a);
+            $arg_col_by_func{$func} = $fmt;
+        } else {
+            $arg_col_by_func{'__ALL__'} = $a;
+        }
+    }
+}
+
+# Make sure to give a reasonable default for delay
+$delay ||= 3;
+
+# Start reporting
+clr_screen();
+while (1) {
+    report($score_dir, $func_col, \%arg_col_by_func, $sort);
+    sleep($delay);
+    clr_screen();
+}
+
+################################################################################
+
+sub report {
+    my ($dir, $func_col, $arg_col_by_func, $sort) = @_;
+
+    # Find the files available
+    opendir(SD, $dir) or die "Can't read directory '$dir': $!\n";
+    my @files = map { $dir."/$_" } readdir(SD);
+    closedir(SD);
+
+    # Grab the data out of them
+    my @data;
+    foreach my $f (@files) {
+        next unless $f =~ /scoreboard\.[0-9]+$/;
+        open(SF, '<', $f) or die "Can't open score file '$f': $!\n";
+        my %dat = map { chomp; split('=') } <SF>;
+        close(SF);
+
+        $dat{arg_array} = [split(',', $dat{arg}||'')];
+        push @data, \%dat;
+    }
+
+    my $num = scalar(@data);
+    my $width = 80-17-$num;
+    printf("Workers: %d total %${width}s\n\n", $num, scalar localtime);
+    printf("% 5s % 20s % 2s % 7s % 41s\n", 'PID', 'FUNC', 'S', 'TIME', 'ARGS');
+    foreach my $d (sort { order_by($sort, $a, $b) } @data) {
+        my $func_str = fmt_func($d, $func_col);
+
+        printf("% 5s % 20s % 2s % 7s % 41s\n",
+               $d->{pid},
+               $func_str,
+               ($d->{done} ? 'S' : 'R'),
+               fmt_time($d),
+               fmt_arg($d, $arg_col_by_func, $func_str),
+              );
+    }
+}
+
+sub order_by {
+    my ($sort, $a, $b) = @_;
+
+    if ($sort) {
+
+    } else {
+        # Default to push running tasks to the top
+        return ($a->{done}||0) <=> ($b->{done}||0) ||
+               ($a->{started}||0) <=> ($b->{started}||0);
+    }
+}
+
+sub fmt_func {
+    my ($d, $fmt) = @_;
+    my $val = $d->{funcname};
+
+    if ($fmt) {
+        if ($fmt eq 'trim') {
+            $val =~ s/^.+:://g;
+        } else {
+            $val =~ /($fmt)/;
+            $val = $1;
+        }
+    }
+
+    return substr($val, 0, 20),
+}
+
+sub fmt_time {
+    my ($d) = @_;
+    my $secs = ($d->{done}||time) - $d->{started};
+
+    if ($secs < 60) {
+        return sprintf("%02d:%02d", 0, $secs);
+    } elsif ($secs < 3600) {
+        my $min = int($secs/60);
+        $secs = $secs%60;
+        return sprintf("%02d:%02d", $min, $secs);
+    } else {
+        my $hr  = int($secs/60/60);
+        my $min = int($secs/60%60);
+        $secs = $secs%60;
+        return sprintf("%d:%02d:%02d", $hr, $min, $secs);
+    }
+}
+
+## Format the arguments by interpreting the args as either a hash or an array
+## and printing out the appropriate element.
+
+sub fmt_arg {
+    my ($d, $arg_col_by_func, $func_str) = @_;
+    my $val = $d->{arg};
+    my $func_orig = $d->{funcname};
+
+    if ($arg_col_by_func) {
+        my $fmt = ($arg_col_by_func{$func_str}  ||
+                   $arg_col_by_func{$func_orig} ||
+                   $arg_col_by_func{'__ALL__'});
+        if ($fmt) {
+            my $arg_array = $d->{arg_array};
+
+            # If its a number treat the args as an array
+            if ($fmt =~ /^[0-9]+$/) {
+                $val = $arg_array->[$fmt];
+            }
+            # otherwise, treat the args as a hash
+            else {
+                # Compensate for odd numbers of args
+                push @$arg_array, undef if scalar(@$arg_array) % 2;
+
+                my %h = @$arg_array;
+                $val = $h{$fmt};
+            }
+        }
+    }
+
+    return substr($val||'', 0, 41),
+}
+
+sub clr_screen {
+    $TERM->Tputs('cl', 1, \*STDOUT);
+}
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index a8076ef..b0d114f 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,10 +1,10 @@
-# $Id: TheSchwartz.pm 136 2008-02-23 01:28:13Z bchoate $
+# $Id: TheSchwartz.pm 138 2008-06-16 17:36:19Z ykerherve $
 
 package TheSchwartz;
 use strict;
 use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
 
-our $VERSION = "1.04";
+our $VERSION = "1.07";
 
 use Carp qw( croak );
 use Data::ObjectDriver::Errors;
@@ -74,7 +74,8 @@ sub driver_for {
     if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
         $driver = $client->{cached_drivers}{$hashdsn}{driver};
     } else {
-        my $db = $client->{databases}{$hashdsn};
+        my $db = $client->{databases}{$hashdsn}
+            or croak "Ouch, I don't know about a database whose hash is $hashdsn";
         $driver = Data::ObjectDriver::Driver::DBI->new(
                 dsn      => $db->{dsn},
                 username => $db->{user},
@@ -922,6 +923,10 @@ Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
 Adds the given C<TheSchwartz::Job> objects to one of the client's job
 databases. All the given jobs are recorded in I<one> job database.
 
+=head2 C<$client-E<gt>set_prioritize( $prioritize )>
+
+Set the C<prioritize> value as described in the constructor.
+
 =head1 WORKING
 
 The methods of TheSchwartz clients for use in worker processes are:
@@ -936,11 +941,6 @@ given ability.
 
 Find and perform one job C<$client> can do.
 
-=head2 C<$client-E<gt>work_on($handle_id)>
-
-Finds a C<TheSchwartz::Job> corresponding to the given handle ID and returns the
-result of L</work_once> on it, or, if the handle is not found, returns false.
-
 =head2 C<$client-E<gt>work_until_done()>
 
 Find and perform jobs C<$client> can do until no more such jobs are found in
diff --git a/server/bin/schwartzd b/server/bin/schwartzd
new file mode 100755
index 0000000..1c34808
--- /dev/null
+++ b/server/bin/schwartzd
@@ -0,0 +1,59 @@
+#!/usr/bin/perl
+
+use strict;
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+use Gearman::Worker;
+use TheSchwartz;
+use JSON::Any;
+
+my $j = JSON::Any->new;
+
+my $ts = TheSchwartz->new(databases => [{
+    dsn => "dbi:mysql:database=t_sch_unnamed",
+    user => "root",
+    pass => "",
+}]);
+
+# FIXME: use embedded gearman server, and workers be child processes
+
+my $worker = Gearman::Worker->new;
+$worker->job_servers('127.0.0.1:7003');
+
+$worker->register_function("insert_job" => handler(\&insert_job));
+$worker->work while 1;
+
+############################################################################
+
+sub handler {
+    my ($code) = @_;
+    return sub {
+	my $job = shift;
+	my $arg = $job->arg;
+	my $jreq = eval { $j->jsonToObj($job->arg) };
+	unless ($jreq) {
+	    die "not a valid JSON request";
+	}
+	return $code->($job, $jreq);
+    };
+}
+
+sub insert_job {
+    my ($job, $json) = @_;
+    my $funcname = $json->{funcname} or die "No funcname";
+    my $job = TheSchwartz::Job->new(
+				    funcname => $json->{funcname},
+				    arg      => $json->{arg},
+				    uniqkey  => $json->{uniqkey},
+				    coalesce => $json->{coalesce},
+				    );
+    my $h = $ts->insert($job) or
+        die "insert_failure\n";
+    return $h->as_string;
+}
+
diff --git a/server/doc/deps.txt b/server/doc/deps.txt
new file mode 100644
index 0000000..53d5366
--- /dev/null
+++ b/server/doc/deps.txt
@@ -0,0 +1,25 @@
+TheSchwartz (itself)
+Data::ObjectDriver
+  DBI
+  Class::Accessor::Fast (libclass-accessor-perl)
+  Class::Trigger (libclass-trigger-perl)
+     libio-stringy-perl
+     Class::Data::Inheritable
+  List::Util
+  Test::Exception -- build_requires (libtest-exception-perl)
+
+Digest::MD5
+Storable
+
+Gearman::Server
+
+Danga::Socket
+Sys::Syscall
+
+JSON::Any
+JSON
+
+----
+Perl client:
+   Gearman::Client
+   String::CRC32
diff --git a/server/doc/protocol.txt b/server/doc/protocol.txt
new file mode 100644
index 0000000..24134a3
--- /dev/null
+++ b/server/doc/protocol.txt
@@ -0,0 +1,56 @@
+grab_job:  {
+   can_do: [@can_do],
+}
+  -> {
+     jobid,
+     job,
+     arg,
+     # failure_count?
+  }
+  (or)
+  -> Nothing
+
+
+insert_job: {
+  job: "foo",
+  arg: "lskdjflksdjflskdf",
+  uniqkey: "blah",
+  run_after: $unix_time,
+  coalesce: "to_foo",
+}
+  -> jobid
+
+
+# atomic insert multiple jobs:
+insert_jobs:
+   [ {...}, {...}, ]
+
+   -> @jobids
+
+mark_completed: {
+   jobid: 5,
+   replace_with: [@jobs],    #optional
+}
+   -> { handles => [@handles] }
+
+mark_failed: {
+   jobid: 5,
+   message: "error message",
+   exit_status: 6,
+   retry_in: 80,   # optional.  if not present, no retry.
+}
+
+get_failure_log: {
+   jobid: 6
+}
+  -> [ {time:2342342,exitstatus ... }, {....}, {...} ]
+
+get_status: {
+   jobid: 6,
+}
+  -> {
+       exitstatus: 0,
+     }
+
+
+
diff --git a/server/t/00-start-ping.t b/server/t/00-start-ping.t
new file mode 100644
index 0000000..32ae203
--- /dev/null
+++ b/server/t/00-start-ping.t
@@ -0,0 +1,16 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+require 't/lib/testlib.pl';
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+
diff --git a/server/t/01-insert-and-get.t b/server/t/01-insert-and-get.t
new file mode 100644
index 0000000..9f934c7
--- /dev/null
+++ b/server/t/01-insert-and-get.t
@@ -0,0 +1,45 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+use Test::More;
+BEGIN {
+    require 't/lib/testlib.pl';
+}
+use Gearman::Client;
+use Data::Dumper;
+
+my $db = TestDB->new;
+plan tests => 1;
+
+ok($db, "got a test database");
+
+my $srv = TestServer->new($db);
+ok($srv, "got a test server");
+
+my $cl = $srv->gearman_client;
+
+my $ret;
+
+# FIXME: test currently requires running gearmand on localhost
+{
+    use IO::Socket::INET;
+    my $sock = IO::Socket::INET->new(PeerAddr => "127.0.0.1:7003");
+    ok($sock, "local gearmand is up for testing")
+        or die "can't continue";
+}
+
+sub do_req {
+    my $req = shift;
+    my $ret = $cl->do_task("insert_job", json($req));
+    return undef unless $ret;
+    return $$ret unless $$ret =~ /^\s*[\[\{]/;
+    return unjson($$ret);
+}
+
+$ret = do_req({
+    funcname => "foo",
+    arg => "fooarg",
+});
+like($ret, qr/^\w+-\d+$/, "got a job handle");
+
diff --git a/server/t/lib/testlib.pl b/server/t/lib/testlib.pl
new file mode 100644
index 0000000..8668800
--- /dev/null
+++ b/server/t/lib/testlib.pl
@@ -0,0 +1,174 @@
+# $Id: db-common.pl 91 2006-08-17 00:39:55Z bradfitz $
+
+use strict;
+use File::Spec;
+use Carp qw(croak);
+use DBI;
+use FindBin;
+use JSON::Any;
+
+use lib "$ENV{HOME}/hack/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/hack/TheSchwartz/lib";
+use lib "$ENV{HOME}/hack/gearman/api/perl/Gearman/lib";
+use lib "$ENV{HOME}/cvs/Data-ObjectDriver/lib";
+use lib "$ENV{HOME}/cvs/TheSchwartz/lib";
+use lib "$ENV{HOME}/cvs/gearman/api/perl/Gearman/lib";
+
+sub json {
+    return JSON::Any->objToJson(shift);
+}
+
+sub unjson {
+    return JSON::Any->json_to_obj(shift);
+}
+
+sub test_client {
+    my %opts = @_;
+    my $dbs     = delete $opts{dbs};
+    my $init    = delete $opts{init};
+    my $pfx     = delete $opts{dbprefix};
+    croak "'dbs' not an ARRAY" unless ref $dbs eq "ARRAY";
+    croak "unknown opts" if %opts;
+    $init = 1 unless defined $init;
+
+    if ($init) {
+        setup_dbs({ prefix => $pfx }, $dbs);
+    }
+
+    return TheSchwartz->new(databases => [
+                                          map { {
+                                              dsn  => dsn_for($_),
+                                              user => "root",
+                                              pass => "",
+                                              prefix => $pfx,
+                                          } } @$dbs
+                                          ]);
+}
+
+package TestDB;
+use strict;
+sub new {
+    my $class = shift;
+    my $name = shift || "unnamed";
+    my $db = TestDB::MySQL->new($name) || TestDB::SQLite->new($name);
+    if ($db) {
+	my $dbh = $db->dbh;
+	my $schema = $db->schema_file;
+        my @sql = _load_sql($schema);
+        for my $sql (@sql) {
+	    $db->alter_create(\$sql);
+            $dbh->do($sql);
+        }
+        $dbh->disconnect;
+	return $db;
+    }
+
+    eval {
+	Test::More::plan(skip_all => "MySQL or SQLite not available for testing");
+    };
+    if ($@) {
+	return undef;
+    }
+    exit(0);
+}
+
+sub dbh {
+    my ($self) = @_;
+    return DBI->connect($self->dsn, "root", "", { RaiseError => 1 });
+}
+
+sub alter_create {
+    my $sqlref = shift;
+    # subclasses can override
+}
+
+sub _load_sql {
+    my($file) = @_;
+    open my $fh, $file or die "Can't open $file: $!";
+    my $sql = do { local $/; <$fh> };
+    close $fh;
+    split /;\s*/, $sql;
+}
+
+package TestDB::MySQL;
+use strict;
+use base 'TestDB';
+
+sub new {
+    my ($class, $name) = @_;
+
+    my $dbh  = eval { _mysql_dbh() } or return undef;
+    my $self = bless {
+	basename  => $name,
+	dbname    => "t_sch_$name",
+	root_dbh  => $dbh,
+    }, $class;
+
+    $dbh->do("DROP DATABASE IF EXISTS $self->{dbname}");
+    $dbh->do("CREATE DATABASE $self->{dbname}");
+    return $self;
+}
+
+sub dsn {
+    my ($self) = @_;
+    return "DBI:mysql:" . $self->{dbname};
+}
+
+sub _mysql_dbh {
+    return DBI->connect("DBI:mysql:mysql", "root", "", { RaiseError => 1 })
+        or die "Couldn't connect to database";
+}
+
+sub alter_create {
+    my ($self, $sqlref) = @_;
+    $$sqlref .= " ENGINE=INNODB\n";
+}
+
+sub schema_file {
+    return "../doc/schema.sql";
+}
+
+package TestDB::SQLite;
+use strict;
+use base 'TestDB';
+
+sub new {
+    return undef;
+}
+
+package TestServer;
+use strict;
+
+sub new {
+    my ($class, $db) = @_;
+    $db ||= TestDB->new || return undef;
+    my $pid = fork;
+    die "out of memory" unless defined $pid;
+    if ($pid) {
+	return bless {
+	    pid => $pid,
+	}, $class;
+    }
+
+    my $bin = "$FindBin::Bin/../bin/schwartzd";
+    die "Not exist: $bin" unless -e $bin;
+    die "Not executable: $bin" unless -x $bin;
+    exec $bin;
+    die "Failed to exec test schwartzd!";
+}
+
+sub gearman_client {
+    my $self = shift;
+    my $cl = Gearman::Client->new;
+    $cl->job_servers('127.0.0.1:7003');
+    return $cl;
+}
+
+sub DESTROY {
+    my $self = shift;
+    if ($self->{pid}) {
+	kill 9, $self->{pid};
+    }
+}
+
+1;
diff --git a/t/api.t b/t/api.t
index bdb5790..f3a77b6 100644
--- a/t/api.t
+++ b/t/api.t
@@ -7,9 +7,9 @@ use warnings;
 require 't/lib/db-common.pl';
 
 use TheSchwartz;
-use Test::More tests => 80;
+use Test::More tests => 108;
 
-run_tests(40, sub {
+run_tests(42, sub {
     foreach my $pfx ("", "testprefix_") {
 
         my $client = test_client(dbs      => ['ts1'],
@@ -30,6 +30,7 @@ run_tests(40, sub {
         my $job = $handle->job;
         isa_ok $job, 'TheSchwartz::Job';
         is $job->funcname, 'feedmajor', 'handle->job gives us the right job';
+        cmp_ok $job->insert_time, '>', 0, 'insert_time is non-zero';
 
         # getting a handle object back
         my $hand2 = $client->handle_from_string($hstr);
@@ -80,6 +81,18 @@ run_tests(40, sub {
             ok($job);
             $handle = $client->insert($job);
             isa_ok $handle, 'TheSchwartz::JobHandle';
+            
+            my $same = $client->lookup_job($handle->as_string);
+            ok $same;
+            isa_ok $same, 'TheSchwartz::Job';
+            is $same->handle->as_string, $handle->as_string;
+            
+        }
+        
+        ## Just test that handles for unknown database croak with an explicit message
+        {
+            eval { $client->lookup_job( ("6a" x 16) ."-666") };
+            ok $@ && unlike($@, qr/No Driver/) && like($@, qr/database.*hash/);
         }
 
         # inserting multiple with wrong method fails
diff --git a/t/priority.t b/t/priority.t
new file mode 100644
index 0000000..5713c9d
--- /dev/null
+++ b/t/priority.t
@@ -0,0 +1,72 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 62;
+
+our $record_expected;
+
+run_tests(31, sub {
+    my $client = test_client(dbs => ['ts1']);
+
+    # Define that we want to use priority selection
+    # limit batch size to 1 so we always process jobs in
+    # priority order
+    $client->set_prioritize(1);
+    $TheSchwartz::FIND_JOB_BATCH_SIZE = 1;
+
+    for (1..10) {
+        my $job = TheSchwartz::Job->new(
+                                        funcname => 'Worker::PriorityTest',
+                                        arg      => { num => $_ },
+                                        ( $_ == 1 ? () : ( priority => $_ ) ),
+                                        );
+        my $h = $client->insert($job);
+        ok($h, "inserted job (priority $_)");
+    }
+
+    $client->reset_abilities;
+    $client->can_do("Worker::PriorityTest");
+
+    Worker::PriorityTest->set_client($client);
+
+    for (1..10) {
+        $record_expected = 11 - $_ == 1 ? undef : 11 - $_;
+        my $rv = eval { $client->work_once; };
+        ok($rv, "did stuff");
+    }
+    my $rv = eval { $client->work_once; };
+    ok(!$rv, "nothing to do now");
+
+    teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::PriorityTest;
+use base 'TheSchwartz::Worker';
+use Test::More;
+
+use strict;
+my $client;
+sub set_client { $client = $_[1]; }
+
+sub work {
+    my ($class, $job) = @_;
+    my $priority = $job->priority;
+    ok((!defined($main::record_expected) && (!defined($priority)))
+        || ($priority == $main::record_expected), "priority matches expected priority");
+    $job->completed;
+}
+
+sub keep_exit_status_for { 20 }  # keep exit status for 20 seconds after on_complete
+
+sub grab_for { 10 }
+
+sub max_retries { 1 }
+
+sub retry_delay { my $class = shift; my $fails = shift; return 2 ** $fails; }
+
diff --git a/t/scoreboard.t b/t/scoreboard.t
new file mode 100644
index 0000000..2cf3948
--- /dev/null
+++ b/t/scoreboard.t
@@ -0,0 +1,78 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use Test::More tests => 20;
+
+use TheSchwartz;
+use File::Spec qw();
+
+run_tests(10, sub {
+    my $pfx = '';
+    my $dbs = ['ts1'];
+
+    setup_dbs({prefix => $pfx}, $dbs);
+
+    my $client = TheSchwartz->new(scoreboard => '/tmp',
+                                  databases => [
+                                          map { {
+                                              dsn  => dsn_for($_),
+                                              user => "root",
+                                              pass => "",
+                                              prefix => $pfx,
+                                          } } @$dbs
+                                          ]);
+
+    my $sb_file = $client->scoreboard;
+    {
+        (undef, my ($sb_dir, $sb_name))  = File::Spec->splitpath($sb_file);
+        ok(-e $sb_dir, "Looking for dir $sb_dir");
+    }
+
+    {
+        my $handle = $client->insert("Worker::Addition",
+                                     {numbers => [1, 2]});
+        my $job    = Worker::Addition->grab_job($client);
+
+        my $rv = eval { Worker::Addition->work_safely($job); };
+        ok(length($@) == 0, 'Finished job with out error');
+
+        unless (ok(-e $sb_file, "Scoreboard file exists")) {
+            return;
+        }
+
+        open(FH, $sb_file) or die "Can't open '$sb_file': $!\n";
+
+        my %info = map { chomp; /^([^=]+)=(.*)$/ } <FH>;
+        close(FH);
+
+        ok($info{pid} == $$, 'Has our PID');
+        ok($info{funcname} eq 'Worker::Addition', 'Has our funcname');
+        ok($info{started} =~ /\d+/, 'Started time is a number');
+        ok($info{started} <= time, 'Started time is in the past');
+        ok($info{arg} =~ /^numbers=ARRAY/, 'Has right args');
+        ok($info{done} =~ /\d+/, 'Job has done time');
+    }
+
+    {
+        $client->DESTROY;
+        ok(! -e $sb_file, 'Scoreboard file goes away when worker finishes');
+    }
+
+    teardown_dbs('ts1');
+});
+
+############################################################################
+package Worker::Addition;
+use base 'TheSchwartz::Worker';
+
+sub work {
+    my ($class, $job) = @_;
+
+    # ....
+}
+
+1;
diff --git a/t/server-time.t b/t/server-time.t
new file mode 100644
index 0000000..f619bf3
--- /dev/null
+++ b/t/server-time.t
@@ -0,0 +1,20 @@
+# -*-perl-*-
+
+use strict;
+use warnings;
+
+require 't/lib/db-common.pl';
+
+use TheSchwartz;
+use Test::More tests => 4;
+
+run_tests(4, sub {
+    my $client = test_client(dbs => ['ts1']);
+
+    my $driver = $client->driver_for( ($client->shuffled_databases)[0] );
+    isa_ok $driver, 'Data::ObjectDriver::Driver::DBI';
+
+    cmp_ok $client->get_server_time($driver), '>', 0, 'got server time';
+
+    teardown_dbs('ts1');
+});

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git



More information about the Pkg-perl-cvs-commits mailing list