r14866 - in /branches/upstream/libpoe-component-jobqueue-perl: ./ current/ current/CHANGES current/JobQueue.pm current/MANIFEST current/META.yml current/Makefile.PL current/README current/t/ current/t/01_queues.t

gregoa-guest at users.alioth.debian.org gregoa-guest at users.alioth.debian.org
Wed Feb 13 20:31:02 UTC 2008

Author: gregoa-guest
Date: Wed Feb 13 20:31:01 2008
New Revision: 14866

URL: http://svn.debian.org/wsvn/?sc=1&rev=14866
[svn-inject] Installing original source of libpoe-component-jobqueue-perl


Added: branches/upstream/libpoe-component-jobqueue-perl/current/CHANGES
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/CHANGES?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/CHANGES (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/CHANGES Wed Feb 13 20:31:01 2008
@@ -1,0 +1,20 @@
+2004-05-19 16:13:42 v0_54
+  2004-05-19 16:13:42 by rcaputo; JobQueue.pm 1.10
+    Zach Thompson reported the hash keys problem that was previously
+    patched, reminding me that I hadn't actually released the fix to the
+    world. Here I am bumping the version number so I can release it to
+    the CPAN. 
+  2003-11-29 23:18:56 by rcaputo; JobQueue.pm 1.9; t/01_queues.t 1.8
+    Applied neyuki's patch to correct some typos in hash keys and
+    parameter names. This also corrects the worker respond logic in
+    active queues. 
+Beginning of Recorded History

Added: branches/upstream/libpoe-component-jobqueue-perl/current/JobQueue.pm
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/JobQueue.pm?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/JobQueue.pm (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/JobQueue.pm Wed Feb 13 20:31:01 2008
@@ -1,0 +1,576 @@
+# $Id: JobQueue.pm,v 1.10 2004/05/19 16:13:42 rcaputo Exp $
+# License and documentation are after __END__.
+package POE::Component::JobQueue;
+use strict;
+use vars qw($VERSION);
+$VERSION = '0.54';
+use Carp qw (croak);
+use POE::Session;
+sub DEBUG () { 0 };
+# Spawn a new PoCo::JobQueue session.  This basically is a
+# constructor, but it isn't named "new" because it doesn't create a
+# usable object.  Instead, it spawns the object off as a session.
+sub spawn {
+  my $type = shift;
+  croak "$type requires an even number of parameters" if @_ % 2;
+  my %params = @_;
+  ### Parameters that are common to both types of job queue.
+  my $alias = delete $params{Alias};
+  $alias = 'queuer' unless defined $alias and length $alias;
+  my $worker = delete $params{Worker};
+  croak "$type requires a coderef Worker parameter"
+    unless defined $worker and ref($worker) eq 'CODE';
+  my $worker_limit = delete $params{WorkerLimit};
+  $worker_limit = 8 unless defined $worker_limit and $worker_limit > 0;
+  croak "$type requires either an Active or a Passive parameter block"
+    unless defined($params{Active}) xor defined($params{Passive});
+  ### Parameters and states that are common to both types of queue.
+  my @args    = ( $alias, $worker_limit, $worker );
+  my %states  =
+    ( _child  => \&poco_jobqueue_both_child,
+      stop    => \&poco_jobqueue_both_stop,
+      _stop   => sub {},
+    );
+  ### Modal parameters and states go here.
+  # Set up for an active queue.
+  if (exists $params{Active}) {
+    my $active = delete $params{Active};
+    my $poll_interval = delete $active->{PollInterval};
+    $poll_interval = undef
+      unless defined $poll_interval and $poll_interval > 0;
+    my $ack_alias = delete $active->{AckAlias};
+    $ack_alias = undef unless defined $ack_alias and length $ack_alias;
+    my $ack_state = delete $active->{AckState};
+    $ack_state = undef unless defined $ack_state and length $ack_state;
+    croak "$type must have neither or both AckAlias and AckState"
+      if defined($ack_alias) xor defined($ack_state);
+    $states{_start}  = \&poco_jobqueue_active_start;
+    $states{dequeue} = \&poco_jobqueue_active_dequeue;
+    push @args, $poll_interval, $ack_alias, $ack_state;
+  }
+  # Set up for a passive queue.
+  elsif (exists $params{Passive}) {
+    my $passive = delete $params{Passive};
+    my $prioritizer = delete $passive->{Prioritizer};
+    $prioritizer = sub { 1 } unless defined $prioritizer;
+    croak( "$type doesn't know these Passive parameters: ",
+           join(', ', sort keys %$passive)
+         ) if scalar keys %$passive;
+    $states{_start}  = \&poco_jobqueue_passive_start;
+    $states{dequeue} = \&poco_jobqueue_passive_dequeue;
+    $states{enqueue} = \&poco_jobqueue_passive_enqueue;
+    push @args, $prioritizer;
+  }
+  croak( "$type doesn't know these parameters: ",
+         join(', ', sort keys %params)
+       ) if scalar keys %params;
+  # Spawn whichever queue we've built.
+  POE::Session->create
+    ( inline_states => \%states,
+      args          => \@args,
+    );
+  undef;
+# Helper function for active job queues.
+sub poco_jobqueue_active_meta_postback {
+  die "unimplemented bit";
+# Start an active job queue.  This type of queue polls for new jobs.
+sub poco_jobqueue_active_start {
+  my ( $kernel, $heap,
+       $alias, $worker_limit, $worker_ref,
+       $poll_interval, $ack_alias, $ack_state
+     ) = @_[KERNEL, HEAP, ARG0..ARG5];
+  # Common parameters.
+  $heap->{alias}         = $alias;
+  $heap->{worker_limit}  = $worker_limit;
+  $heap->{worker_ref}    = $worker_ref;
+  # Active queue parameters.
+  $heap->{poll_interval} = $poll_interval;
+  $heap->{meta_postback} =
+    sub {
+      my @job = @_;
+      my $session = $kernel->alias_resolve( $ack_alias );
+      return $session->postback( $ack_state, @job ) if defined $session;
+      return sub { 1 };
+    };
+  # State variables.  Pending polls starts at 1 because we're going to
+  # fake an initial poll to get things started.
+  $heap->{worker_count}  = 0;
+  $heap->{pending_polls} = 0;
+  $heap->{latest_worker} = 0;
+  # Register an alias.
+  $kernel->alias_set($alias);
+  # Start an initial set of workers.
+  $kernel->yield( 'dequeue' );
+# Start a passive job queue.  This type of queue waits for something
+# else to enqueue jobs.
+sub poco_jobqueue_passive_start {
+  my ( $kernel, $heap,
+       $alias, $worker_limit, $worker_ref,
+       $prioritizer
+     ) = @_[KERNEL, HEAP, ARG0..ARG3];
+  # Common parameters.
+  $heap->{alias}         = $alias;
+  $heap->{worker_limit}  = $worker_limit;
+  $heap->{worker_ref}    = $worker_ref;
+  # Active queue parameters.
+  $heap->{prioritizer}   = $prioritizer;
+  # State variables.
+  $heap->{worker_count}  = 0;
+  $heap->{job_queue}     = [ ];
+  # Register an alias.
+  $kernel->alias_set($alias);
+# A worker either has come or gone.  Track the number of running
+# workers, and spawn new ones if appropriate.
+sub poco_jobqueue_both_child {
+  my ($kernel, $heap, $operation) = @_[KERNEL, HEAP, ARG0];
+  # A worker has begun its job.  Count it so we know how many exist.
+  if ($operation eq 'gain' or $operation eq 'create') {
+    DEBUG and warn "JQ: job queue $heap->{alias} got a new worker";
+    $heap->{worker_count}++;
+  }
+  # A worker has finished.  Decrement our worker count, and try to
+  # start another worker to take its place.
+  else {
+    DEBUG and warn "JQ: job queue $heap->{alias} lost a worker";
+    warn( "worker count ($heap->{worker_count}) exceeded the limit (",
+          $heap->{worker_limit}, ")"
+        ) if $heap->{worker_count} > $heap->{worker_limit};
+    $heap->{worker_count}--;
+    $kernel->yield('dequeue') unless $heap->{latest_worker};
+  }
+# Attempt to fill empty worker slots.
+# This is a token for ARG0 that signifies this was a timed event.
+sub TIMED () { 31415 }
+sub poco_jobqueue_active_dequeue {
+  my ($kernel, $heap, $is_timed) = @_[KERNEL, HEAP, ARG0];
+  # If this is a poll from a timed event, then decrement the pending
+  # polls count.  The pending polls count is just to ensure that
+  # redundant delays are not set, because each redundant delay would
+  # force the existing one forward in time.  They would delay polling
+  # past the hard polling interval, which would probably be bad (and
+  # could stave off polling indefinitely in some instances).  I think
+  # this is a bit of a hack, and something better should replace it.
+  if (defined $is_timed and $is_timed == TIMED) {
+    # Decrement the number of pending polls.  There can be only one,
+    # so throw in a die for assertion testing.
+    die "pending polls should now be zero (not $heap->{pending_polls})"
+      if --$heap->{pending_polls};
+  }
+  # Attempt to fill the empty worker slots.
+  while ($heap->{worker_count} < $heap->{worker_limit}) {
+    # Call the worker to fetch a new job and spawn a session.
+    my $previous_worker_count = $heap->{worker_count};
+    $heap->{worker_ref}->( $heap->{meta_postback} );
+    # If the worker count hasn't changed, then we've run out of jobs.
+    # Begin polling, if applicable, and exit the spawn loop.
+    if ($heap->{worker_count} == $previous_worker_count) {
+      if (defined $heap->{poll_interval} and !$heap->{pending_polls}) {
+        $heap->{pending_polls}++;
+        $kernel->delay( dequeue => $heap->{poll_interval} => TIMED );
+      }
+      $heap->{latest_worker}++ unless defined $heap->{poll_interval};
+      last;
+    }
+  }
+# Attempt to fill empty worker slots.
+sub poco_jobqueue_passive_dequeue {
+  my ($kernel, $heap) = @_[KERNEL, HEAP];
+  # Attempt to fill the empty worker slots.
+  while ($heap->{worker_count} < $heap->{worker_limit}) {
+    # Try to fetch another job from the queue.
+    my $next_job = shift @{ $heap->{job_queue} };
+    last unless defined $next_job;
+    DEBUG and
+      warn "JQ: job queue $heap->{alias} is starting a new worker";
+    # Start a new session with the job.
+    $heap->{worker_ref}->( @$next_job );
+  }
+  # Avoid accidentally returning something.
+  undef;
+# Enqueue a job in a passive queue.
+sub poco_jobqueue_passive_enqueue {
+  my ($kernel, $sender, $heap, $return_state, @job) =
+    @_[KERNEL, SENDER, HEAP, ARG0..$#_];
+  DEBUG and warn "JQ: job queue $heap->{alias} enqueuing a new job";
+  my $postback = $sender->postback( $return_state, @job );
+  # Add the job to the queue.  Use the prioritizer to find the right
+  # place to put it.
+  my $queue_index = @{ $heap->{job_queue} };
+  while ($queue_index--) {
+    last if
+      $heap->{prioritizer}->( $heap->{job_queue}->[$queue_index],
+                              \@job,
+                            ) >= 0;
+  }
+  # Place the new job after the index we found.
+  splice( @{$heap->{job_queue}}, $queue_index+1, 0, [ $postback, @job ] );
+  # Dequeue a new event.
+  $kernel->yield( 'dequeue' );
+=head1 NAME
+POE::Component::JobQueue - a component to manage queues and worker pools
+=head1 SYNOPSIS
+  use POE qw(Component::JobQueue);
+  # Passive queue waits for enqueue events.
+  POE::Component::JobQueue->spawn
+    ( Alias         => 'passive',         # defaults to 'queuer'
+      WorkerLimit   => 16,                # defaults to 8
+      Worker        => \&spawn_a_worker,  # code which will start a session
+      Passive       =>
+      { Prioritizer => \&job_comparer,    # defaults to sub { 1 } # FIFO
+      },
+    );
+  # Active queue fetches jobs and spawns workers.
+  POE::Component::JobQueue->spawn
+    ( Alias          => 'active',          # defaults to 'queuer'
+      WorkerLimit    => 32,                # defaults to 8
+      Worker         => \&fetch_and_spawn, # fetch a job and start a session
+      Active         =>
+      { PollInterval => 1,                 # defaults to undef (no polling)
+        AckAlias     => 'respondee',       # defaults to undef (no respondee)
+        AckState     => 'response',        # defaults to undef
+      },
+    );
+  # Enqueuing a job in a passive queue.
+  $kernel->post( 'passive',   # post to 'passive' alias
+                 'enqueue',   # 'enqueue' a job
+                 'postback',  # which of our states is notified when it's done
+                 @job_params, # job parameters
+               );
+  # Passive worker function.
+  sub spawn_a_worker {
+    my ($postback, @job_params) = @_;     # same parameters as posted
+    POE::Session->create
+      ( inline_states => \%inline_states, # handwaving over details here
+        args          => [ $postback,     # $postback->(@results) to return
+                           @job_params,   # parameters of this job
+                         ],
+      );
+  }
+  # Active worker function.
+  sub fetch_and_spawn {
+    my $meta_postback = shift;               # called to create a postback
+    my @job_params = &fetch_next_job();      # fetch the next job's parameters
+    if (@job_params) {                       # if there's a job to do...
+      my $postback = $meta_postback->(@job_params); # ... create a postback
+      POE::Session->create                          # ... create a session
+        ( inline_states => \%inline_states,  # handwaving over details here
+          args          => [ $postback,      # $postback->(@results) to return
+                             @job_params,    # parameters of this job
+                           ],
+        );
+    }
+  }
+  # Invoke a postback to acknowledge that a job is done.
+  $postback->( @job_results );
+  # This is the sub which is called when a postback is invoked.
+  sub postback_handler {
+    my ($request_packet, $response_packet) = @_[ARG0, ARG1];
+    my @original_job_params = @{$request_packet};  # original post/fetch
+    my @job_results         = @{$response_packet}; # passed to the postback
+    print "original job parameters: (@original_job_params)\n";
+    print "results of finished job: (@job_response)\n";
+  }
+POE::Component::JobQueue manages a finite pool of worker sessions as
+they handle an arbitrarily large number of tasks.  It often is used as
+a form of flow control, preventing a large group of tasks from
+exhausting some sort of resource.
+PoCo::JobQueue implements two kinds of queue: active and passive.
+Both kinds of queue use a Worker coderef to spawn sessions that
+process jobs, but how they use the Worker differs between them.
+Active queues' Worker code fetches a new job from a resource that must
+be polled.  For example, it may read a new line from a file.  Passive
+queues, on the other hand, are given jobs with 'enqueue' events.
+Their Worker functions are passed the next job as parameters.
+JobQueue components are not proper objects.  Instead of being created,
+as most objects are, they are "spawned" as separate sessions.  To
+avoid confusion (and hopefully not cause other confusion), they must
+be spawned wich a C<spawn> method, not created anew with a C<new> one.
+POE::Component::JobQueue's C<spawn> method takes different parameters
+depending whether it's going to be an active or a passive session.
+Regardless, there are a few parameters which are the same for both:
+=over 2
+=item Alias => $session_alias
+C<Alias> sets the name by which the session will be known.  If no
+alias is given, the component defaults to "queuer".  The alias lets
+several sessions interact with job queues without keeping (or even
+knowing) hard references to them.  It's possible to spawn several
+queues with different aliases.
+=item WorkerLimit => $worker_count
+C<WorkerLimit> sets the limit on the number of worker sessions which
+will run in parallel.  It defaults arbitrarily to 8.  No more than
+this number of workers will be active at once.
+=item Worker => \&worker
+C<Worker> is a coderef which is called whenever it's time to spawn a
+new session.  What it receives as parameters and what it's expected to
+do are slightly different for active and passive sessions.
+Active workers receive just one parameter: a meta-postback.  This is
+used to build a postback once the next job's parameters are known.
+They're expected to actively fetch the next job's parameters and spawn
+a new session if necessary.
+See C<sub fetch_and_spawn> in the SYNOPSIS for an example of an active
+worker function.>
+Passive workers' arguments include a pre-built postback and the next
+job's parameters.  Since the JobQueue component already knows what the
+job parameters are, it's done most of the work for the worker.  All
+that's left is to spawn the session that will process the job.
+See C<sub spawn_a_worker> in the SYNOPSIS for an example of a passive
+worker function.
+When a postback is called, it posts its parameters (plus the
+parameters passed when it was created) to the session it belongs to.
+Postbacks are discussed in the POE::Session manpage.
+These parameters are unique to passive queues:
+=over 2
+=item Passive => \%passive_parameters
+C<Passive> contains a hashref of passive queue parameters.  The
+C<Passive> parameter block's presence indicates that the queue will be
+passive, but its contents may be empty since all its parameters are
+  Passive => { }, # all passive parameters take default values
+A queue can't be both active and passive at the same time.
+The C<Passive> block takes up to one parameter.
+=over 2
+=item Prioritizer => \&prioritizer_function
+C<Prioritizer> holds a function that defines how a job queue will be
+ordered.  The prioritizer function receives references to two jobs,
+and it returns a value which tells the JobQueue component which job
+should be dealt with first.
+In the Unix tradition, lower priorities go first.  This transforms the
+prioritizer into a simple sort function, which it has been modelled
+after.  Like sort's sorter sub, the prioritizer returns -1 if the
+first job goes before the second one; 0 if both jobs have the same
+priority; and 1 if the first job goes after the second.  It's easier
+to write an example than to describe it:
+  sub low_priorities_first {
+    my ($first_job, $second_job) = @_;
+    return $first_job->{priority} <=> $second_job->{priority};
+  }
+The first argument always refers to the new job being enqueued.
+The default prioritizer always returns 1.  Since the first argument
+always refers to the new job being enqueued, this effects a FIFO
+queue.  Replacing it with a prioritizer that always returns -1 will
+turn the JobQueue into a stack (last in, first out).
+These parameters are unique to active queues:
+=over 2
+=item Active => \%active_parameters
+C<Active> contains a hashref of active queue parameters.  The
+C<Active> parameter block's presence indicates that the queue will be
+active, but its contens may be empty since all its parameters are
+  Active => { }, # all active parameters take default values
+A queue can't be both active and passive at the same time.
+The C<Active> block takes up to three parameters.
+=over 2
+=item PollInterval => $seconds
+Active C<Worker> functions indicate that they've run out of jobs by
+failing to spawn new sessions.  When this happens, an active queue may
+go into "polling" mode.  In this mode, the C<Worker> is called
+periodically to see if new jobs have appeared in whatever it's getting
+them from.
+C<PollInterval>, if present, tells the job queue how often to call
+C<Worker> in the absence of new sessions.  If it's omitted, the active
+queue stops after the first time it runs out of jobs.
+=item AckAlias => $alias
+=item AckState => $state
+C<AckAlias> and C<AckState> tell the active job queue where to send
+acknowledgements of jobs which have been completed.  If one is
+specified, then both must be.
+Sessions communicate asynchronously with passive JobQueue components.
+They post "enqueue" requests to it, and it posts job results back.
+Requests are posted to the component's "enqueue" state.  They include
+the name of a state to post responses back to, and a list of job
+parameters.  For example:
+  $kernel->post( 'queue', 'enqueue', # queuer session alias & state
+                 'job_results',      # my state to receive responses
+                 @job_parameters,    # parameters of the job
+               );
+Once the job is completed, the handler for 'job_results' will be
+called with the job parameters and results.  See C<sub
+postback_handler> in the SYNOPSIS for an example results handler.
+Active JobQueue components act as event generators.  They don't
+receive jobs from the outside; instead, they poll for them and post
+acknowledgements as they're completed.
+=head1 SEE ALSO
+This component is built upon and POE.  Please see its source code and
+the documentation for its foundation modules to learn more.
+Also see the test program, t/01_queues.t, in the
+POE::Component::JobQueue distribution.
+=head1 BUGS
+This documentation is pretty bad.
+POE::Component::JobQueue is Copyright 1999-2002 by Rocco Caputo.  All
+rights are reserved.  POE::Component::JobQueue is free software; you
+may redistribute it and/or modify it under the same terms as Perl
+Rocco may be contacted by e-mail via rcaputo at cpan.org.

Added: branches/upstream/libpoe-component-jobqueue-perl/current/MANIFEST
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/MANIFEST?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/MANIFEST (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/MANIFEST Wed Feb 13 20:31:01 2008
@@ -1,0 +1,8 @@
+# $Id: MANIFEST,v 1.1 2000/09/04 20:19:57 rcaputo Exp $
+META.yml                                 Module meta-data (added by MakeMaker)

Added: branches/upstream/libpoe-component-jobqueue-perl/current/META.yml
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/META.yml?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/META.yml (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/META.yml Wed Feb 13 20:31:01 2008
@@ -1,0 +1,11 @@
+# http://module-build.sourceforge.net/META-spec.html
+#XXXXXXX This is a prototype!!!  It will change in the future!!! XXXXX#
+name:         POE-Component-JobQueue
+version:      0.54
+version_from: JobQueue.pm
+installdirs:  site
+    POE:                           0.11
+distribution_type: module
+generated_by: ExtUtils::MakeMaker version 6.17

Added: branches/upstream/libpoe-component-jobqueue-perl/current/Makefile.PL
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/Makefile.PL?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/Makefile.PL (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/Makefile.PL Wed Feb 13 20:31:01 2008
@@ -1,0 +1,27 @@
+# $Id: Makefile.PL,v 1.3 2002/09/10 03:42:50 rcaputo Exp $
+use ExtUtils::MakeMaker;
+# Touch CHANGES so it exists.
+open(CHANGES, ">>CHANGES") and close CHANGES;
+  ( NAME         => 'POE::Component::JobQueue',
+    AUTHOR       => 'Rocco Caputo <rcaputo at cpan.org>',
+    ABSTRACT     => ( 'POE component for processing large numbers of tasks ' .
+                      'with finite numbers of workers.'
+                    ),
+    VERSION_FROM => 'JobQueue.pm',
+    PM           => { 'JobQueue.pm' => '$(INST_LIBDIR)/JobQueue.pm' },
+    PREREQ_PM    => { POE      => 0.11,
+                    },
+    dist         =>
+    { COMPRESS   => 'gzip -9f',
+      SUFFIX     => 'gz',
+      PREOP      => ( 'cvs-log.perl | ' .
+                      'tee ./$(DISTNAME)-$(VERSION)/CHANGES > ./CHANGES'
+                    ),
+    },
+  );

Added: branches/upstream/libpoe-component-jobqueue-perl/current/README
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/README?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/README (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/README Wed Feb 13 20:31:01 2008
@@ -1,0 +1,66 @@
+$Id: README,v 1.2 2001/05/29 17:54:55 rcaputo Exp $
+POE::Component::JobQueue manages a finite pool of worker sessions as
+they handle an arbitrarily large number of tasks.  It often is used as
+a form of flow control, preventing an arbitrarily large number of
+worker sessions from exhausting some finite resource.
+This module requires POE.  This requirement is encoded in Makefile.PL,
+so the CPAN shell should do the right thing.  If this is installed
+without the CPAN shell, the person installing it will have to do the
+right thing manually.
+Basic Installation
+POE::Component::JobQueue may be installed through the CPAN shell in
+the usual CPAN shell manner.  This typically is:
+  $ perl -MCPAN -e 'intstall POE::Component::JobQueue'
+You can also read this README from the CPAN shell:
+  $ perl -MCPAN -e shell
+  cpan> readme POE::Component::JobQueue
+And you can install the component from the CPAN prompt as well:
+  cpan> install POE::Component::JobQueue
+Manual Installation
+POE::Component::JobQueue can also be installed manually.
+<ftp://ftp.cpan.org/pub/CPAN/authors/id/R/RC/RCAPUTO/> or a similarly
+named directory at your favorite CPAN mirror should hold the latest
+Downloading and unpacking the distribution are left as exercises for
+the reader.  To build and test it:
+  perl Makefile.PL
+  make test
+The test program, t/01_pools.t, makes an excellent sample program.  If
+you would like to see more details about its operation, edit it and
+set the DEBUG constant to any value Perl considers "true".
+When you're ready to install the component:
+  make install
+It should now be ready to use.
+Thanks for reading!
+-- Rocco Caputo / troc at netrus.net / poe.perl.org / poe.sourceforge.net

Added: branches/upstream/libpoe-component-jobqueue-perl/current/t/01_queues.t
URL: http://svn.debian.org/wsvn/branches/upstream/libpoe-component-jobqueue-perl/current/t/01_queues.t?rev=14866&op=file
--- branches/upstream/libpoe-component-jobqueue-perl/current/t/01_queues.t (added)
+++ branches/upstream/libpoe-component-jobqueue-perl/current/t/01_queues.t Wed Feb 13 20:31:01 2008
@@ -1,0 +1,208 @@
+#!/usr/bin/perl -w
+# $Id: 01_queues.t,v 1.8 2003/11/29 23:18:56 rcaputo Exp $
+use strict;
+sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+use POE;
+use POE::Component::JobQueue;
+sub DEBUG () { 0 };
+$| = 1;
+print "1..3\n";
+# A list of tasks to run.
+my @active_tasks  = qw( 5 4 3 2 1 5 4 3 2 1 );
+my @passive_tasks = @active_tasks;
+my @tests_done;
+my $active_test              = 1;
+my $passive_test             = $active_test + scalar @passive_tasks;
+my $active_simultaneous      = 0;
+my $passive_simultaneous     = 0;
+my $active_max_simultaneous  = 0;
+my $passive_max_simultaneous = 0;
+my $target_done_count        = @active_tasks + @passive_tasks;
+sub worker_start {
+  my ($kernel, $heap, $postback, $test, $task, $a_or_p) =
+    @_[KERNEL, HEAP, ARG0..ARG3];
+  $heap->{test} = $test;
+  $heap->{task} = $task;
+  $heap->{aorp} = $a_or_p;
+  $heap->{postback} = $postback;
+  $kernel->delay( done => $task );
+  DEBUG and warn "$a_or_p test $test started ($task)\n";
+  if ($a_or_p eq 'active') {
+    $active_simultaneous++;
+    $active_max_simultaneous = $active_simultaneous
+      if $active_simultaneous > $active_max_simultaneous;
+  }
+  else {
+    $passive_simultaneous++;
+    $passive_max_simultaneous = $passive_simultaneous
+      if $passive_simultaneous > $passive_max_simultaneous;
+  }
+sub worker_done {
+  my $heap = $_[HEAP];
+  DEBUG and
+    warn "$heap->{aorp} test $heap->{test} finished ($heap->{task})\n";
+  push @tests_done, $heap->{test};
+  if ($heap->{aorp} eq 'active') {
+    $active_simultaneous--;
+  }
+  else {
+    $passive_simultaneous--;
+  }
+  my $postback = delete $heap->{postback};
+  if (ref $postback eq 'ARRAY') {
+    my $session = $_[KERNEL]->alias_resolve($postback->[0]);
+    if (defined $session) {
+      $postback = $session->postback( $postback->[1], $heap->{task} );
+    }
+    else {
+      $postback = sub { 1 };
+    }
+  }
+  # Causes some evil recursion somewhere. :(
+  # $postback->( $heap->{test}, $heap->{task} );
+sub spawn_worker {
+  my ($outer_postback, $outer_test, $outer_task, $active_or_passive) = @_;
+  POE::Session->create
+    ( inline_states =>
+      { _start => \&worker_start,
+        done   => \&worker_done,
+        _stop => sub {},
+      },
+      args => [ $outer_postback, $outer_test, $outer_task, $active_or_passive ]
+    );
+sub passive_respondee_start {
+  my $kernel = $_[KERNEL];
+  $kernel->yield( 'flood_queue' );
+sub passive_respondee_flood_queue {
+  my $kernel = $_[KERNEL];
+  foreach (@passive_tasks) {
+    $kernel->post( passive => enqueue => response => $_ );
+  }
+  $kernel->yield( 'dummy' );
+sub passive_respondee_response {
+  my ($request, $response) = @_[ARG0, ARG1];
+  my (@req_job) = @$request;
+  my (@resp_answer) = @$response;
+  DEBUG and warn "passive respondee got: (@req_job) = (@resp_answer)";
+  ( inline_states =>
+    { _start      => \&passive_respondee_start,
+      flood_queue => \&passive_respondee_flood_queue,
+      response    => \&passive_respondee_response,
+      # quiets ASSERT_DEFAULT
+      _stop       => sub {},
+      dummy       => sub {},
+    }
+  );
+sub active_respondee_start {
+  my $kernel = $_[KERNEL];
+  $kernel->alias_set( 'respondee' );
+sub active_respondee_response {
+  my ($request, $response) = @_[ARG0, ARG1];
+  my (@req_job) = @$request;
+  my (@resp_answer) = @$response;
+  DEBUG and warn "active respondee got: (@req_job) = (@resp_answer)";
+  ( inline_states =>
+    { _start   => \&active_respondee_start,
+      response => \&active_respondee_response,
+      # quiets ASSERT_DEFAULT
+      _stop    => sub {},
+    }
+  );
+  ( Alias        => 'active',
+    WorkerLimit  => 5,
+    Worker       =>
+    sub {
+      my $metapostback = shift;
+      my $task = shift @active_tasks;
+      my $test = $active_test++;
+      if (defined $task) {
+        my $postback = $metapostback->($task);
+        &spawn_worker( $postback, $test, $task, 'active' );
+      }
+    },
+    Active =>
+    { AckAlias => 'respondee',
+      AckState => 'response',
+    },
+  );
+  ( Alias       => 'passive',
+    WorkerLimit => 5,
+    Worker      =>
+    sub {
+      my ($postback, $task) = @_;
+      my $test = $passive_test++;
+      &spawn_worker($postback, $test, $task, 'passive') if defined $task;
+    },
+    Passive => { },
+  );
+# Run it all until done.
+# Figure out whether the tests worked.
+print 'not ' unless $active_max_simultaneous == 5;
+print "ok 1\n";
+print 'not ' unless $passive_max_simultaneous == 5;
+print "ok 2\n";
+print 'not ' unless scalar(@tests_done) == $target_done_count;
+print "ok 3\n";

More information about the Pkg-perl-cvs-commits mailing list