[libtheschwartz-perl] 09/43: include newer lib/
dom at earth.li
dom at earth.li
Mon May 9 20:10:55 UTC 2016
This is an automated email from the git hooks/post-receive script.
dom pushed a commit to branch master
in repository libtheschwartz-perl.
commit cec8697433447d8529ed8559a2553611ab0ad2cb
Author: Dominic Hargreaves <dom at earth.li>
Date: Sun Mar 2 22:26:58 2008 +0000
include newer lib/
---
debian/changelog | 6 +-
lib/TheSchwartz.pm | 191 ++++++++++++++++++++++++++++++++++++++++++---
lib/TheSchwartz/FuncMap.pm | 11 ++-
lib/TheSchwartz/Worker.pm | 6 +-
4 files changed, 195 insertions(+), 19 deletions(-)
diff --git a/debian/changelog b/debian/changelog
index 882bf68..d277584 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,8 +1,8 @@
-libtheschwartz-perl (1.04-2) UNRELEASED; urgency=low
+libtheschwartz-perl (1.04-2~test.1) unstable; urgency=low
- * NOT RELEASED YET
+ * Include versions of lib/* from upstream SVN at 136
- -- Dominic Hargreaves <dom at earth.li> Thu, 20 Dec 2007 23:27:45 +0000
+ -- Dominic Hargreaves <dom at earth.li> Sun, 2 Mar 2008 22:26:14 +0000
libtheschwartz-perl (1.04-1) unstable; urgency=low
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index c2e8589..a8076ef 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,8 +1,8 @@
-# $Id: TheSchwartz.pm 122 2007-05-22 17:11:37Z bradfitz $
+# $Id: TheSchwartz.pm 136 2008-02-23 01:28:13Z bchoate $
package TheSchwartz;
use strict;
-use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration );
+use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
our $VERSION = "1.04";
@@ -35,7 +35,9 @@ sub new {
my $databases = delete $args{databases};
$client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
+ $client->set_prioritize(delete $args{prioritize});
$client->set_verbose(delete $args{verbose});
+ $client->set_scoreboard(delete $args{scoreboard});
$client->{driver_cache_expiration} = delete $args{driver_cache_expiration} || 0;
croak "unknown options ", join(', ', keys %args) if keys %args;
@@ -80,8 +82,8 @@ sub driver_for {
($db->{prefix} ? (prefix => $db->{prefix}) : ()),
);
if ($cache_duration) {
- $client->{cached_drivers}{$hashdsn}{driver} = $driver;
- $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
+ $client->{cached_drivers}{$hashdsn}{driver} = $driver;
+ $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
}
}
return $driver;
@@ -165,12 +167,19 @@ sub list_jobs {
} $driver->search('TheSchwartz::Job' => {
funcid => $funcid,
@options
- }, { limit => $limit });
+ }, { limit => $limit,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ });
} else {
push @jobs, $driver->search('TheSchwartz::Job' => {
funcid => $funcid,
@options
- }, { limit => $limit });
+ }, { limit => $limit,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
}
}
return @jobs;
@@ -213,7 +222,11 @@ sub _find_job_with_coalescing {
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
coalesce => { op => $op, value => $coval },
- }, { limit => $FIND_JOB_BATCH_SIZE });
+ }, { limit => $FIND_JOB_BATCH_SIZE,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
};
if ($@) {
unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -252,7 +265,11 @@ sub find_job_for_workers {
funcid => \@ids,
run_after => \ "<= $unixtime",
grabbed_until => \ "<= $unixtime",
- }, { limit => $FIND_JOB_BATCH_SIZE });
+ }, { limit => $FIND_JOB_BATCH_SIZE,
+ ( $client->prioritize ? ( sort => 'priority',
+ direction => 'descend' ) : () )
+ }
+ );
};
if ($@) {
unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -268,6 +285,13 @@ sub find_job_for_workers {
}
}
+sub get_server_time {
+ my TheSchwartz $client = shift;
+ my($driver) = @_;
+ my $unixtime_sql = $driver->dbd->sql_for_unixtime;
+ return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
+}
+
sub _grab_a_job {
my TheSchwartz $client = shift;
my $hashdsn = shift;
@@ -286,8 +310,7 @@ sub _grab_a_job {
my $worker_class = $job->funcname;
my $old_grabbed_until = $job->grabbed_until;
- my $unixtime_sql = $driver->dbd->sql_for_unixtime;
- my $server_time = $driver->rw_handle->selectrow_array("SELECT $unixtime_sql")
+ my $server_time = $client->get_server_time($driver)
or die "expected a server time";
$job->grabbed_until($server_time + ($worker_class->grab_for || 1));
@@ -329,6 +352,12 @@ sub insert_job_to_driver {
## on the hashed DSN. Also: this might fail, if the database is dead.
$job->funcid( $client->funcname_to_id($driver, $hashdsn, $job->funcname) );
+ ## This is sub-optimal because of clock skew, but something is
+ ## better than a NULL value. And currently, nothing in TheSchwartz
+ ## code itself uses insert_time. TODO: use server time, but without
+ ## having to do a roundtrip just to get the server time.
+ $job->insert_time(time);
+
## Now, insert the job. This also might fail.
$driver->insert($job);
};
@@ -490,7 +519,8 @@ sub work_once {
my $class = $job ? $job->funcname : undef;
if ($job) {
- $job->debug("TheSchwartz::work_once got job of class '$class'");
+ my $priority = $job->priority ? ", priority " . $job->priority : "";
+ $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
} else {
$client->debug("TheSchwartz::work_once found no jobs");
}
@@ -566,6 +596,118 @@ sub set_verbose {
$client->{verbose} = $logger;
}
+sub scoreboard {
+ my TheSchwartz $client = shift;
+
+ return $client->{scoreboard};
+}
+
+sub set_scoreboard {
+ my TheSchwartz $client = shift;
+ my ($dir) = @_;
+
+ return unless $dir;
+
+ # They want the scoreboard but don't care where it goes
+ if (($dir eq '1') or ($dir eq 'on')) {
+ # Find someplace in tmpfs to save this
+ foreach my $d (qw(/var/run /dev/shm)) {
+ $dir = $d;
+ last if -e $dir;
+ }
+ }
+
+ $dir .= '/theschwartz';
+ unless (-e $dir) {
+ mkdir($dir, 0755) or die "Can't create scoreboard directory '$dir': $!";
+ }
+
+ $client->{scoreboard} = $dir."/scoreboard.$$";
+}
+
+sub start_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ # Don't do anything of (for some reason) we don't have a current job
+ my $job = $client->current_job;
+ return unless $job;
+
+ my $class = $job->funcname;
+
+ open(SB, '>', $scoreboard)
+ or $job->debug("Could not write scoreboard '$scoreboard': $!");
+ print SB join("\n", ("pid=$$",
+ 'funcname='.($class||''),
+ 'started='.($job->grabbed_until-($class->grab_for||1)),
+ 'arg='._serialize_args($job->arg),
+ )
+ ), "\n";
+ close(SB);
+
+ return;
+}
+
+# Quick and dirty serializer. Don't use Data::Dumper because we don't need to
+# recurse indefinitely and we want to truncate the output produced
+sub _serialize_args {
+ my ($args) = @_;
+
+ if (ref $args) {
+ if (ref $args eq 'HASH') {
+ return join ',',
+ map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
+ keys %$args;
+ } elsif (ref $args eq 'ARRAY') {
+ return join ',',
+ map { substr($_||'', 0, 200) }
+ @$args;
+ }
+ } else {
+ return $args;
+ }
+}
+
+sub end_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ my $job = $client->current_job;
+
+ open(SB, '>>', $scoreboard)
+ or $job->debug("Could not append scoreboard '$scoreboard': $!");
+ print SB "done=".time."\n";
+ close(SB);
+
+ return;
+}
+
+sub clean_scoreboard {
+ my TheSchwartz $client = shift;
+
+ # Don't do anything if we're not configured to write to the scoreboard
+ my $scoreboard = $client->scoreboard;
+ return unless $scoreboard;
+
+ unlink($scoreboard);
+}
+
+sub prioritize {
+ my TheSchwartz $client = shift;
+ return $client->{prioritize};
+}
+
+sub set_prioritize {
+ my TheSchwartz $client = shift;
+ $client->{prioritize} = shift;
+}
+
# current job being worked. so if something dies, work_safely knows which to mark as dead.
sub current_job {
my TheSchwartz $client = shift;
@@ -577,6 +719,15 @@ sub set_current_job {
$client->{current_job} = shift;
}
+DESTROY {
+ foreach my $arg (@_) {
+ # Call 'clean_scoreboard' on TheSchwartz objects
+ if (ref($arg) and $arg->isa('TheSchwartz')) {
+ $arg->clean_scoreboard;
+ }
+ }
+}
+
1;
__END__
@@ -605,14 +756,14 @@ TheSchwartz - reliable job queue
sub work {
my $class = shift;
my TheSchwartz::Job $job = shift;
-
+
print "Workin' hard or hardly workin'? Hyuk!!\n";
$job->completed();
}
package main;
-
+
my $client = TheSchwartz->new( databases => $DATABASE_INFO );
$client->can_do('MyWorker');
$client->work();
@@ -681,6 +832,12 @@ it is called to log debug messages. If C<verbose> is not a coderef but is some
other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
messages will not be logged.
+=item * C<prioritize>
+
+A value indicating whether to utilize the job 'priority' field when selecting
+jobs to be processed. If unspecified, jobs will always be executed in a
+randomized order.
+
=item * C<driver_cache_expiration>
Optional value to control how long database connections are cached for in seconds.
@@ -795,6 +952,10 @@ Find and perform any jobs C<$client> can do, forever. When no job is available,
the working process will sleep for C<$delay> seconds (or 5, if not specified)
before looking again.
+=head2 C<$client-E<gt>work_on($handle)>
+
+Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
+
=head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
Returns a C<TheSchwartz::Job> for a random job that the client can do. If
@@ -815,6 +976,10 @@ Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
find matching jobs, with all the attendant performance implications for your
job databases.
+=head2 C<$client-E<gt>get_server_time( $driver )>
+
+Given an open driver I<$driver> to a database, gets the current server time from the database.
+
=head1 COPYRIGHT, LICENSE & WARRANTY
This software is Copyright 2007, Six Apart Ltd, cpan at sixapart.com. All
diff --git a/lib/TheSchwartz/FuncMap.pm b/lib/TheSchwartz/FuncMap.pm
index 3179082..65c9ba9 100644
--- a/lib/TheSchwartz/FuncMap.pm
+++ b/lib/TheSchwartz/FuncMap.pm
@@ -1,4 +1,4 @@
-# $Id: FuncMap.pm 42 2006-05-16 05:15:30Z btrott $
+# $Id: FuncMap.pm 136 2008-02-23 01:28:13Z bchoate $
package TheSchwartz::FuncMap;
use strict;
@@ -16,9 +16,16 @@ sub create_or_find {
my $class = shift;
my($driver, $funcname) = @_;
+ ## Attempt to select funcmap record by name. If successful, return
+ ## object, otherwise proceed with insertion and return.
+ my ($map) = $driver->search('TheSchwartz::FuncMap' =>
+ { funcname => $funcname }
+ );
+ return $map if $map;
+
## Attempt to insert a new funcmap row. Since the funcname column is
## UNIQUE, if the row already exists, an exception will be thrown.
- my $map = $class->new;
+ $map = $class->new;
$map->funcname($funcname);
eval { $driver->insert($map) };
diff --git a/lib/TheSchwartz/Worker.pm b/lib/TheSchwartz/Worker.pm
index bbe61ab..2bfeaae 100644
--- a/lib/TheSchwartz/Worker.pm
+++ b/lib/TheSchwartz/Worker.pm
@@ -1,4 +1,4 @@
-# $Id: Worker.pm 106 2006-10-16 21:42:32Z mpaschal $
+# $Id: Worker.pm 134 2007-08-22 22:04:38Z garth $
package TheSchwartz::Worker;
use strict;
@@ -24,6 +24,8 @@ sub work_safely {
$job->debug("Working on $class ...");
$job->set_as_current;
+ $client->start_scoreboard;
+
eval {
$res = $class->work($job);
};
@@ -37,6 +39,8 @@ sub work_safely {
$cjob->failed('Job did not explicitly complete, fail, or get replaced');
}
+ $client->end_scoreboard;
+
# FIXME: this return value is kinda useless/undefined. should we even return anything? any callers? -brad
return $res;
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git
More information about the Pkg-perl-cvs-commits
mailing list