[vspline] 22/72: consolidated operation with thread pool, now header-only.
Kay F. Jahnke
kfj-guest at moszumanska.debian.org
Sun Jul 2 09:02:39 UTC 2017
This is an automated email from the git hooks/post-receive script.
kfj-guest pushed a commit to branch master
in repository vspline.
commit 68692ee107c33afe2c5d407e5d2a290e7cddc4f1
Author: Kay F. Jahnke <kfjahnke at gmail.com>
Date: Thu Dec 22 10:24:26 2016 +0100
consolidated operation with thread pool, now header-only.
---
multithread.h | 132 +++++++++++++++++++++-------------------
thread_pool.cc => thread_pool.h | 116 ++++++++++++++++++-----------------
2 files changed, 131 insertions(+), 117 deletions(-)
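
Since thread_pool.cc is now the header thread_pool.h, there is no longer a separate
translation unit to compile and link; including vspline's headers is sufficient. A
plausible build line for a client program (main.cc is a stand-in name; -pthread, or
your toolchain's equivalent, is needed for std::thread on most platforms):

clang++ -std=c++11 -pthread -O3 -o main main.cc
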
diff --git a/multithread.h b/multithread.h
index 446cd13..0301db7 100644
--- a/multithread.h
+++ b/multithread.h
@@ -93,6 +93,7 @@
#include <queue>
#include <condition_variable>
#include <vspline/common.h>
+#include <vspline/thread_pool.h>
namespace vspline
{
@@ -174,6 +175,11 @@ using shape_partition_type = partition_type < shape_range_type < dimension > > ;
/// 'forbid' prevents that particular axis from being split. The split may succeed
/// producing n or fewer ranges, and if 'shape' can't be split at all, a single range
/// encompassing the whole of 'shape' will be returned in the result vector.
+/// TODO: with some shapes, splitting will result in subranges which aren't optimal
+/// for b-spline prefiltering (which is fastest with extents that are a multiple of
+/// the simdized data type's width), so we might add code to preferably use cut
+/// locations coinciding with such multiples. Splitting small extents also makes
+/// the result very inefficient to filter.
template < int dim >
struct shape_splitter
@@ -183,8 +189,8 @@ struct shape_splitter
typedef partition_type < range_t > partition_t ;
static partition_t part ( const shape_t & shape , ///< shape to be split n-ways
- int n , ///< intended number of chunks
- int forbid = -1 ) ///< axis which shouldn't be split
+ int n , ///< intended number of chunks
+ int forbid = -1 ) ///< axis which shouldn't be split
{
partition_t res ;
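
The TODO above suggests preferring cut locations which coincide with multiples of
the simdized vector width. A standalone sketch of that idea for a single axis (not
vspline code; split_aligned, extent and vsize are illustrative names):

#include <algorithm>
#include <utility>
#include <vector>

// split 'extent' into at most 'n' chunks, cutting only at multiples of 'vsize'
std::vector < std::pair < long , long > >
split_aligned ( long extent , int n , long vsize )
{
  long chunk = ( extent + n - 1 ) / n ;                // ceiling division
  chunk = ( ( chunk + vsize - 1 ) / vsize ) * vsize ;  // round up to a multiple of vsize
  std::vector < std::pair < long , long > > res ;
  for ( long lo = 0 ; lo < extent ; lo += chunk )
    res.push_back ( { lo , std::min ( lo + chunk , extent ) } ) ;
  return res ;
}

For extent 100, n 3 and vsize 8 this yields [0,40), [40,80) and [80,100): three
chunks, with every interior cut at a multiple of 8.
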
@@ -258,7 +264,8 @@ struct shape_splitter
/// of desired threads instead of a ready-made partitioning.
template < int d >
-partition_type < shape_range_type<d> > partition ( shape_range_type<d> range , int nparts )
+partition_type < shape_range_type<d> >
+partition ( shape_range_type<d> range , int nparts )
{
if ( range[0].any() )
{
@@ -312,34 +319,36 @@ struct joint_task
std::mutex finished_mutex ; // to guard 'finished'
bool finished ; // is set to true as predicate to test by caller
std::condition_variable finished_cv ; // the caller waits for this cv
-
+
+ thread_pool & tp ;
std::vector < std::function < void() > > & taskv ; // tasks to perform
- joint_task ( std::vector < std::function < void() > > & _taskv )
- : taskv (_taskv ) , // individual tasks to perform
+ joint_task ( thread_pool & _tp ,
+ std::vector < std::function < void() > > & _taskv )
+ : tp ( _tp ) , // thread pool to use
+ taskv (_taskv ) , // individual tasks to perform
crew ( _taskv.size() ) , // number of tasks
done ( 0 ) , // number of done tasks
finished ( false ) // not yet finished with all tasks
{
{
// under task_mutex, fill tasks into task queue
- std::lock_guard<std::mutex> lk ( task_mutex ) ;
+ std::lock_guard<std::mutex> lk ( tp.task_mutex ) ;
for ( int i = 0 ; i < crew ; i++ )
{
// bind both the ith task and a pointer to this object to action_wrapper,
// resulting in a functional which is then pushed to the task queue
std::function < void() >
action = std::bind ( action_wrapper , taskv[i] , this ) ;
- task_queue.push ( action ) ;
+ tp.task_queue.push ( action ) ;
}
}
- task_cv.notify_all() ; // alert all worker threads
+ tp.task_cv.notify_all() ; // alert all worker threads
{
// now wait for the last task to complete
- std::unique_lock<std::mutex> lk ( task_mutex ) ;
+ std::unique_lock<std::mutex> lk ( tp.task_mutex ) ;
// the predicate done==crew rejects spurious wakes
- task_cv.wait ( lk , [&] { return done == crew
- || worker_stay_alive == false ; } ) ;
+ tp.task_cv.wait ( lk , [&] { return done == crew ; } ) ;
}
// we're sure now that all tasks are done, so we let the caller know
finished = true ;
@@ -365,15 +374,14 @@ void action_wrapper ( std::function < void() > action ,
// number of actions originating from p_coordinator
bool all_done ;
{
- std::lock_guard<std::mutex> lk ( task_mutex ) ;
+ std::lock_guard<std::mutex> lk ( p_coordinator->tp.task_mutex ) ;
all_done = ( ++ (p_coordinator->done) == p_coordinator->crew ) ;
}
if ( all_done )
{
// this was the last action originating from p_coordinator
// notify the coordinator that the joint task is now complete
-// p_coordinator->complete_cv.notify_one() ;
- task_cv.notify_all() ;
+ p_coordinator->tp.task_cv.notify_all() ;
}
}
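
The two hunks above implement a simple rendezvous: each task increments 'done' under
the mutex, and the last one to finish notifies the waiting coordinator. A standalone
sketch of that pattern, using plain std::thread instead of pool threads (not vspline
code):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
  const int crew = 4 ;   // number of tasks
  int done = 0 ;         // number of completed tasks
  std::mutex mtx ;
  std::condition_variable cv ;

  std::vector < std::thread > workers ;
  for ( int i = 0 ; i < crew ; i++ )
    workers.emplace_back ( [&] {
        // ... perform one task here ...
        bool all_done ;
        {
          std::lock_guard<std::mutex> lk ( mtx ) ;
          all_done = ( ++done == crew ) ;
        }
        if ( all_done )
          cv.notify_all() ;   // the last task wakes the coordinator
      } ) ;

  {
    std::unique_lock<std::mutex> lk ( mtx ) ;
    // the predicate done == crew rejects spurious wakes
    cv.wait ( lk , [&] { return done == crew ; } ) ;
  }
  for ( auto & t : workers )
    t.join() ;
}
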
@@ -399,6 +407,8 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
partition_type < range_type > partitioning ,
Types ...args )
{
+ static thread_pool tp ; // keep a thread pool only for multithread()
+
// get the number of ranges in the partitioning
int nparts = partitioning.size() ;
@@ -414,12 +424,13 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
}
- // create the joint_task object, passing the vector of tasks.
+ // create the joint_task object, passing the thread pool and
+ // the vector of tasks.
// The joint_task object launches the tasks and makes sure they
// all complete before notifying on its finished_cv condition
// variable
- joint_task jt ( taskv ) ;
+ joint_task jt ( tp , taskv ) ;
{
// wait until 'jt.finished' is true
@@ -428,6 +439,7 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
}
// now the joint task is complete
+
return nparts ;
}
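
The heart of multithread() is the std::bind call above, which freezes a payload
function together with its chunk of the partitioning and the trailing arguments into
a nullary task. A standalone sketch (payload and the pair-based partitioning are
illustrative, not vspline's range types):

#include <cstdio>
#include <functional>
#include <utility>
#include <vector>

void payload ( std::pair < long , long > range , float factor )
{
  std::printf ( "processing [%ld,%ld) with factor %f\n" ,
                range.first , range.second , factor ) ;
}

int main()
{
  std::vector < std::pair < long , long > > partitioning
    { { 0L , 50L } , { 50L , 100L } } ;
  int nparts = partitioning.size() ;
  std::vector < std::function < void() > > taskv ( nparts ) ;
  for ( int s = 0 ; s < nparts ; s++ )
    taskv[s] = std::bind ( payload , partitioning[s] , 2.0f ) ;
  for ( auto & task : taskv )
    task() ;   // a thread pool would run these concurrently
}
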
@@ -446,59 +458,57 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
// partition the range and update nparts in case the partitioning produced
// less ranges than we initially asked for. Rely on the presence of a function
// partition() which can process range_type:
- // currently ups the number of partitions to 8-fold, which seems to be a
- // good overall compromise.
- partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
+ partition_type < range_type > partitioning = partition ( range , nparts ) ;
nparts = partitioning.size() ;
return multithread ( pfunc , partitioning , args... ) ;
}
-/// multithread_async works like multithread, but instead of waiting
-/// for the joint task to complete, it immediately returns the joint_task
-/// object (beware, this has to be deleted in the calling code)
-
-template < class range_type , class ...Types >
-joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
- partition_type < range_type > partitioning ,
- Types ...args )
-{
- int nparts = partitioning.size() ;
- std::vector < std::function < void() > > taskv ( nparts ) ;
- for ( int s = 0 ; s < nparts ; s++ )
- {
- taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
- }
- // here we don't wait for the tasks to complete but simply return
- // a pointer to the joint task object, passing control to the caller.
- return new joint_task ( taskv ) ;
-}
-
-/// overload of multithread_async with automatic range partitioning
-
-template < class range_type , class ...Types >
-joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
- int nparts ,
- range_type range ,
- Types ...args )
-{
- partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
- nparts = partitioning.size() ;
-
- return multithread_async ( pfunc , partitioning , args... ) ;
-}
+// /// multithread_async works like multithread, but instead of waiting
+// /// for the joint task to complete, it immediately returns the joint_task
+// /// object (beware, this has to be deleted in the calling code)
+//
+// template < class range_type , class ...Types >
+// joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+// partition_type < range_type > partitioning ,
+// Types ...args )
+// {
+// int nparts = partitioning.size() ;
+// std::vector < std::function < void() > > taskv ( nparts ) ;
+// for ( int s = 0 ; s < nparts ; s++ )
+// {
+// taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
+// }
+// // here we don't wait for the tasks to complete but simply return
+// // a pointer to the joint task object, passing control to the caller.
+// return new joint_task ( taskv ) ;
+// }
+//
+// /// overload of multithread_async with automatic range partitioning
+//
+// template < class range_type , class ...Types >
+// joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+// int nparts ,
+// range_type range ,
+// Types ...args )
+// {
+// partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
+// nparts = partitioning.size() ;
+//
+// return multithread_async ( pfunc , partitioning , args... ) ;
+// }
/// simple routine to run a single task asynchronously
-template < class ...Types >
-void run_async ( void (*pfunc) ( Types... ) ,
- Types ...args )
-{
- auto task = std::bind ( pfunc , args... ) ;
- std::lock_guard<std::mutex> lk ( task_mutex ) ;
- task_queue.push ( task ) ;
-}
+// template < class ...Types >
+// void run_async ( void (*pfunc) ( Types... ) ,
+// Types ...args )
+// {
+// auto task = std::bind ( pfunc , args... ) ;
+// std::lock_guard<std::mutex> lk ( task_mutex ) ;
+// task_queue.push ( task ) ;
+// }
// some apply type routines, preliminary
diff --git a/thread_pool.cc b/thread_pool.h
similarity index 70%
rename from thread_pool.cc
rename to thread_pool.h
index 3eb049b..a917def 100644
--- a/thread_pool.cc
+++ b/thread_pool.h
@@ -33,12 +33,6 @@
///
/// \brief provides a thread pool for vspline's multithread() routine
///
-/// This code needs to be compiled separately. It is used by vspline's
-/// multithread() routines and has to be linked with any code using them.
-///
-/// a typical compile would go like this:
-///
-/// clang++ -c -std=c++11 -o thread_pool.o -O3 thread_pool.cc
#include <thread>
#include <mutex>
@@ -48,30 +42,50 @@
namespace vspline
{
- // task queue, mutex and condition variable
+
+/// code to run a worker thread
+/// We use a thread pool of worker threads. These threads have a very
+/// simple cycle: they try to obtain a task (std::function<void()>).
+/// If there is one to be had, it is invoked, otherwise they wait on
+/// task_cv.
+
+class thread_pool
+{
+ // used to switch off the worker threads at program termination
+
+ bool stay_alive = true ;
+
+ /// the thread pool itself is held in this variable
+
+ std::vector < std::thread * > pool ;
+
+ /// the number of worker threads in the pool
+
+ int nthreads ;
+
+public:
+
+ // mutex and condition variable
// for interaction with the thread pool
std::mutex task_mutex ;
- std::queue < std::function < void() > > task_queue ;
std::condition_variable task_cv ;
+ std::queue < std::function < void() > > task_queue ;
- // used to switch off the worker threads at program termination
-
- bool worker_stay_alive = true ;
-
- // we need a thread pool of worker threads. These threads have a very
- // simple cycle: They try and obtain a task (std::function<void()>).
- // If there is one to be had, it is invoked, otherwise they wait on
- // task_cv.
+private:
void worker_thread()
{
std::function < void() > task ;
- while ( worker_stay_alive ) // if stay_alive is set to false, die
+ while ( true )
{
// under task_mutex, try and obtain a task
std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
+
+ if ( ! stay_alive )
+ break ;
+
int sz = task_queue.size() ;
if ( sz )
{
@@ -86,63 +100,53 @@ namespace vspline
// got a task, perform it, then try for another one
task() ;
}
- else if ( worker_stay_alive )
+ else
{
// no luck. wait.
task_cv.wait ( task_lock ) ; // simply wait, spurious alert is okay
}
- else
- {
- task_lock.unlock() ;
- break ;
- }
// start next cycle, either after having completed a job
// or after having been woken by an alert
}
}
- /// the thread pool itself is held in this variable
-
- std::vector < std::thread * > thread_pool ;
+public:
- /// the threads are started, stopped and destroyed by this object
-
- struct manage_thread_pool
+ thread_pool ( int _nthreads = 2 * std::thread::hardware_concurrency() )
+ : nthreads ( _nthreads )
{
- const int nthreads ;
-
- manage_thread_pool ( int _nthreads = 2 * std::thread::hardware_concurrency() )
- : nthreads ( _nthreads )
- {
- for ( int t = 0 ; t < nthreads ; t++ )
- thread_pool.push_back ( new std::thread ( worker_thread ) ) ;
- }
+ std::function < void() > wf = std::bind ( &thread_pool::worker_thread , this ) ;
+ for ( int t = 0 ; t < nthreads ; t++ )
+ pool.push_back ( new std::thread ( wf ) ) ;
+ }
- void kill_thread_pool()
+ ~thread_pool()
+ {
+ // under task_mutex, set stay_alive to false
+
+ std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
+ stay_alive = false ;
+ task_lock.unlock() ;
+
+ // wake all inactive worker threads,
+ // join all worker threads once they are finished
+
+ task_cv.notify_all() ;
+
+ for ( auto threadp : pool )
{
- worker_stay_alive = false ;
- task_cv.notify_all() ;
- for ( auto threadp : thread_pool )
- {
- threadp->join() ;
- delete threadp ;
- }
+ threadp->join() ;
}
- ~manage_thread_pool()
+ // once all are joined, delete their std::thread object
+
+ for ( auto threadp : pool )
{
- if ( worker_stay_alive )
- kill_thread_pool() ;
+ delete threadp ;
}
- } ;
-
- // of which we have precisely one static instance here:
+ }
+} ;
- manage_thread_pool _thread_pool_manager ;
- void kill_thread_pool()
- {
- _thread_pool_manager.kill_thread_pool() ;
- }
} ; // end of namespace vspline
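
Taken together, the class now encapsulates queue, mutex, condition variable and
worker threads, and its destructor performs the shutdown which kill_thread_pool()
used to do. A condensed standalone sketch of the same lifecycle (pool_sketch and
post() are hypothetical names; unlike the worker above, this sketch drains any
remaining tasks before shutting down):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class pool_sketch
{
  bool stay_alive = true ;   // cleared at shutdown
  std::mutex task_mutex ;
  std::condition_variable task_cv ;
  std::queue < std::function < void() > > task_queue ;
  std::vector < std::thread > workers ;

  void worker_thread()
  {
    while ( true )
    {
      std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
      // the predicate rejects spurious wakes; waiting under the same
      // mutex which guards stay_alive makes the shutdown race-free
      task_cv.wait ( task_lock ,
                     [&] { return ! task_queue.empty() || ! stay_alive ; } ) ;
      if ( task_queue.empty() )   // only true once stay_alive is false
        break ;
      auto task = std::move ( task_queue.front() ) ;
      task_queue.pop() ;
      task_lock.unlock() ;
      task() ;                    // run the task without holding the mutex
    }
  }

public:

  pool_sketch ( int nthreads = 2 * std::thread::hardware_concurrency() )
  {
    for ( int t = 0 ; t < nthreads ; t++ )
      workers.emplace_back ( &pool_sketch::worker_thread , this ) ;
  }

  void post ( std::function < void() > task )
  {
    {
      std::lock_guard<std::mutex> lk ( task_mutex ) ;
      task_queue.push ( std::move ( task ) ) ;
    }
    task_cv.notify_one() ;
  }

  ~pool_sketch()
  {
    {
      // set stay_alive to false under task_mutex, as ~thread_pool does,
      // so no worker can check the flag and then sleep past the signal
      std::lock_guard<std::mutex> lk ( task_mutex ) ;
      stay_alive = false ;
    }
    task_cv.notify_all() ;
    for ( auto & t : workers )
      t.join() ;
  }
} ;

A client would write pool_sketch p ; p.post ( [] { /* work */ } ) ; and rely on the
destructor to block until every worker has joined, the same RAII shutdown which
~thread_pool above performs with notify_all() and join().
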
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/vspline.git