[libtheschwartz-perl] 09/43: include newer lib/

dom at earth.li dom at earth.li
Mon May 9 20:10:55 UTC 2016


This is an automated email from the git hooks/post-receive script.

dom pushed a commit to branch master
in repository libtheschwartz-perl.

commit cec8697433447d8529ed8559a2553611ab0ad2cb
Author: Dominic Hargreaves <dom at earth.li>
Date:   Sun Mar 2 22:26:58 2008 +0000

    include newer lib/
---
 debian/changelog           |   6 +-
 lib/TheSchwartz.pm         | 191 ++++++++++++++++++++++++++++++++++++++++++---
 lib/TheSchwartz/FuncMap.pm |  11 ++-
 lib/TheSchwartz/Worker.pm  |   6 +-
 4 files changed, 195 insertions(+), 19 deletions(-)

diff --git a/debian/changelog b/debian/changelog
index 882bf68..d277584 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,8 +1,8 @@
-libtheschwartz-perl (1.04-2) UNRELEASED; urgency=low
+libtheschwartz-perl (1.04-2~test.1) unstable; urgency=low
 
-  * NOT RELEASED YET
+  * Include versions of lib/* from upstream SVN at 136
 
- -- Dominic Hargreaves <dom at earth.li>  Thu, 20 Dec 2007 23:27:45 +0000
+ -- Dominic Hargreaves <dom at earth.li>  Sun,  2 Mar 2008 22:26:14 +0000
 
 libtheschwartz-perl (1.04-1) unstable; urgency=low
 
diff --git a/lib/TheSchwartz.pm b/lib/TheSchwartz.pm
index c2e8589..a8076ef 100644
--- a/lib/TheSchwartz.pm
+++ b/lib/TheSchwartz.pm
@@ -1,8 +1,8 @@
-# $Id: TheSchwartz.pm 122 2007-05-22 17:11:37Z bradfitz $
+# $Id: TheSchwartz.pm 136 2008-02-23 01:28:13Z bchoate $
 
 package TheSchwartz;
 use strict;
-use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration );
+use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
 
 our $VERSION = "1.04";
 
@@ -35,7 +35,9 @@ sub new {
     my $databases = delete $args{databases};
 
     $client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
+    $client->set_prioritize(delete $args{prioritize});
     $client->set_verbose(delete $args{verbose});
+    $client->set_scoreboard(delete $args{scoreboard});
     $client->{driver_cache_expiration} = delete $args{driver_cache_expiration} || 0;
     croak "unknown options ", join(', ', keys %args) if keys %args;
 
@@ -80,8 +82,8 @@ sub driver_for {
                 ($db->{prefix} ? (prefix   => $db->{prefix}) : ()),
         );
         if ($cache_duration) {
-            $client->{cached_drivers}{$hashdsn}{driver} = $driver;            
-            $client->{cached_drivers}{$hashdsn}{create_ts} = $t;                        
+            $client->{cached_drivers}{$hashdsn}{driver} = $driver;
+            $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
         }
     }
     return $driver;
@@ -165,12 +167,19 @@ sub list_jobs {
             } $driver->search('TheSchwartz::Job' => {
                 funcid        => $funcid,
                 @options
-                }, { limit => $limit });
+                }, { limit => $limit,
+                    ( $client->prioritize ? ( sort => 'priority',
+                    direction => 'descend' ) : () )
+                });
         } else {
             push @jobs, $driver->search('TheSchwartz::Job' => {
                 funcid        => $funcid,
                 @options
-                }, { limit => $limit });
+                }, { limit => $limit,
+                    ( $client->prioritize ? ( sort => 'priority',
+                        direction => 'descend' ) : () )
+                }
+            );
         }
     }
     return @jobs;
@@ -213,7 +222,11 @@ sub _find_job_with_coalescing {
                     run_after     => \ "<= $unixtime",
                     grabbed_until => \ "<= $unixtime",
                     coalesce      => { op => $op, value => $coval },
-                }, { limit => $FIND_JOB_BATCH_SIZE });
+                }, { limit => $FIND_JOB_BATCH_SIZE,
+                    ( $client->prioritize ? ( sort => 'priority',
+                        direction => 'descend' ) : () )
+                }
+            );
         };
         if ($@) {
             unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -252,7 +265,11 @@ sub find_job_for_workers {
                     funcid        => \@ids,
                     run_after     => \ "<= $unixtime",
                     grabbed_until => \ "<= $unixtime",
-                }, { limit => $FIND_JOB_BATCH_SIZE });
+                }, { limit => $FIND_JOB_BATCH_SIZE,
+                    ( $client->prioritize ? ( sort => 'priority',
+                    direction => 'descend' ) : () )
+                }
+            );
         };
         if ($@) {
             unless (OK_ERRORS->{ $driver->last_error || 0 }) {
@@ -268,6 +285,13 @@ sub find_job_for_workers {
     }
 }
 
+sub get_server_time {
+    my TheSchwartz $client = shift;
+    my($driver) = @_;
+    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
+    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
+}
+
 sub _grab_a_job {
     my TheSchwartz $client = shift;
     my $hashdsn = shift;
@@ -286,8 +310,7 @@ sub _grab_a_job {
         my $worker_class = $job->funcname;
         my $old_grabbed_until = $job->grabbed_until;
 
-        my $unixtime_sql = $driver->dbd->sql_for_unixtime;
-        my $server_time = $driver->rw_handle->selectrow_array("SELECT $unixtime_sql")
+        my $server_time = $client->get_server_time($driver)
             or die "expected a server time";
 
         $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
@@ -329,6 +352,12 @@ sub insert_job_to_driver {
         ## on the hashed DSN. Also: this might fail, if the database is dead.
         $job->funcid( $client->funcname_to_id($driver, $hashdsn, $job->funcname) );
 
+        ## This is sub-optimal because of clock skew, but something is
+        ## better than a NULL value. And currently, nothing in TheSchwartz
+        ## code itself uses insert_time. TODO: use server time, but without
+        ## having to do a roundtrip just to get the server time.
+        $job->insert_time(time);
+
         ## Now, insert the job. This also might fail.
         $driver->insert($job);
     };
@@ -490,7 +519,8 @@ sub work_once {
 
     my $class = $job ? $job->funcname : undef;
     if ($job) {
-        $job->debug("TheSchwartz::work_once got job of class '$class'");
+        my $priority = $job->priority ? ", priority " . $job->priority : "";
+        $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
     } else {
         $client->debug("TheSchwartz::work_once found no jobs");
     }
@@ -566,6 +596,118 @@ sub set_verbose {
     $client->{verbose} = $logger;
 }
 
+sub scoreboard {
+    my TheSchwartz $client = shift;
+
+    return $client->{scoreboard};
+}
+
+sub set_scoreboard {
+    my TheSchwartz $client = shift;
+    my ($dir) = @_;
+
+    return unless $dir;
+
+    # They want the scoreboard but don't care where it goes
+    if (($dir eq '1') or ($dir eq 'on')) {
+        # Find someplace in tmpfs to save this
+        foreach my $d (qw(/var/run /dev/shm)) {
+            $dir = $d;
+            last if -e $dir;
+        }
+    }
+
+    $dir .= '/theschwartz';
+    unless (-e $dir) {
+        mkdir($dir, 0755) or die "Can't create scoreboard directory '$dir': $!";
+    }
+
+    $client->{scoreboard} = $dir."/scoreboard.$$";
+}
+
+sub start_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    # Don't do anything of (for some reason) we don't have a current job
+    my $job = $client->current_job;
+    return unless $job;
+
+    my $class = $job->funcname;
+
+    open(SB, '>', $scoreboard)
+      or $job->debug("Could not write scoreboard '$scoreboard': $!");
+    print SB join("\n", ("pid=$$",
+                         'funcname='.($class||''),
+                         'started='.($job->grabbed_until-($class->grab_for||1)),
+                         'arg='._serialize_args($job->arg),
+                        )
+                 ), "\n";
+    close(SB);
+
+    return;
+}
+
+# Quick and dirty serializer.  Don't use Data::Dumper because we don't need to
+# recurse indefinitely and we want to truncate the output produced
+sub _serialize_args {
+    my ($args) = @_;
+
+    if (ref $args) {
+        if (ref $args eq 'HASH') {
+            return join ',',
+                   map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
+                   keys %$args;
+        } elsif (ref $args eq 'ARRAY') {
+            return join ',',
+                   map { substr($_||'', 0, 200) }
+                   @$args;
+        }
+    } else {
+        return $args;
+    }
+}
+
+sub end_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    my $job = $client->current_job;
+
+    open(SB, '>>', $scoreboard)
+      or $job->debug("Could not append scoreboard '$scoreboard': $!");
+    print SB "done=".time."\n";
+    close(SB);
+
+    return;
+}
+
+sub clean_scoreboard {
+    my TheSchwartz $client = shift;
+
+    # Don't do anything if we're not configured to write to the scoreboard
+    my $scoreboard = $client->scoreboard;
+    return unless $scoreboard;
+
+    unlink($scoreboard);
+}
+
+sub prioritize {
+    my TheSchwartz $client = shift;
+    return $client->{prioritize};
+}
+
+sub set_prioritize {
+    my TheSchwartz $client = shift;
+    $client->{prioritize} = shift;
+}
+
 # current job being worked.  so if something dies, work_safely knows which to mark as dead.
 sub current_job {
     my TheSchwartz $client = shift;
@@ -577,6 +719,15 @@ sub set_current_job {
     $client->{current_job} = shift;
 }
 
+DESTROY {
+    foreach my $arg (@_) {
+        # Call 'clean_scoreboard' on TheSchwartz objects
+        if (ref($arg) and $arg->isa('TheSchwartz')) {
+            $arg->clean_scoreboard;
+        }
+    }
+}
+
 1;
 
 __END__
@@ -605,14 +756,14 @@ TheSchwartz - reliable job queue
     sub work {
         my $class = shift;
         my TheSchwartz::Job $job = shift;
-        
+
         print "Workin' hard or hardly workin'? Hyuk!!\n";
 
         $job->completed();
     }
 
     package main;
-    
+
     my $client = TheSchwartz->new( databases => $DATABASE_INFO );
     $client->can_do('MyWorker');
     $client->work();
@@ -681,6 +832,12 @@ it is called to log debug messages. If C<verbose> is not a coderef but is some
 other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
 messages will not be logged.
 
+=item * C<prioritize>
+
+A value indicating whether to utilize the job 'priority' field when selecting
+jobs to be processed. If unspecified, jobs will always be executed in a
+randomized order.
+
 =item * C<driver_cache_expiration>
 
 Optional value to control how long database connections are cached for in seconds.
@@ -795,6 +952,10 @@ Find and perform any jobs C<$client> can do, forever. When no job is available,
 the working process will sleep for C<$delay> seconds (or 5, if not specified)
 before looking again.
 
+=head2 C<$client-E<gt>work_on($handle)>
+
+Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
+
 =head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
 
 Returns a C<TheSchwartz::Job> for a random job that the client can do. If
@@ -815,6 +976,10 @@ Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
 find matching jobs, with all the attendant performance implications for your
 job databases.
 
+=head2 C<$client-E<gt>get_server_time( $driver )>
+
+Given an open driver I<$driver> to a database, gets the current server time from the database.
+
 =head1 COPYRIGHT, LICENSE & WARRANTY
 
 This software is Copyright 2007, Six Apart Ltd, cpan at sixapart.com. All
diff --git a/lib/TheSchwartz/FuncMap.pm b/lib/TheSchwartz/FuncMap.pm
index 3179082..65c9ba9 100644
--- a/lib/TheSchwartz/FuncMap.pm
+++ b/lib/TheSchwartz/FuncMap.pm
@@ -1,4 +1,4 @@
-# $Id: FuncMap.pm 42 2006-05-16 05:15:30Z btrott $
+# $Id: FuncMap.pm 136 2008-02-23 01:28:13Z bchoate $
 
 package TheSchwartz::FuncMap;
 use strict;
@@ -16,9 +16,16 @@ sub create_or_find {
     my $class = shift;
     my($driver, $funcname) = @_;
 
+    ## Attempt to select funcmap record by name. If successful, return
+    ## object, otherwise proceed with insertion and return.
+    my ($map) = $driver->search('TheSchwartz::FuncMap' =>
+            { funcname => $funcname }
+        );
+    return $map if $map;
+
     ## Attempt to insert a new funcmap row. Since the funcname column is
     ## UNIQUE, if the row already exists, an exception will be thrown.
-    my $map = $class->new;
+    $map = $class->new;
     $map->funcname($funcname);
     eval { $driver->insert($map) };
 
diff --git a/lib/TheSchwartz/Worker.pm b/lib/TheSchwartz/Worker.pm
index bbe61ab..2bfeaae 100644
--- a/lib/TheSchwartz/Worker.pm
+++ b/lib/TheSchwartz/Worker.pm
@@ -1,4 +1,4 @@
-# $Id: Worker.pm 106 2006-10-16 21:42:32Z mpaschal $
+# $Id: Worker.pm 134 2007-08-22 22:04:38Z garth $
 
 package TheSchwartz::Worker;
 use strict;
@@ -24,6 +24,8 @@ sub work_safely {
 
     $job->debug("Working on $class ...");
     $job->set_as_current;
+    $client->start_scoreboard;
+
     eval {
         $res = $class->work($job);
     };
@@ -37,6 +39,8 @@ sub work_safely {
         $cjob->failed('Job did not explicitly complete, fail, or get replaced');
     }
 
+    $client->end_scoreboard;
+
     # FIXME: this return value is kinda useless/undefined.  should we even return anything?  any callers? -brad
     return $res;
 }

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libtheschwartz-perl.git



More information about the Pkg-perl-cvs-commits mailing list