r41143 - in /branches/upstream/libthread-pool-simple-perl/current: Changes MANIFEST META.yml Makefile.PL README Simple.pm t/ t/1.t t/2.t t/3.t t/4.t

ryan52-guest at users.alioth.debian.org ryan52-guest at users.alioth.debian.org
Mon Aug 3 07:27:25 UTC 2009


Author: ryan52-guest
Date: Mon Aug  3 07:27:18 2009
New Revision: 41143

URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=41143
Log:
Load ./to_upload/Thread-Pool-Simple-0.23/ into
branches/upstream/libthread-pool-simple-perl/current.

Added:
    branches/upstream/libthread-pool-simple-perl/current/Changes
    branches/upstream/libthread-pool-simple-perl/current/MANIFEST
    branches/upstream/libthread-pool-simple-perl/current/META.yml
    branches/upstream/libthread-pool-simple-perl/current/Makefile.PL
    branches/upstream/libthread-pool-simple-perl/current/README
    branches/upstream/libthread-pool-simple-perl/current/Simple.pm
    branches/upstream/libthread-pool-simple-perl/current/t/
    branches/upstream/libthread-pool-simple-perl/current/t/1.t
    branches/upstream/libthread-pool-simple-perl/current/t/2.t
    branches/upstream/libthread-pool-simple-perl/current/t/3.t
    branches/upstream/libthread-pool-simple-perl/current/t/4.t

