[vspline] 21/72: mainly work on multithreading, pv
Kay F. Jahnke
kfj-guest at moszumanska.debian.org
Sun Jul 2 09:02:39 UTC 2017
This is an automated email from the git hooks/post-receive script.
kfj-guest pushed a commit to branch master
in repository vspline.
commit 8221de10edd1074801a0dd3c53c6a87244aae192
Author: Kay F. Jahnke <kfjahnke at gmail.com>
Date: Wed Dec 21 11:14:39 2016 +0100
mainly work on multithreading, pv
I changed the multithreading code to use a thread pool. At the same time
I introduced new joint processing code which makes the multithreading
more fine-grained, so that the workload is more evenly distributed to
the worker threads. The new code is partly in multithread.h, which
has the joint task handling, and partly in thread_pool.cc, which has
to be compiled separately and linked in.
pv is using the new methods. The biggest change is the calculation
of the HQ b-spline in the background while panning is already running
on the LQ spline.
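
To illustrate the new calling convention before diving into the diff: the new
multithread() overload takes a plain function pointer, a task count, a range over
the whole data set, and the arguments to forward to each task. A minimal sketch
(the worker 'fill_ones' and the array names are made up for illustration;
multithread, shape_range_type, shape_type and ncores are from multithread.h and
common.h as changed below):

    #include <vspline/multithread.h>

    // hypothetical worker: fills only its share of the array, delimited by
    // 'range'; the functor cuts out the subset it is meant to process itself
    void fill_ones ( vspline::shape_range_type<2> range ,
                     vigra::MultiArrayView < 2 , float > * p_data )
    {
      auto chunk = p_data->subarray ( range[0] , range[1] ) ;
      chunk.init ( 1.0f ) ;
    }

    int main()
    {
      vigra::MultiArray < 2 , float > storage ( vigra::Shape2 ( 1024 , 768 ) ) ;
      vigra::MultiArrayView < 2 , float > data ( storage ) ;
      // a range over the whole array; partition() provides the default split
      vspline::shape_range_type<2> range ( vspline::shape_type<2>() , data.shape() ) ;
      // dispatch ncores * 8 tasks to the thread pool and wait for completion
      vspline::multithread ( &fill_ones , vspline::ncores * 8 , range , &data ) ;
    }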
---
bspline.h | 2 +-
common.h | 7 +
example/roundtrip.cc | 8 +-
filter.h | 5 +-
interpolator.h | 2 +-
multithread.h | 324 +++++++++++++++++++++++++---------------
prefilter.h | 2 +-
remap.h | 408 +++++++++++++++++++++++++++++++++------------------
thread_pool.cc | 148 +++++++++++++++++++
9 files changed, 633 insertions(+), 273 deletions(-)
diff --git a/bspline.h b/bspline.h
index d2fda9b..3d55b93 100644
--- a/bspline.h
+++ b/bspline.h
@@ -400,7 +400,7 @@ public:
/// bspline object.
void prefilter ( bool use_vc = true , ///< use Vc by default
- int nthreads = ncores , ///< number of threads to use
+ int nthreads = ncores * 8 , ///< number of tasks to use
view_type data = view_type() ) ///< view to knot point data to use instead of 'core'
{
// if the user should have modified 'bcv' since the bspline object's creation,
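
Note the semantic shift here: the parameter still reads 'nthreads', but it now
counts tasks handed to the thread pool, not threads. A sketch of calling
prefilter with an explicit task count, mirroring the usage further down in
remap.h ('data' is a hypothetical knot point array of matching shape):

    vspline::bcv_type<2> bcv ( vspline::MIRROR ) ;
    vspline::bspline < float , 2 > bsp ( data.shape() , 3 , bcv ) ; // cubic spline
    // the second argument is the number of tasks for the thread pool;
    // the default is now ncores * 8
    bsp.prefilter ( true , vspline::ncores * 8 , data ) ;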
diff --git a/common.h b/common.h
index 74e4ff6..bee75bb 100644
--- a/common.h
+++ b/common.h
@@ -50,8 +50,15 @@
#endif
+#include <thread>
+
namespace vspline {
+/// number of CPU cores in the system; by default, multithreading routines
+/// will perform their task with as many threads.
+
+const int ncores = std::thread::hardware_concurrency() ;
+
#ifdef USE_VC
// initially, I used Vc::SimdArray in many places, but I figured that having a central
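
One caveat with the new 'ncores' constant: std::thread::hardware_concurrency()
is allowed to return 0 if the number of cores can't be determined, which would
leave ncores (and any task count derived from it) at 0. Not part of this commit,
but a guarded variant might look like this:

    #include <thread>

    const int detected = std::thread::hardware_concurrency() ;
    // fall back to a sane default if the query fails and yields 0
    const int ncores = detected ? detected : 4 ;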
diff --git a/example/roundtrip.cc b/example/roundtrip.cc
index cd0ce1f..c15b5c8 100644
--- a/example/roundtrip.cc
+++ b/example/roundtrip.cc
@@ -296,7 +296,7 @@ void roundtrip ( view_type & data ,
#ifdef USE_VC
- vspline::transformation < pixel_type , rc_type , 2 , 2 >
+ vspline::transformation < rc_type , 2 , 2 , vsize >
tf ( tf_identity<rc_type> ) ; // , vtf_identity<rc_v> ) ;
#else
@@ -305,7 +305,7 @@ void roundtrip ( view_type & data ,
// of the single-element coordinate transform. The effect is the same,
// but the code is potentially slower.
- vspline::transformation < pixel_type , rc_type , 2 , 2 >
+ vspline::transformation < rc_type , 2 , 2 , 1 >
tf ( tf_identity<rc_type> ) ;
#endif
@@ -315,7 +315,7 @@ void roundtrip ( view_type & data ,
#endif
for ( int times = 0 ; times < TIMES ; times++ )
- vspline::tf_remap < pixel_type , rc_type , 2 , 2 >
+ vspline::tf_remap < pixel_type , pixel_type , rc_type , 2 , 2 >
( data , tf , target , bcv , DEGREE , use_vc ) ;
@@ -333,7 +333,7 @@ void roundtrip ( view_type & data ,
#endif
for ( int times = 0 ; times < TIMES ; times++ )
- vspline::tf_remap < pixel_type , rc_type , eval_type , 2 , 2 >
+ vspline::tf_remap < eval_type , 2 , 2 >
( ev , tf , target , use_vc ) ;
// note: with REFLECT this doesn't come out right, because of the .5 difference!
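
The changed instantiations above reflect the new signature of
vspline::transformation (see remap.h below): the value type is gone and the
vector width is passed explicitly as a template argument. A sketch of setting
one up from just a single-value functor, along the lines of roundtrip.cc
(vsize stands for the evaluator's vectorization width):

    // single-value identity transform: target coordinate -> source coordinate
    void tf_identity ( const vigra::TinyVector < float , 2 > & c_target ,
                       vigra::TinyVector < float , 2 > & c_source )
    {
      c_source = c_target ;
    }

    // with no vectorized functor given, simdized calls are routed
    // through class transformation's broadcast fallback:
    vspline::transformation < float , 2 , 2 , vsize > tf ( tf_identity ) ;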
diff --git a/filter.h b/filter.h
index 851ac74..baf74d6 100644
--- a/filter.h
+++ b/filter.h
@@ -1482,7 +1482,10 @@ public:
// (by passing the 'forbid' parameter, which is taken as -1 (no effect) in the default
// partitioning of nD arrays).
- partition_t partitioning = shape_splitter < dim > :: part ( input.shape() , nslices , axis ) ;
+ partition_t partitioning = shape_splitter < dim > :: part
+ ( input.shape() ,
+ nslices ,
+ axis ) ;
multithread ( pf ,
partitioning ,
diff --git a/interpolator.h b/interpolator.h
index df4f16d..d24ef59 100644
--- a/interpolator.h
+++ b/interpolator.h
@@ -55,7 +55,7 @@
namespace vspline {
template < int _dimension , ///< dimension of the spline
- class _value_type , ///< type of spline coefficients/result (pixels etc.)
+ class _value_type , ///< type of interpolation result
class _rc_type , ///< singular real coordinate, like float or double
class _ic_type > ///< singular integral coordinate, like int
struct interpolator
diff --git a/multithread.h b/multithread.h
index bb27cad..446cd13 100644
--- a/multithread.h
+++ b/multithread.h
@@ -47,7 +47,7 @@
/// ranges so that the start point is inside and the end point outside the data
/// set described by the range, just like iterators obtained by begin() and end().
/// This range is made explicit, even if it is implicit in the data which we want to
-/// submit to multithreading, and there is a type for the purpose, struct range_type.
+/// submit to multithreading, and there is a type for the purpose: struct range_type.
/// range_type merely captures the concept of a range, taking 'limit_type' as its
/// template parameter, so that any type of range can be accommodated.
/// Next we define an object holding a set of ranges, modeling a partitioning of
@@ -58,20 +58,29 @@
/// group is dealing with multidimensional shapes, where a range extends from a 'lower'
/// coordinate to a 'higher' coordinate. These two coordinates can be used directly
/// to call vigra's 'subarray' function.
-/// Next we provide code to partition such ranges into sets of subranges.
+/// Next we provide code to partition ranges into sets of subranges.
/// Finally we can express a generalized multithreading routine. This routine takes
/// a functor capable of processing a range specification and a parameter pack of
/// arbitrary further parameters, of which some will usually be referring to manifolds
/// of data for which the given range makes sense. We call this routine with a
/// partitioning of the original range and the same parameter pack that is to be passed
-/// to the functor. The multithreading routine proceeds to set up threads as needed,
-/// starting each with the functor as it's functional, receiving a subrange from
+/// to the functor. The multithreading routine proceeds to set up 'tasks' as needed,
+/// providing each with the functor as its functional, a subrange from
/// the partitioning and the parameter pack as arguments. There's also an overload
/// to multithread() where we pass in a range over the data and a desired number of
-/// threads, leaving the partitioning to code inside multithread(), resulting in some
+/// tasks, leaving the partitioning to code inside multithread(), resulting in some
/// default partitioning provided by a suitable overload of function partition().
/// This is the commonly used variant, since it's usually not necessary to obtain
/// a partitioning other than the default one.
+/// The tasks, once prepared, are handed over to a 'joint_task' object which handles
+/// the interaction with the thread pool (in thread_pool.cc). While my initial code
+/// used one thread per task, this turned out to be inefficient, because it was not granular
+/// enough: the slowest thread became the limiting factor. Now the job at hand is split
+/// into more individual tasks (something like 8 times the number of cores), resulting
+/// in a fair compromise concerning granularity. multithread() waits for all tasks to
+/// terminate and returns when it's certain that the job is complete.
+/// With this method, we ensure that on return of multithread() we can safely access
+/// whatever results we anticipate.
#ifndef VSPLINE_MULTITHREAD_H
#define VSPLINE_MULTITHREAD_H
@@ -80,15 +89,13 @@
#include <vigra/tinyvector.hxx>
#include <vigra/multi_array.hxx>
#include <thread>
+#include <mutex>
+#include <queue>
+#include <condition_variable>
+#include <functional>
#include <vspline/common.h>
namespace vspline
{
-/// number of CPU cores in the system; per default, multithreading routines
-/// will perform their task with as many threads.
-
-const int ncores = std::thread::hardware_concurrency() ;
-
/// given limit_type, we define range_type as a TinyVector of two limit_types,
/// the first denoting the beginning of the range and the second its end, with
/// end being outside of the range.
@@ -272,156 +279,233 @@ partition_type < shape_range_type<d> > partition ( shape_range_type<d> range , i
return shape_splitter < d > :: part ( range[1] , nparts ) ;
}
-/// multithread is a routine to perform multithreading with functors accessing several
-/// containers, iterators etc. ('manifolds') of identical structure together.
-/// when such a functor is invoked for a specific thread, it will only process a subset
-/// of each of the original manifold objects. This 'subset' notion is codified as range_type:
-/// range_type specifies a starting point and an end point inside the manifolds defining
-/// the extent of the subset. While it is tempting to code so that the multithreading
-/// routine feeds the subsets to the individual threads, I found that passing the
-/// range to the thread, together with the original variadic arguments to multithread,
-/// produces clean and compact code with a slightly higher burden on the functor, since
-/// it has to 'cut out' the subset(s) it is meant to process itself. Why is this better?
-/// Because it unburdens the multithreading code from knowing any specifics about the
-/// 'cutting out' procedure, and at the same time allows the functor to access the
-/// 'whole' manifold and it's metrics if this is needed, like in code looking at the
-/// coordinates into an array, which should refer to the 'whole' array, not to the
-/// subarrays produced by partitioning.
-/// I have also taken the actual partitioning of the original range out of the core
-/// multithreading routine, because at times the calling code needs to influence this
-/// process. The situation inside vspline where this is necessary is in the prefiltering
-/// code, where the partitioning has to avoid splitting up the current processing axis,
-/// passing the exis as additional parameter 'forbid' to class shape_splitter.
-// TODO I can't yet pass references 'through' the multithreading mechanism.
-
-// template < class range_type , class ...Types >
-// int multithread ( std::function < void ( range_type , Types... ) > func ,
-// partition_type < range_type > partitioning ,
-// Types ...args )
-// {
-// // get the number of ranges in the partitioning
-//
-// int nparts = partitioning.size() ;
-//
-// assert ( nparts > 0 ) ; // make sure, for now, that there is at least one range
-//
-// // set up thread objects for the additional threads we'll use (if any)
-//
-// int new_threads = nparts - 1 ;
-// std::thread * t[new_threads] ;
-//
-// // start the additional threads (if any), passing func as the thread's functional,
-// // followed by the range over which func is meant to operate, followed by any other
-// // arguments passed to multithread as parameter pack 'args'
-//
-// for ( int s = 0 ; s < new_threads ; s++ )
-// {
-// t[s] = new std::thread ( func , partitioning[s] , args... ) ;
-// }
-//
-// // do the last partial job in this very thread. this may be the only job,
-// // if partitioning only contains a single range
-//
-// func ( partitioning [ new_threads ] , args... ) ;
-//
-// // join the additional threads (if any) and delete their thread objects
-//
-// for ( int s = 0 ; s < new_threads ; s++ )
-// {
-// t[s]->join() ;
-// delete t[s] ;
-// }
-//
-// // and return the number of threads which was used.
-//
-// return nparts ;
-// }
+// next we need some collateral code for the implementation of multithread().
+// We have a thread pool (in thread_pool.cc) which we want to use for
+// multithreading. To interface with the thread pool, we need to declare
+// a few objects living in thread_pool.cc:
+
+extern std::mutex task_mutex ;
+extern std::queue < std::function < void() > > task_queue ;
+extern std::condition_variable task_cv ;
+extern void kill_thread_pool() ;
+extern bool worker_stay_alive ;
+
+// next some forward declarations
+
+struct joint_task ;
+
+void action_wrapper ( std::function < void() > action , joint_task * p_coordinator ) ;
+
+/// struct joint_task serves as a coordinator of a multithreaded operation.
+/// on creation, it receives a vector of tasks, which are packaged as
+/// std::function<void()>. These tasks are wrapped in 'action_wrapper',
+/// which executes the task and communicates with the joint_task object
+/// to keep it updated on the progress of the tasks.
+/// Once the last task is complete, the joint_task in turn notifies
+/// the calling code.
+
+struct joint_task
+{
+ const int crew ; // number of partial jobs to perform
+ int done ; // number of partial jobs already completed
+
+ std::mutex finished_mutex ; // to guard 'finished'
+ bool finished ; // is set to true as predicate to test by caller
+ std::condition_variable finished_cv ; // the caller waits for this cv
+
+ std::vector < std::function < void() > > & taskv ; // tasks to perform
+
+ joint_task ( std::vector < std::function < void() > > & _taskv )
+ : taskv (_taskv ) , // individual tasks to perform
+ crew ( _taskv.size() ) , // number of tasks
+ done ( 0 ) , // number of done tasks
+ finished ( false ) // not yet finished with all tasks
+ {
+ {
+ // under task_mutex, fill tasks into task queue
+ std::lock_guard<std::mutex> lk ( task_mutex ) ;
+ for ( int i = 0 ; i < crew ; i++ )
+ {
+ // bind both the ith task and a pointer to this object to action_wrapper,
+ // resulting in a functional which is then pushed to the task queue
+ std::function < void() >
+ action = std::bind ( action_wrapper , taskv[i] , this ) ;
+ task_queue.push ( action ) ;
+ }
+ }
+ task_cv.notify_all() ; // alert all worker threads
+ {
+ // now wait for the last task to complete
+ std::unique_lock<std::mutex> lk ( task_mutex ) ;
+ // the predicate done==crew rejects spurious wakes
+ task_cv.wait ( lk , [&] { return done == crew
+ || worker_stay_alive == false ; } ) ;
+ }
+ // we're sure now that all tasks are done, so we let the caller know.
+ // 'finished' is set under finished_mutex, since the caller tests it under
+ // this mutex while waiting on finished_cv
+ {
+ std::lock_guard<std::mutex> lk ( finished_mutex ) ;
+ finished = true ;
+ }
+ finished_cv.notify_one() ;
+ } ;
+} ;
+
+/// action_wrapper wraps a functional into an outer function which
+/// first calls the functional and then interacts with a joint_task
+/// object, increasing the joint_task object's 'done' count and checking
+/// if the increased done count is now equal to the total number of tasks
+/// the joint_task object coordinates. If that is so, the joint_task
+/// object is notified.
+
+void action_wrapper ( std::function < void() > action ,
+ joint_task * p_coordinator )
+{
+ // perform wrapped action
+ action() ;
+
+ // under task_mutex, increase the coordinator's 'done' counter
+ // and test if it is now equal to 'crew', the total number of
+ // actions originating from p_coordinator
+ bool all_done ;
+ {
+ std::lock_guard<std::mutex> lk ( task_mutex ) ;
+ all_done = ( ++ (p_coordinator->done) == p_coordinator->crew ) ;
+ }
+ if ( all_done )
+ {
+ // this was the last action originating from p_coordinator
+ // notify the coordinator that the joint task is now complete
+// p_coordinator->complete_cv.notify_one() ;
+ task_cv.notify_all() ;
+ }
+}
+
+// with this collateral code at hand, we can now implement multithread().
+
+/// multithread uses a thread pool of worker threads to perform
+/// a multithreaded operation. It receives a functor (a single-threaded
+/// function used for all individual tasks), a partitioning, which contains
+/// information on which part of the data each task should process, and
+/// a set of parameters to pass on to the functor.
+/// The individual tasks are created by binding the functor with
+///
+/// - a range from the partitioning, describing its share of the data
+///
+/// - the remaining parameters
+///
+/// These tasks are stored in a std::vector 'taskv' which is used to
+/// create a 'joint_task' object which handles the interaction with the
+/// thread pool and notifies once all tasks have completed.
template < class range_type , class ...Types >
int multithread ( void (*pfunc) ( range_type , Types... ) ,
partition_type < range_type > partitioning ,
- Types ...args )
+ Types ...args )
{
// get the number of ranges in the partitioning
int nparts = partitioning.size() ;
- assert ( nparts > 0 ) ; // make sure, for now, that there is at least one range
+ // set up the individual tasks by binding all arguments to pfunc,
+ // resulting in a vector of std::function<void()> to pass to the
+ // joint_task object
- // set up thread objects for the additional threads we'll use (if any)
+ std::vector < std::function < void() > > taskv ( nparts ) ;
- int new_threads = nparts - 1 ;
- std::thread * t[new_threads] ;
-
- // start the additional threads (if any), passing func as the thread's functional,
- // followed by the range over which func is meant to operate, followed by any other
- // arguments passed to multithread as parameter pack 'args'
-
- for ( int s = 0 ; s < new_threads ; s++ )
+ for ( int s = 0 ; s < nparts ; s++ )
{
- t[s] = new std::thread ( pfunc , partitioning[s] , args... ) ;
+ taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
}
- // do the last partial job in this very thread. this may be the only job,
- // if partitioning only contains a single range
+ // create the joint_task object, passing the vector of tasks.
+ // The joint_task object launches the tasks and makes sure they
+ // all complete before notifying on its finished_cv condition
+ // variable
- pfunc ( partitioning [ new_threads ] , args... ) ;
+ joint_task jt ( taskv ) ;
- // join the additional threads (if any) and delete their thread objects
-
- for ( int s = 0 ; s < new_threads ; s++ )
{
- t[s]->join() ;
- delete t[s] ;
+ // wait until 'jt.finished' is true
+ std::unique_lock<std::mutex> lk ( jt.finished_mutex ) ;
+ jt.finished_cv.wait ( lk , [&jt] { return jt.finished ; } ) ;
}
- // and return the number of threads which was used.
-
+ // now the joint task is complete
return nparts ;
}
-/// variant of multithread taking the desired number of threads and a range instead
-/// of the partitioning. This variant relies on the presence of a suitable function
-/// partition() to provide the partitioning of the incoming range into at most nparts
-/// subranges. If the calling code doesn't to influence the partitioning process, this
-/// is the variant to use.
-
-// template < class range_type , class ...Types >
-// int multithread2 ( std::function < void ( range_type , Types... ) > func ,
-// int nparts ,
-// range_type range ,
-// Types ...args )
-// {
-// // partition the range and update nparts in case the partitioning produced
-// // less ranges than we initially asked for. Rely on the presence of a function
-// // partition() which can process range_type:
-//
-// partition_type < range_type > partitioning = partition ( range , nparts ) ;
-// nparts = partitioning.size() ;
-//
-// return multithread ( func , partitioning , args... ) ;
-// }
+/// this overload of multithread() takes the desired number of tasks and a
+/// range covering the 'whole' data. partition() is called with the range,
+/// resulting in a partitioning which the above overload of multithread() can use.
+// TODO we still have to update all calls to multithread() to pass in the
+// number of partial tasks (so we can omit the * 8)
template < class range_type , class ...Types >
int multithread ( void (*pfunc) ( range_type , Types... ) ,
int nparts ,
range_type range ,
- Types ...args )
+ Types ...args )
{
// partition the range and update nparts in case the partitioning produced
// fewer ranges than we initially asked for. Rely on the presence of a function
// partition() which can process range_type:
+ // the 8-fold increase in the number of partitions currently comes from
+ // the callers (see the TODO above); it seems to be a good overall compromise.
- partition_type < range_type > partitioning = partition ( range , nparts ) ;
+ partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
nparts = partitioning.size() ;
return multithread ( pfunc , partitioning , args... ) ;
}
+/// multithread_async works like multithread, but instead of waiting
+/// for the joint task to complete, it immediately returns the joint_task
+/// object (beware, this has to be deleted in the calling code)
+
+template < class range_type , class ...Types >
+joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+ partition_type < range_type > partitioning ,
+ Types ...args )
+{
+ int nparts = partitioning.size() ;
+ std::vector < std::function < void() > > taskv ( nparts ) ;
+ for ( int s = 0 ; s < nparts ; s++ )
+ {
+ taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
+ }
+ // here we don't wait for the tasks to complete but simply return
+ // a pointer to the joint task object, passing control to the caller.
+ return new joint_task ( taskv ) ;
+}
+
+/// overload of multithread_async with automatic range partitioning
+
+template < class range_type , class ...Types >
+joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+ int nparts ,
+ range_type range ,
+ Types ...args )
+{
+ partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
+ nparts = partitioning.size() ;
+
+ return multithread_async ( pfunc , partitioning , args... ) ;
+}
+
+/// simple routine to run a single task asynchronously
+
+template < class ...Types >
+void run_async ( void (*pfunc) ( Types... ) ,
+ Types ...args )
+{
+ auto task = std::bind ( pfunc , args... ) ;
+ {
+ std::lock_guard<std::mutex> lk ( task_mutex ) ;
+ task_queue.push ( task ) ;
+ }
+ // wake up a worker, so the queued task is actually picked up
+ task_cv.notify_one() ;
+}
+
+
// some apply type routines, preliminary
// the idea is to write a functor which bundles all processing steps.
// this functor is then applied to the target array, filling in the result
// by calculating it value by value.
+// TODO why is this code in multithread.h??
template < typename view_type >
void apply_point_func ( void (*pfunc) ( typename view_type::value_type & ) ,
@@ -472,7 +556,7 @@ void apply_v_point_func ( void (*pvfunc) ( typename view_type::value_type * ) ,
}
#endif
-
+
} ; // end of namespace vspline
#endif // #ifndef VSPLINE_MULTITHREAD_H
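
These async variants are what enables pv's new behaviour: the HQ spline's
calculation is pushed to the thread pool while the caller keeps rendering from
the LQ spline. A sketch of the intended pattern ('build_hq_spline', 'range' and
'bsp_hq' are hypothetical, with the usual (range, args...) worker signature;
note that the caller owns the joint_task and has to delete it):

    // launch the tasks, but don't wait for them:
    vspline::joint_task * p_jt
      = vspline::multithread_async ( &build_hq_spline ,
                                     vspline::ncores * 8 , range , &bsp_hq ) ;

    // ... keep panning on the LQ spline ...

    {
      // when the HQ spline is finally needed, wait until 'finished' is set
      std::unique_lock<std::mutex> lk ( p_jt->finished_mutex ) ;
      p_jt->finished_cv.wait ( lk , [&] { return p_jt->finished ; } ) ;
    }
    delete p_jt ; // the joint_task has to be deleted in the calling code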
diff --git a/prefilter.h b/prefilter.h
index c57f74c..3331333 100644
--- a/prefilter.h
+++ b/prefilter.h
@@ -178,7 +178,7 @@ void solve ( input_array_type & input ,
double tolerance ,
double smoothing = 0.0 ,
bool use_vc = true ,
- int nslices = ncores )
+ int nslices = ncores * 8 )
{
if ( smoothing != 0.0 )
{
diff --git a/remap.h b/remap.h
index 16531fc..2f04dbe 100644
--- a/remap.h
+++ b/remap.h
@@ -319,7 +319,7 @@ void remap ( const interpolator_type & ev ,
value_type ,
interpolator_type ,
dim_out > ,
- nthreads , range , &ev , &warp , &output , use_vc ) ;
+ nthreads * 8 , range , &ev , &warp , &output , use_vc ) ;
} ;
/// This is a variant of remap, which directly takes an array of values and remaps it,
@@ -552,49 +552,69 @@ void remap ( const evaluator < dim_in , value_type , rc_type , ic_type > & ev ,
shape_range_type < dim_out > range ( shape_type < dim_out > () , output.shape() ) ;
multithread ( & st_wremap < value_type , ic_type , rc_type , dim_in , dim_out > ,
- nthreads , range , &ev , warp.select , warp.tune , output , use_vc ) ;
+ nthreads * 8 , range , &ev , warp.select , warp.tune , output , use_vc ) ;
} ;
// Next we have some collateral code to get ready for the transformation-based remap
-/// type alias for a coordinate transformation functor, to (hopefully) make the signature
-/// more legible. In words: 'transform' is a standard function transforming an n-dimensional
-/// incoming coordinate to an m-dimensional outgoing coordinate. This function takes both
-/// coordinates as references.
+// /// type alias for a coordinate transformation functor, to (hopefully) make the signature
+// /// more legible. In words: 'transform' is a standard function transforming an n-dimensional
+// /// incoming coordinate to an m-dimensional outgoing coordinate. This function takes both
+// /// coordinates as references.
+//
+// template < class rc_type , // elementary type for coordinates, usually float
+// int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
+// int dim_out > // number of dimensions of output array
+// using transform
+// = std::function < void ( const vigra::TinyVector < rc_type , dim_out > & ,
+// vigra::TinyVector < rc_type , dim_in > & ) > ;
+//
+// #ifdef USE_VC
+//
+// /// We use this type alias, since the type is rather unwieldy
+// /// what we need to define is a SimdArray of rc_type, which has just as
+// /// many elements as a simdized_type of value_type's elementary type:
+//
+// template < class rc_type , class value_type >
+// using rc_v
+// = typename
+// vector_traits < rc_type ,
+// vector_traits < typename ExpandElementResult < value_type > :: type > :: vsize
+// > :: type ;
+//
+// /// This type alias defines a vectorized coordinate transformation.
+// /// it is the equivalent of the unvectorized type above, taking TinyVectors
+// /// of the appropriate SimdArray objects instead of singular values.
+//
+// template < class rc_v , // simdized type of vsize real coordinates
+// int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
+// int dim_out > // number of dimensions of output array
+// using vtransform
+// = std::function < void
+// ( const vigra::TinyVector < rc_v , dim_out > & ,
+// vigra::TinyVector < rc_v , dim_in > & ) > ;
+
+/// declaration of a generalized coordinate transformation. In vspline, we're using
+/// a 'reverse transformation': For every coordinate ct into the target array (the result
+/// of the 'remap' procedure, the final picture) we calculate a corresponding coordinate cs
+/// into the source data/spline coefficients, where we evaluate the interpolator to yield
+/// the value which has to be written to the target array at ct.
+///
+/// target [ ct ] = interpolator ( cs )
+///
+/// target [ ct ] = interpolator ( transform ( ct ) )
+///
+/// the coordinates ct and cs can be of different dimensionality (dim_target, dim_source)
+/// but share their component type, so that
+///
+/// std::is_same < decltype ( cs[0] ) , decltype ( ct[0] ) > :: value == true
-template < class rc_type , // elementary type for coordinates, usually float
- int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
- int dim_out > // number of dimensions of output array
+template < class component_type , // 1D coordinate type, single value or Simdarray
+ int dim_target , // number of dimensions of target array, incoming coordinate
+ int dim_source > // number of dimensions of coefficient array, resultant coordinate
using transform
-= std::function < void ( const vigra::TinyVector < rc_type , dim_out > & ,
- vigra::TinyVector < rc_type , dim_in > & ) > ;
-
-#ifdef USE_VC
-
-/// We use this type alias, since the type is rather unwieldy
-/// what we need to define is a SimdArray of rc_type, which has just as
-/// many elements as a simdized_type of value_type's elementary type:
-
-template < class rc_type , class value_type >
-using rc_v
-= typename
- vector_traits < rc_type ,
- vector_traits < typename ExpandElementResult < value_type > :: type > :: vsize
- > :: type ;
-
-/// This type alias defines a vectorized coordinate transformation.
-/// it is the equivalent of the unvectorized type above, taking TinyVectors
-/// of the appropriate SimdArray objects instead of singular values.
-
-template < class rc_v , // simdized type of vsize real coordinates
- int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
- int dim_out > // number of dimensions of output array
-using vtransform
-= std::function < void
- ( const vigra::TinyVector < rc_v , dim_out > & ,
- vigra::TinyVector < rc_v , dim_in > & ) > ;
-
-#endif
+= std::function < void ( const vigra::TinyVector < component_type , dim_target > & ,
+ vigra::TinyVector < component_type , dim_source > & ) > ;
/// class transformation is a (multi-) functor, handling coordinate transformations
/// this class simplifies pulling in transformations by automatically providing a broadcasting
@@ -621,60 +641,60 @@ using vtransform
/// c_out = c_in ;
/// }
///
-/// vspline::transformation < pixel_type , rc_type , 2 , 2 >
-/// tf ( tf_identity<rc_type> , vtf_identity<rc_v> ) ;
-
-// TODO: it's not really necessary to pass in value_type as a template arg, it's only
-// used to determine the correct vsize.
-// TODO: did I get muddled with dim_in and dim_out here? doublecheck!
-// TODO: naming with 'in' and 'out'creates confusion. use other naming scheme
-
-template < class value_type , /// coefficient/result type
- class rc_type , /// elementary coordinate type (float, double)
- int dim_in , /// dimension of incoming coordinates (== dim. of target array)
- int dim_out > /// dimension of outgoing coordinates (== dim. of coefficient array)
+/// vspline::transformation < float , 2 , 2 >
+/// tf ( tf_identity<float> , vtf_identity<float_v> ) ;
+
+template < class rc_type , /// elementary coordinate type
+ int dim_target , /// dimension of incoming coordinates (== dim. of target array)
+ int dim_source , /// dimension of outgoing coordinates (== dim. of coefficient array)
+ int vsize = 1 /// number of elements in Simdized coordinate
+ >
class transformation
{
- typedef transform < rc_type , dim_in , dim_out > transform_type ;
+ typedef transform < rc_type , dim_target , dim_source > transform_type ;
transform_type tf ; /// functor for single-value coordinate transformation
- typedef vigra::TinyVector < rc_type , dim_in > rc_in_type ;
- typedef vigra::TinyVector < rc_type , dim_out > rc_out_type ;
+ typedef vigra::TinyVector < rc_type , dim_target > target_nd_rc_type ;
+ typedef vigra::TinyVector < rc_type , dim_source > source_nd_rc_type ;
#ifdef USE_VC
- typedef rc_v < rc_type , value_type > rc_v_type ;
- const int vsize = rc_v_type::Size ;
-
- typedef vigra::TinyVector < rc_v_type , dim_in > rc_v_in_type ;
- typedef vigra::TinyVector < rc_v_type , dim_out > rc_v_out_type ;
+ typedef typename vector_traits < rc_type , vsize > :: type rc_v ;
+
+ typedef vigra::TinyVector < rc_v , dim_target > target_nd_rc_v ;
+ typedef vigra::TinyVector < rc_v , dim_source > source_nd_rc_v ;
- typedef vtransform < rc_v_type , dim_in , dim_out > vtransform_type ;
+ typedef transform < rc_v , dim_target , dim_source > vtransform_type ;
vtransform_type vtf ; /// functor for vectorized operation
- /// broadcast calls the single-element transform repeatedly to emulate vectorized operation
-
- void broadcast ( const rc_v_in_type & c_in , rc_v_out_type & c_out )
+ /// broadcast calls the single-element transform repeatedly to emulate vectorized operation.
+ /// This is used to build up to a vectorized transformation: usually the unvectorized code
+ /// is easier to provide or even already present. By broadcasting it, we get a functioning
+ /// prototype and a reference to test emerging vector code against. Once a vectorized version
+ /// of the transformation has been written and is passed into class transformation's constructor,
+/// broadcasting is no longer used.
+
+ void broadcast ( const target_nd_rc_v & c_target_v , source_nd_rc_v & c_source_v )
{
- rc_in_type cs_in ;
- rc_out_type cs_out ;
+ target_nd_rc_type c_target ;
+ source_nd_rc_type c_source ;
for ( int e = 0 ; e < vsize ; e++ )
{
- for ( int d = 0 ; d < dim_out ; d++ )
- cs_in[d] = c_in[d][e] ;
- tf ( cs_in , cs_out ) ;
- for ( int d = 0 ; d < dim_in ; d++ )
- c_out[d][e] = cs_out[d] ;
+ for ( int d = 0 ; d < dim_source ; d++ )
+ c_target[d] = c_target_v[d][e] ;
+ tf ( c_target , c_source ) ;
+ for ( int d = 0 ; d < dim_target ; d++ )
+ c_source_v[d][e] = c_source[d] ;
}
}
public:
- /// this constructor takes tranformation functors for both single-value and
- /// vectorized coordinate transforms
+ /// the two-argument constructor takes transformation functors for both unvectorized
+ /// and vectorized coordinate transformations:
transformation ( transform_type _tf , vtransform_type _vtf )
: tf ( _tf ) ,
@@ -705,23 +725,24 @@ public:
public :
- /// class transformation's operator() delegates to the functor passed in at construction
+ /// class transformation's operator() delegates to the functor passed in at construction.
+ /// it transforms a target coordinate, c_target, into a source coordinate, c_source
- void operator() ( const rc_in_type& c_in , rc_out_type& c_out )
+ void operator() ( const target_nd_rc_type& c_target , source_nd_rc_type& c_source )
{
- tf ( c_in , c_out ) ;
+ tf ( c_target , c_source ) ;
}
#ifdef USE_VC
/// if USE_VC is defined, we overload operator() for vectorized arguments
- void operator() ( const rc_v_in_type& c_in , rc_v_out_type& c_out )
+ void operator() ( const target_nd_rc_v& c_target , source_nd_rc_v& c_source )
{
if ( vtf )
- vtf ( c_in , c_out ) ;
+ vtf ( c_target , c_source ) ;
else
- broadcast ( c_in , c_out ) ;
+ broadcast ( c_target , c_source ) ;
}
#endif
@@ -758,11 +779,12 @@ struct coordinate_iterator_v
typedef _rc_v rc_v ;
enum { vsize = rc_v::Size } ;
- typedef vigra::TinyVector < rc_v , dimension > nd_rc_v ; // vectorized n-dimensional coordinate
+ typedef vigra::TinyVector < rc_v , dimension > nd_rc_v ;
- shape_type start ;
- shape_type end ;
- shape_type shape ;
+ const shape_type start ;
+ const shape_type end ;
+ const shape_type shape ;
+
nd_rc_v c ;
public:
@@ -820,6 +842,73 @@ public:
} ;
} ;
+// currently unused variant. seems to perform the same.
+
+// template < class _rc_v , int _dimension >
+// struct coordinate_iterator_v
+// {
+// enum { dimension = _dimension } ;
+// typedef vigra::TinyVector < int , dimension > shape_type ;
+//
+// typedef _rc_v rc_v ;
+// enum { vsize = rc_v::Size } ;
+// typedef vigra::TinyVector < rc_v , dimension > nd_rc_v ;
+//
+// const shape_type start ;
+// const shape_type end ;
+// const shape_type shape ;
+//
+// nd_rc_v c ;
+//
+// typedef vigra::MultiCoordinateIterator<dimension> iter_type ;
+//
+// const iter_type mci_start ;
+// const iter_type mci_end ;
+// iter_type mci ;
+//
+// public:
+//
+// coordinate_iterator_v ( const shape_type & _start ,
+// const shape_type & _end )
+// : mci_start ( _end - _start ) ,
+// mci_end ( iter_type ( _end - _start ) . getEndIterator() ) ,
+// start ( _start ) ,
+// end ( _end ) ,
+// shape ( _end - _start )
+// {
+// mci = mci_start ;
+//
+// for ( int i = 0 ; i < vsize ; i++ )
+// {
+// shape_type index = *mci + start ;
+// ++mci ;
+// if ( mci >= mci_end )
+// mci = mci_start ;
+// for ( int d = 0 ; d < dimension ; d++ )
+// c[d][i] = index[d] ;
+// }
+// } ;
+//
+// const nd_rc_v& operator*() const
+// {
+// return c ;
+// }
+//
+// coordinate_iterator_v & operator++()
+// {
+// for ( int i = 0 ; i < vsize ; i++ )
+// {
+// shape_type index = *mci + start ;
+// ++mci ;
+// if ( mci >= mci_end )
+// mci = mci_start ;
+// for ( int d = 0 ; d < dimension ; d++ )
+// c[d][i] = index[d] ;
+// }
+// return *this ;
+// } ;
+// } ;
+
/// this function provides a generalized geometric transformation
/// In image transformations, often the source location at which interpolation
/// of the source image is required is defined by a transformation of the location
@@ -835,40 +924,42 @@ public:
/// also suitable for any type of interpolator satisfying the interface defined in
/// class interpolator in interpolator.h.
-template < class value_type , // like, float, double, complex<>, pixels, TinyVectors etc.
- class rc_type , // float or double, for coordinates
- class interpolator_type , // type satisfying the interface in class interpolator
- int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
- int dim_out > // number of dimensions of output array
-void st_tf_remap ( shape_range_type < dim_out > range ,
- const interpolator_type * const p_ev ,
- transformation < value_type , rc_type , dim_in , dim_out > tf ,
- MultiArrayView < dim_out , value_type > _output ,
+template < class interpolator_type , ///< type satisfying the interface in class interpolator
+ int dim_target , ///< dimension of incoming coordinates (== dim. of target array)
+ int dim_source > ///< dimension of outgoing coordinates (== dim. of coefficient array)
+void st_tf_remap ( shape_range_type < dim_target > range ,
+ const interpolator_type * const p_interpolator ,
+ transformation < typename interpolator_type::rc_type ,
+ dim_target ,
+ dim_source ,
+ interpolator_type::vsize > tf ,
+ vigra::MultiArrayView < dim_target ,
+ typename interpolator_type::value_type > * p_output ,
bool use_vc = true
)
{
typedef typename interpolator_type::ele_type ele_type ;
+ typedef typename interpolator_type::rc_type rc_type ;
- auto output = _output.subarray ( range[0] , range[1] ) ;
+ auto output = p_output->subarray ( range[0] , range[1] ) ;
auto offset = range[0] ;
#ifdef USE_VC
+ typedef typename interpolator_type::value_type value_type ;
typedef typename interpolator_type::ele_v ele_v ;
typedef typename interpolator_type::rc_v rc_v ;
typedef typename interpolator_type::mc_ele_v mc_ele_v ;
const int vsize = interpolator_type::vsize ;
- // incoming coordinates (into output, which has dim_out dims)
- vigra::TinyVector < rc_v , dim_out > c_in ;
+ // incoming coordinates (into output, which has dim_target dimensions)
+ vigra::TinyVector < rc_v , dim_target > c_target ;
+
+ // transformed coordinates (into input, which has dim_source dimensions)
+ vigra::TinyVector < rc_v , dim_source > c_source ;
- // transformed coordinates (into input, which has dim_in dims)
- vigra::TinyVector < rc_v , dim_in > c_out ;
- mc_ele_v result ; // result, as struct of vectors
-
#endif
-// typedef typename CoupledIteratorType < dim_out , value_type > :: type Iterator ;
auto target_it = createCoupledIterator ( output ) ;
int leftover = output.elementCount() ;
@@ -876,27 +967,40 @@ void st_tf_remap ( shape_range_type < dim_out > range ,
if ( use_vc )
{
- int aggregates = output.elementCount() / vsize ; // number of full vectors
- leftover = output.elementCount() - aggregates * vsize ; // any leftover single values
- value_type * destination = output.data() ;
- value_type target_buffer [ vsize ] ;
- // use utility class coordinate_iterator_v to produce vectorized coordinates:
- coordinate_iterator_v < rc_v , dim_out > civ ( offset , offset + output.shape() ) ;
+ int aggregates = output.elementCount() / vsize ; // number of full vectors
+ leftover = output.elementCount() - aggregates * vsize ; // any leftover single values
+ value_type * destination = output.data() ; // pointer to target data
+
+ // use utility class coordinate_iterator_v to produce vectorized coordinates
+ // into the target (which has dim_target dimensions):
+ coordinate_iterator_v < rc_v , dim_target >
+ civ ( offset , offset + output.shape() ) ;
- for ( int a = 0 ; a < aggregates ; a++ )
+ if ( output.isUnstrided() )
{
- tf ( *civ , c_out ) ; // perform the coordinate transformation
- ++civ ;
- if ( output.isUnstrided() )
+ for ( int a = 0 ; a < aggregates ; a++ )
{
- // finally, here we evaluate the spline
- p_ev->v_eval ( c_out , destination ) ;
+ c_target = *civ ; // for clarity: vectorized coordinate into target
+ tf ( c_target , c_source ) ; // perform the (reverse) coordinate transformation
+ ++civ ; // advance civ
+ // use the interpolator to yield a value at coordinate c_source
+ p_interpolator->v_eval ( c_source , destination ) ;
destination += vsize ;
}
- else
+ // update target_it which we haven't used
+ target_it += aggregates * vsize ;
+ }
+ else
+ {
+ // alternative evaluation if we can't write straight to destination:
+ // let the interpolator yield into target_buffer and then scatter target_buffer
+ // to successive target values, as directed by target_it
+ value_type target_buffer [ vsize ] ;
+ for ( int a = 0 ; a < aggregates ; a++ )
{
- // alternative evaluation if we can't write straight to destination
- p_ev->v_eval ( c_out , target_buffer ) ;
+ c_target = *civ ; // for clarity: vectorized coordinate into target
+ tf ( c_target , c_source ) ; // perform the (reverse) coordinate transformation
+ ++civ ; // advance civ
+ p_interpolator->v_eval ( c_source , target_buffer ) ;
for ( int e = 0 ; e < vsize ; e++ )
{
target_it.template get<1>() = target_buffer[e] ;
@@ -904,22 +1008,23 @@ void st_tf_remap ( shape_range_type < dim_out > range ,
}
}
}
- if ( output.isUnstrided() )
- target_it += aggregates * vsize ;
}
#endif // USE_VC
- vigra::TinyVector < rc_type , dim_out > cs_in ;
- vigra::TinyVector < rc_type , dim_in > cs_out ;
+ vigra::TinyVector < rc_type , dim_target > cs_target ;
+ vigra::TinyVector < rc_type , dim_source > cs_source ;
- // process leftovers, if any - if vc isn't used, this loop does all the processing
+ // process leftovers, if any - if vc wasn't used, this loop does all the processing
while ( leftover-- )
{
// process leftovers with single-value evaluation of transformed coordinate
- cs_in = target_it.template get<0>() + offset ;
- tf ( cs_in , cs_out ) ;
- p_ev->eval ( cs_out , target_it.template get<1>() ) ;
+ // get the coordinate into the target array:
+ cs_target = target_it.template get<0>() + offset ;
+ // transform it into a (source) coordinate (to feed the interpolator)
+ tf ( cs_target , cs_source ) ;
+ // interpolate at this coordinate, yielding into the target array as directed by target_it
+ p_interpolator->eval ( cs_source , target_it.template get<1>() ) ;
++target_it ;
}
}
@@ -928,22 +1033,24 @@ void st_tf_remap ( shape_range_type < dim_out > range ,
/// This is just as for the warp-array-based remap. The target array is split into
/// chunks, which are in turn processed in a thread each by the single-threaded routine.
-template < class value_type , // like, float, double, complex<>, pixels, TinyVectors etc.
- class rc_type , // float or double for coordinates
- class interpolator_type ,
- int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
- int dim_out > // number of dimensions of output array
+template < class interpolator_type , ///< type satisfying the interface in class interpolator
+ int dim_target , ///< dimension of incoming coordinates (== dim. of target array)
+ int dim_source > ///< dimension of outgoing coordinates (== dim. of coefficient array)
void tf_remap ( const interpolator_type & ev ,
- transformation < value_type , rc_type , dim_in , dim_out > tf ,
- MultiArrayView < dim_out , value_type > output ,
+ transformation < typename interpolator_type::rc_type ,
+ dim_target ,
+ dim_source ,
+ interpolator_type::vsize > tf ,
+ MultiArrayView < dim_target ,
+ typename interpolator_type::value_type > & output ,
bool use_vc = true ,
int nthreads = ncores
)
{
- shape_range_type < dim_out > range ( shape_type < dim_out > () , output.shape() ) ;
+ shape_range_type < dim_target > range ( shape_type < dim_target > () , output.shape() ) ;
- multithread ( st_tf_remap < value_type , rc_type , interpolator_type , dim_in , dim_out > ,
- nthreads , range , &ev , tf , output , use_vc ) ;
+ multithread ( st_tf_remap < interpolator_type , dim_target , dim_source > ,
+ nthreads * 8 , range , &ev , tf , &output , use_vc ) ;
}
/// this highest-level transform-based remap takes an input array and creates
@@ -951,27 +1058,32 @@ void tf_remap ( const interpolator_type & ev ,
/// takes a bspline as it's first parameter. Like with warp-array-based remap,
/// this is for on-the-fly remapping where the b-spline won't be reused.
-template < class value_type , // like, float, double, complex<>, pixels, TinyVectors etc.
- class rc_type , // usually float for coordinates
- int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
- int dim_out > // number of dimensions of output array
-void tf_remap ( MultiArrayView < dim_in , value_type > input ,
- transformation < value_type , rc_type , dim_in , dim_out > tf ,
- MultiArrayView < dim_out , value_type > output ,
- bcv_type<dim_in> bcv = bcv_type<dim_in> ( MIRROR ) ,
+template < typename input_value_type , ///< type of values in input
+ typename output_value_type , ///< type of values in output
+ typename rc_type , ///< elementary type of coordinates
+ int dim_target , ///< dimension of incoming coordinates (== dim. of target array)
+ int dim_source > ///< dimension of outgoing coordinates (== dim. of coefficient array)
+void tf_remap ( MultiArrayView < dim_source ,
+ input_value_type > input ,
+ transformation < rc_type ,
+ dim_target ,
+ dim_source ,
+ vector_traits < typename vigra::ExpandElementResult < input_value_type > :: type > :: vsize > tf ,
+ MultiArrayView < dim_target , output_value_type > & output ,
+ bcv_type < dim_source > bcv = bcv_type < dim_source > ( MIRROR ) ,
int degree = 3 ,
bool use_vc = true ,
int nthreads = ncores
)
{
// create the bspline object
- bspline < value_type , dim_in > bsp ( input.shape() , degree , bcv ) ;
+ bspline < input_value_type , dim_source > bsp ( input.shape() , degree , bcv ) ;
// copy in the data
bsp.prefilter ( use_vc , nthreads , input ) ;
// create an evaluator for this spline
- typedef evaluator < dim_in , value_type , rc_type , int > evaluator_type ;
+ typedef evaluator < dim_source , input_value_type , rc_type , int > evaluator_type ;
evaluator_type ev ( bsp ) ;
- tf_remap<value_type,rc_type,evaluator_type,dim_in,dim_out>
+ tf_remap < evaluator_type , dim_target , dim_source >
( ev , tf , output , use_vc , nthreads ) ;
}
@@ -987,7 +1099,10 @@ template < class value_type , // like, float, double, complex<>, pixels, Ti
int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
int dim_out > // number of dimensions of output array
void st_make_warp_array ( shape_range_type < dim_out > range ,
- transformation < value_type , rc_type , dim_in , dim_out > tf ,
+ transformation < rc_type ,
+ dim_in ,
+ dim_out ,
+ vector_traits < typename vigra::ExpandElementResult < value_type > :: type > :: vsize > tf ,
MultiArrayView < dim_out ,
vigra::TinyVector < rc_type , dim_in > >
_warp_array ,
@@ -1069,7 +1184,10 @@ template < class value_type , // like, float, double, complex<>, pixels, Ti
class rc_type , // float or double for coordinates
int dim_in , // number of dimensions of input array (and of 'warp' coordinates)
int dim_out > // number of dimensions of output array
-void make_warp_array ( transformation < value_type , rc_type , dim_in , dim_out > tf ,
+void make_warp_array ( transformation < rc_type ,
+ dim_in ,
+ dim_out ,
+ vector_traits < typename vigra::ExpandElementResult < value_type > :: type > :: vsize > tf ,
MultiArrayView < dim_out ,
vigra::TinyVector < rc_type , dim_in > >
& warp_array ,
@@ -1080,7 +1198,7 @@ void make_warp_array ( transformation < value_type , rc_type , dim_in , dim_out
shape_range_type < dim_out > range ( shape_type < dim_out > () , warp_array.shape() ) ;
multithread ( st_make_warp_array < value_type , rc_type , dim_in , dim_out > ,
- nthreads , range , tf , warp_array , use_vc ) ;
+ nthreads * 8 , range , tf , warp_array , use_vc ) ;
}
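
Putting the remap.h changes together: the highest-level, transformation-based
remap is now invoked as in the updated roundtrip.cc above. A sketch
(pixel_type, rc_type, tf, bcv, DEGREE and use_vc as in roundtrip.cc; 'data' is
the source image, 'target' the destination):

    vigra::MultiArray < 2 , pixel_type > target ( data.shape() ) ;
    // builds a b-spline over 'data' on the fly, then remaps through tf;
    // internally the work is split into nthreads * 8 tasks
    vspline::tf_remap < pixel_type , pixel_type , rc_type , 2 , 2 >
      ( data , tf , target , bcv , DEGREE , use_vc ) ;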
diff --git a/thread_pool.cc b/thread_pool.cc
new file mode 100644
index 0000000..3eb049b
--- /dev/null
+++ b/thread_pool.cc
@@ -0,0 +1,148 @@
+/************************************************************************/
+/* */
+/* vspline - a set of generic tools for creation and evaluation */
+/* of uniform b-splines */
+/* */
+/* Copyright 2015, 2016 by Kay F. Jahnke */
+/* */
+/* Permission is hereby granted, free of charge, to any person */
+/* obtaining a copy of this software and associated documentation */
+/* files (the "Software"), to deal in the Software without */
+/* restriction, including without limitation the rights to use, */
+/* copy, modify, merge, publish, distribute, sublicense, and/or */
+/* sell copies of the Software, and to permit persons to whom the */
+/* Software is furnished to do so, subject to the following */
+/* conditions: */
+/* */
+/* The above copyright notice and this permission notice shall be */
+/* included in all copies or substantial portions of the */
+/* Software. */
+/* */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
+/* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
+/* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
+/* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
+/* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
+/* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
+/* OTHER DEALINGS IN THE SOFTWARE. */
+/* */
+/************************************************************************/
+
+/// \file thread_pool.cc
+///
+/// \brief provides a thread pool for vspline's multithread() routine
+///
+/// This code needs to be compiled separately. It is used by vspline's
+/// multithread() routines and has to be linked with any code using them.
+///
+/// a typical compile would go like this:
+///
+/// clang++ -c -std=c++11 -o thread_pool.o -O3 thread_pool.cc
+
+#include <thread>
+#include <mutex>
+#include <queue>
+#include <condition_variable>
+#include <functional>
+#include <vector>
+#include <iostream>
+
+namespace vspline
+{
+ // task queue, mutex and condition variable
+ // for interaction with the thread pool
+
+ std::mutex task_mutex ;
+ std::queue < std::function < void() > > task_queue ;
+ std::condition_variable task_cv ;
+
+ // used to switch off the worker threads at program termination
+
+ bool worker_stay_alive = true ;
+
+ // we need a thread pool of worker threads. These threads have a very
+ // simple cycle: They try and obtain a task (std::function<void()>).
+ // If there is one to be had, it is invoked, otherwise they wait on
+ // task_cv.
+
+ void worker_thread()
+ {
+ std::function < void() > task ;
+
+ while ( worker_stay_alive ) // if worker_stay_alive is set to false, die
+ {
+ // under task_mutex, try and obtain a task
+ std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
+ int sz = task_queue.size() ;
+ if ( sz )
+ {
+ // there are tasks in the queue, take one
+ task = task_queue.front() ;
+ task_queue.pop() ;
+ }
+
+ if ( sz )
+ {
+ task_lock.unlock() ;
+ // got a task, perform it, then try for another one
+ task() ;
+ }
+ else if ( worker_stay_alive )
+ {
+ // no luck. wait.
+ task_cv.wait ( task_lock ) ; // simply wait, spurious alert is okay
+ }
+ else
+ {
+ task_lock.unlock() ;
+ break ;
+ }
+ // start next cycle, either after having completed a job
+ // or after having been woken by an alert
+ }
+ }
+
+ /// the thread pool itself is held in this variable
+
+ std::vector < std::thread * > thread_pool ;
+
+ /// the threads are started, stopped and destroyed by this object
+
+ struct manage_thread_pool
+ {
+ const int nthreads ;
+
+ manage_thread_pool ( int _nthreads = 2 * std::thread::hardware_concurrency() )
+ : nthreads ( _nthreads )
+ {
+ for ( int t = 0 ; t < nthreads ; t++ )
+ thread_pool.push_back ( new std::thread ( worker_thread ) ) ;
+ }
+
+ void kill_thread_pool()
+ {
+ worker_stay_alive = false ;
+ task_cv.notify_all() ;
+ for ( auto threadp : thread_pool )
+ {
+ threadp->join() ;
+ delete threadp ;
+ }
+ }
+
+ ~manage_thread_pool()
+ {
+ if ( worker_stay_alive )
+ kill_thread_pool() ;
+ }
+ } ;
+
+ // of which we have precisely one static instance here:
+
+ manage_thread_pool _thread_pool_manager ;
+
+ void kill_thread_pool()
+ {
+ _thread_pool_manager.kill_thread_pool() ;
+ }
+} ; // end of namespace vspline
+
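
Since thread_pool.cc has to be compiled separately and linked in, a build of
e.g. the roundtrip example might go like this (a sketch following the compile
line quoted in the file's header; depending on the platform, -pthread or
-lpthread may also be required):

    clang++ -c -std=c++11 -O3 -o thread_pool.o thread_pool.cc
    clang++ -std=c++11 -O3 -o roundtrip example/roundtrip.cc thread_pool.o -pthread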
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/vspline.git