Added: branches/upstream/libthread-pool-simple-perl/current/Changes
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Changes?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Changes (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Changes Mon Aug  3 07:27:18 2009
@@ -1,0 +1,43 @@
+Revision history for Perl extension Thread::Pool::Simple.
+
+0.23  24 MAY 2007
+        - only calling ``sleep'' when not busy to avoid 1 sec delay
+          during thread creation
+
+0.22  10 MAY 2007
+        - corrected a race condition where cancelled jobs may slip
+          into the done queue
+        - added ``init'' handler
+        - calling ``sleep'' instead of ``yield'' in main thread to
+          reduce the number of busy() tests
+
+0.21  9 MAY 2007
+        - fixed two defects found by Dominik Gehl
+        - 1. always call pre_handler() before creating the thread
+        - 2. do not count ``fake'' job in busy()
+
+0.20  29 MAR 2007
+        - reworked some code
+
+0.10  14 MAR 2007
+        - added job cancelation support
+
+0.05  8 MAR 2007
+        - re-added lifespan
+        - fixed a `more than max worker' bug
+
+0.04  7 MAR 2007
+        - remove lifespan, pause, and resume
+        - add `passid' config option
+
+0.03  6 MAR 2007
+        - fixed a bug that passes wrong arguments to the handler
+
+0.02  6 MAR 2007
+        - pass default parameters to the handler
+
+0.01  27 FEB 2007
+        - initial version
+	- original version; created by h2xs 1.22 with options
+		-XA Thread::Pool::Simple
+

Added: branches/upstream/libthread-pool-simple-perl/current/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/MANIFEST?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/MANIFEST (added)
+++ branches/upstream/libthread-pool-simple-perl/current/MANIFEST Mon Aug  3 07:27:18 2009
@@ -1,0 +1,10 @@
+Changes
+Makefile.PL
+MANIFEST
+README
+Simple.pm
+t/1.t
+t/2.t
+t/3.t
+t/4.t
+META.yml                                 Module meta-data (added by MakeMaker)

Added: branches/upstream/libthread-pool-simple-perl/current/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/META.yml?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/META.yml (added)
+++ branches/upstream/libthread-pool-simple-perl/current/META.yml Mon Aug  3 07:27:18 2009
@@ -1,0 +1,10 @@
+# http://module-build.sourceforge.net/META-spec.html
+#XXXXXXX This is a prototype!!!  It will change in the future!!! XXXXX#
+name:         Thread-Pool-Simple
+version:      0.23
+version_from: Simple.pm
+installdirs:  site
+requires:
+
+distribution_type: module
+generated_by: ExtUtils::MakeMaker version 6.30

Added: branches/upstream/libthread-pool-simple-perl/current/Makefile.PL
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Makefile.PL?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Makefile.PL (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Makefile.PL Mon Aug  3 07:27:18 2009
@@ -1,0 +1,10 @@
+use 5.008;
+use ExtUtils::MakeMaker;
+
+WriteMakefile(
+    'NAME'		=> 'Thread::Pool::Simple',
+    'VERSION_FROM'	=> 'Simple.pm',  # the distribution version is taken from Simple.pm
+    'PREREQ_PM'		=> {},  # no prerequisites declared
+    ($] >= 5.005 ?  # always true here, given the `use 5.008' above
+      (AUTHOR     => 'Jianyuan Wu <jwu at cpan.org>') : ()),
+);

Added: branches/upstream/libthread-pool-simple-perl/current/README
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/README?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/README (added)
+++ branches/upstream/libthread-pool-simple-perl/current/README Mon Aug  3 07:27:18 2009
@@ -1,0 +1,26 @@
+Thread/Pool/Simple
+==================
+
+Thread::Pool::Simple provides a simple thread-pool implementation
+without external dependencies outside core modules.
+
+INSTALLATION
+
+To install this module type the following:
+
+   perl Makefile.PL
+   make
+   make test
+   make install
+
+DEPENDENCIES
+
+This module requires Perl 5.8 (or above) with multi-thread support.
+
+COPYRIGHT AND LICENCE
+
+Copyright (C) 2007 Jianyuan Wu
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself. 
+

Added: branches/upstream/libthread-pool-simple-perl/current/Simple.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Simple.pm?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Simple.pm (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Simple.pm Mon Aug  3 07:27:18 2009
@@ -1,0 +1,381 @@
+package Thread::Pool::Simple;
+use 5.008;
+use strict;
+use threads;
+use threads::shared;
+use warnings;
+use Carp;
+use Storable qw(nfreeze thaw);
+use Thread::Queue;
+use Thread::Semaphore;
+
+our $VERSION = '0.23';
+
+sub new {  # Constructor: build the shared pool state and start the detached manager thread.
+    my ($class, %arg) = @_;
+    my %config : shared
+      = (min => ($arg{min} || 1),  # each default applies to any false value, including 0
+         max => ($arg{max} || 10),
+         load => ($arg{load} || 20),
+         lifespan => ($arg{lifespan} || 50000),
+         passid => ($arg{passid} || 0),
+        );
+    my %handler;
+    for (qw(init pre do post)) {
+        next unless exists $arg{$_} && ref $arg{$_} eq 'ARRAY';  # a handler that is not an array ref is silently ignored
+        $handler{$_} = $arg{$_}
+    }
+    my $self = &share({});  # the object itself is a shared hash
+    $self->{config} = \%config;
+    $self->{pending} = Thread::Queue->new();  # packed (id, frozen args) records waiting for a worker
+    $self->{submitted} = &share({});  # job id => 1 when added; cancel() may set it to 0
+    $self->{done} = &share({});  # job id => frozen result
+    my $state : shared = 0;  # 0 initially; join() sets -1; a worker that dequeues id 0 sets -2
+    $self->{state} = \$state;
+    my $worker : shared = 0;  # number of worker threads currently counted
+    $self->{worker} = \$worker;
+    $self->{shutdown_lock} = Thread::Semaphore->new();
+    bless $self, $class;
+    $self->{shutdown_lock}->down();  # taken here, released by the manager thread below once _run() returns
+    async {
+        $self->_run(\%handler);
+        $self->{shutdown_lock}->up();  # lets a blocking join() proceed
+    }->detach();
+    return $self;
+}
+
+sub _run : method {  # Manager loop: add workers while the pool is busy, then wait for every worker to exit.
+    my ($self, $handler) = @_;
+    while (1) {
+        last if $self->terminating();
+        if ($self->busy()) {
+            $self->_increase($handler);
+        }
+        else {
+            sleep 1;  # idle: poll once a second instead of spinning
+        }
+        threads->yield();
+    }
+    my $worker = $self->{worker};
+    {
+        lock $$worker;
+        cond_wait $$worker while $$worker;  # _handle() decrements the count and signals as each worker ends
+    }
+}
+
+sub _increase : method {  # Start one more detached worker thread unless the pool already has `max' workers.
+    my ($self, $handler) = @_;
+    my $max = do { lock %{$self->{config}}; $self->{config}{max} };
+    my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
+    return unless $worker < $max;
+
+    $self->_handle_func($handler->{init});  # `init' runs in the calling thread, before the worker is created
+
+    eval {
+        threads->create(\&_handle, $self, $handler)->detach();
+        lock ${$self->{worker}};
+        ++${$self->{worker}};  # counted only when create() did not die
+    };
+    carp "fail to add new thread: $@" if $@;
+}
+
+sub _handle {  # Worker thread body: run `pre', serve queued jobs until shutdown or `lifespan' runs out, run `post'.
+    my ($self, $handler) = @_;
+
+    $self->_handle_func($handler->{pre});
+
+    my $do = $handler->{do};
+    my $func = defined $do ? shift @$do : undef;  # first element is the callback, the rest are leading arguments
+    my ($lifespan, $passid)
+      = do {
+          lock %{$self->{config}};
+          @{$self->{config}}{qw(lifespan passid)}
+      };
+    eval {
+        while (!$self->terminating()
+               && $lifespan--
+              ) {
+            my ($id, $job) = unpack 'Na*', $self->{pending}->dequeue();
+            $self->_state(-2) && last unless $id;  # id 0 is the fake job queued by join()
+            do { $self->_drop($id); next } unless $self->job_exists($id);  # cancelled/unknown job: skip it (not `_drop && next': _drop returns a false value here)
+
+            my $arg = thaw($job);
+            my @ret;
+            if ($id % 3 == 2) {  # void context
+                if (defined $func) {
+                    eval {
+                        no strict 'refs';
+                        scalar $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+                    };
+                    $self->_drop($id);  # void jobs keep no result; any error from the callback is discarded
+                    next;
+                }
+            }
+            elsif ($id % 3 == 1) { # list context
+                if (defined $func) {
+                    @ret = eval {
+                        no strict 'refs';
+                        $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+                    };
+                }
+            }
+            else { # scalar context
+                if (defined $func) {
+                    $ret[0] = eval {
+                        no strict 'refs';
+                        $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+                    };
+                }
+            }
+
+            do { $self->_drop($id); next } unless $self->job_exists($id);  # cancelled while running: discard the result
+
+            if ($@) {  # error left by the callback's eval above
+                @ret = ('e', $@);
+            }
+            else {
+                unshift @ret, 'n';
+            }
+            my $ret = nfreeze(\@ret);  # stored as ['e', error] or ['n', values...]
+            {
+                lock %{$self->{done}};
+                $self->{done}{$id} = $ret;
+                cond_signal %{$self->{done}};  # wake a caller blocked in _remove()
+            }
+        }
+        continue {
+            threads->yield();
+        }
+    };
+    carp "job handling error: $@" if $@;
+
+    $self->_handle_func($handler->{post});
+
+    my $worker = $self->{worker};
+    lock $$worker;
+    --$$worker;
+    cond_signal $$worker;  # _run() waits on this count reaching zero
+}
+
+sub _handle_func {  # Run an optional [\&code, @args] handler, reporting (not propagating) any error it raises.
+    my ($self, $handler) = @_;
+    return unless defined $handler;  # handler not configured
+    my @arg = @$handler;  # copy, so the caller's array is left untouched
+    my $func = shift @arg;
+    if (defined $func) {
+        eval {
+            no strict 'refs';  # allows $func to be a sub name as well as a code ref
+            $func->(@arg);
+        };
+        carp $@ if $@;
+    }
+}
+
+sub _state : method {  # Get the pool state, or set it when an argument is given; returns the (new) state.
+    my $self = shift;
+    my $state = $self->{state};
+    lock $$state;
+    return $$state unless @_;  # getter form
+    my $s = shift;
+    $$state = $s;
+    return $s;
+}
+
+sub join : method {  # Request shutdown; unless $nb is true, block until the manager thread has finished.
+    my ($self, $nb) = @_;
+    $self->_state(-1);
+    my $max = do { lock %{$self->{config}}; $self->{config}{max} };
+    $self->{pending}->enqueue((pack('Na*', 0, '')) x $max);  # `max' fake jobs with id 0, which workers treat as a stop signal
+    return if $nb;
+    $self->{shutdown_lock}->down();  # released by the manager thread started in new()
+    sleep 1; # cool down, otherwise may coredump while run tests
+}
+
+sub detach : method {  # Request shutdown without waiting for it to complete.
+    my ($self) = @_;
+    $self->join(1);  # non-blocking form of join()
+}
+
+sub busy : method {  # True when there are fewer than `min' workers or more than `load' pending jobs per worker.
+    my ($self) = @_;
+    my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
+    my ($min, $max, $load) = do { lock %{$self->{config}}; @{$self->{config}}{'min', 'max', 'load'} };
+    my $pending = $self->{pending}->pending();
+
+    # do not count the fake job added after join()
+    $pending -= $max if $self->_state() == -1;
+    return $worker < $min || $pending > $worker * $load;
+}
+
+sub terminating : method {  # True once join() was requested and no submitted jobs remain, or once state is -2.
+    my ($self) = @_;
+    my $state = $self->_state();
+    my $job = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };  # scalar context: number of entries
+    return 1 if $state == -1 && !$job;
+    return 1 if $state == -2;
+    return;
+}
+
+sub config : method {  # Return the configuration as a key/value list, first merging in any key/value pairs given.
+    my $self = shift;
+    my $config = $self->{config};
+    lock %$config;
+    return %$config unless @_;  # getter form
+    %$config = (%$config, @_);  # later pairs override existing keys
+    return %$config;
+}
+
+sub add : method {  # Queue a job with the given arguments and return its id; id % 3 records the caller's context.
+    my $self = shift;
+    my $context = wantarray;  # true in list context, false in scalar context, undef in void context
+    $context = 2 unless defined $context; # void context = 2
+    my $arg = nfreeze(\@_);
+    my $id;
+    ID: while (1) {
+        $id = int(rand(time()));
+        next unless $id;  # id 0 is reserved for the fake job queued by join()
+        ++$id unless $context == $id % 3;  # at most two bumps are needed to reach the wanted remainder
+        ++$id unless $context == $id % 3;
+        {
+            lock %{$self->{submitted}};
+            next ID if $self->job_exists($id);  # labelled: a bare `next' would only leave this block and fall through to `last'
+            {
+                # this is necessary as some cancelled jobs may slip in
+                lock %{$self->{done}};
+                delete $self->{done}{$id};
+            }
+            $self->{pending}->enqueue(pack('Na*', $id, $arg));
+            $self->{submitted}{$id} = 1;
+        }
+        last;
+    }
+    return $id;
+}
+
+sub job_exists : method {  # Return the `submitted' entry for $id: 1 when added, 0 if cancel() flagged it, undef if absent.
+    my ($self, $id) = @_;
+    lock %{$self->{submitted}};
+    return $self->{submitted}{$id};
+}
+
+sub job_done : method {  # Return the stored (frozen) result for $id, or undef if none is stored; the entry is left in place.
+    my ($self, $id) = @_;
+    lock %{$self->{done}};
+    return $self->{done}{$id};
+}
+
+sub _drop : method {  # Delete $id from the `submitted' table; returns the deleted value (undef if there was none).
+    my ($self, $id) = @_;
+    lock %{$self->{submitted}};
+    delete $self->{submitted}{$id};
+}
+
+sub _remove : method {  # Take job $id's result out of `done'; waits for it unless $nb is true. Returns ($exist, result...).
+    my ($self, $id, $nb) = @_;
+    return if $id % 3 == 2;  # void-context jobs have no result to collect
+    return unless $self->job_exists($id);  # absent or cancelled job
+    my ($exist, $ret);
+    {
+        lock %{$self->{done}};
+        if (!$nb) {
+            cond_wait %{$self->{done}} until exists $self->{done}{$id};
+            cond_signal %{$self->{done}} if 1 < keys %{$self->{done}};  # other results are stored too: pass the wake-up on
+        }
+        ($exist, $ret) = (exists($self->{done}{$id}), delete($self->{done}{$id}));  # real presence test; `$exist = ($ret) = delete ...' always gave 1
+    }
+    $self->_drop($id) if $exist;  # an unfinished job stays submitted so it can still run
+    return $exist unless defined $ret;
+    $ret = thaw($ret);
+    my $err = shift @$ret;  # 'e' = the job died, 'n' = normal result
+    croak $ret->[0] if $err eq 'e';
+    return ($exist, @$ret) if $id % 3 == 1;  # list-context job
+    return ($exist, $ret->[0]);  # scalar-context job
+}
+
+sub remove : method {  # Blocking fetch: wait for job $id and return its result without the leading existence flag.
+    my ($self, $id) = @_;
+    my ($exist, @ret) = $self->_remove($id);  # croaks if the job itself died
+    return @ret;
+}
+
+
+sub remove_nb : method {  # Non-blocking fetch: returns whatever _remove() returns when told not to wait.
+    my ($self, $id) = @_;
+    return $self->_remove($id, 1);
+}
+
+sub cancel : method {  # Cancel job $id: discard its result if one is stored, otherwise flag it so workers skip it.
+    my ($self, $id) = @_;
+    my ($exist) = eval { $self->remove_nb($id) };  # eval: remove_nb() croaks for a job that died; that error is discarded
+    if (!$exist) {
+        lock %{$self->{submitted}};
+        $self->{submitted}{$id} = 0 if exists $self->{submitted}{$id};  # flag only known jobs; never create an entry for an unknown id
+    }
+}
+
+sub cancel_all : method {  # Cancel every job that is in the `submitted' table at the time of the call.
+    my ($self) = @_;
+    my @id = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };  # snapshot of ids, taken under the lock
+    for (@id) {
+        $self->cancel($_);
+    }
+}
+
+1;
+__END__
+
+=head1 NAME
+
+Thread::Pool::Simple - A simple thread-pool implementation
+
+=head1 SYNOPSIS
+
+  use Thread::Pool::Simple;
+
+  my $pool = Thread::Pool::Simple->new(
+                 min => 3,           # at least 3 workers
+                 max => 5,           # at most 5 workers
+                 load => 10,         # increase worker if on average every worker has 10 jobs waiting
+                 init => [\&init_handle, $arg1, $arg2, ...],  # run before creating worker thread
+                 pre => [\&pre_handle, $arg1, $arg2, ...],    # run after creating worker thread
+                 do => [\&do_handle, $arg1, $arg2, ...],      # job handler for each worker
+                 post => [\&post_handle, $arg1, $arg2, ...],  # run before worker threads end
+                 passid => 1,        # whether to pass the job id as the first argument to the &do_handle
+                 lifespan => 10000,  # total jobs handled by each worker
+               );
+
+  my ($id1) = $pool->add(@arg1); # call in list context
+  my $id2 = $pool->add(@arg2);   # call in scalar context
+  $pool->add(@arg3);             # call in void context
+
+  my @ret = $pool->remove($id1); # get result (block)
+  my $ret = $pool->remove_nb($id2); # get result (no block)
+
+  $pool->cancel($id1);           # cancel the job
+  $pool->cancel_all();           # cancel all jobs
+
+  $pool->join();                 # wait till all jobs are done
+  $pool->detach();               # don't wait.
+
+=head1 DESCRIPTION
+
+C<Thread::Pool::Simple> provides a simple thread-pool implementation
+without external dependencies outside core modules.
+
+Jobs can be submitted to and handled by multi-threaded `workers'
+managed by the pool.
+
+=head1 AUTHOR
+
+Jianyuan Wu, E<lt>jwu at cpan.orgE<gt>
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright 2007 by Jianyuan Wu
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
+=cut
+
+

Added: branches/upstream/libthread-pool-simple-perl/current/t/1.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/1.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/1.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/1.t Mon Aug  3 07:27:18 2009
@@ -1,0 +1,27 @@
+BEGIN {  # skip the whole test file when perl was built without ithreads
+    use Config;
+    if (!$Config{useithreads}) {
+        print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+        exit 0;
+    }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+                                     load => 5,
+                                     max => 10,
+                                     do => [sub {  return @_; }],  # handler simply returns its arguments
+                                    );
+
+
+for (1..1000) {
+    my @arg = (1, 2, 3);
+    my ($id, @ret);
+    $pool->add(@arg);  # void context: the result is never collected
+}
+
+$pool->join();  # the only test point in this file is the use_ok above

Added: branches/upstream/libthread-pool-simple-perl/current/t/2.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/2.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/2.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/2.t Mon Aug  3 07:27:18 2009
@@ -1,0 +1,40 @@
+BEGIN {  # skip the whole test file when perl was built without ithreads
+    use Config;
+    if (!$Config{useithreads}) {
+        print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+        exit 0;
+    }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+                                     load => 5,
+                                     max => 10,
+                                     do => [sub { return @_; }],  # handler simply returns its arguments
+                                    );
+
+
+for (1..300) {
+    my @arg = (1, 2, 3);
+    my ($id, @ret);
+    $pool->add(@arg);  # void context
+
+    ($id) = $pool->add(@arg);  # list context
+    @ret = $pool->remove($id);
+    ok("@ret" eq "@arg");
+
+    ($id) = $pool->add(@arg);
+    $pool->cancel($id);
+    @ret = $pool->remove($id);
+    ok(!@ret);  # a cancelled job yields no result
+
+    $id = $pool->add(@arg);  # scalar context
+    @ret = $pool->remove($id);
+    ok($ret[0] == 3);  # `return @_' in scalar context gives the argument count
+}
+
+$pool->join();

Added: branches/upstream/libthread-pool-simple-perl/current/t/3.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/3.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/3.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/3.t Mon Aug  3 07:27:18 2009
@@ -1,0 +1,55 @@
+BEGIN {  # skip the whole test file when perl was built without ithreads
+    use Config;
+    if (!$Config{useithreads}) {
+        print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+        exit 0;
+    }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+                                     load => 5,
+                                     max => 20,
+                                     do => [sub { return @_; }],  # handler simply returns its arguments
+                                    );
+
+
+for (1..300) {  # first pass: submit jobs in every context without collecting any result
+    my @arg = (1, 2, 3);
+    my ($id, @ret);
+    $pool->add(@arg);
+
+    ($id) = $pool->add(@arg);
+    ($id) = $pool->add(@arg);
+    $pool->cancel($id);
+
+    $id = $pool->add(@arg);
+}
+
+$pool->cancel_all();  # cancel whatever is still outstanding from the first pass
+
+for (1..300) {  # second pass: the pool must still return correct results after cancel_all()
+    my @arg = (5, 6, 7);
+    my ($id, @ret);
+    $pool->add(@arg);
+
+    ($id) = $pool->add(@arg);
+    @ret = $pool->remove($id);
+
+    ok("@ret" eq "@arg");
+
+    ($id) = $pool->add(@arg);
+    $pool->cancel($id);
+    @ret = $pool->remove($id);
+    ok(!@ret);  # a cancelled job yields no result
+
+    $id = $pool->add(@arg);
+    @ret = $pool->remove($id);
+    ok($ret[0] == 3);  # `return @_' in scalar context gives the argument count
+}
+
+$pool->join();

Added: branches/upstream/libthread-pool-simple-perl/current/t/4.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/4.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/4.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/4.t Mon Aug  3 07:27:18 2009
@@ -1,0 +1,27 @@
+BEGIN {  # skip the whole test file when perl was built without ithreads
+    use Config;
+    if (!$Config{useithreads}) {
+        print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+        exit 0;
+    }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+                                     load => 5,
+                                     max => 10,
+                                     do => [sub {  return 444/($_[0] - 1); }],  # divides by zero when the first argument is 1
+                                    );
+
+
+my @arg = (1, 2, 3);
+my ($id, @ret);
+$id = $pool->add(@arg);
+eval { @ret = $pool->remove($id) };  # remove() is expected to re-raise the job's error
+ok ($@ =~ /Illegal division by zero/);
+
+$pool->join();




More information about the Pkg-perl-cvs-commits mailing list