[vspline] 22/72: consolidated opertion with thread pool, now header-only.

Kay F. Jahnke kfj-guest at moszumanska.debian.org
Sun Jul 2 09:02:39 UTC 2017


This is an automated email from the git hooks/post-receive script.

kfj-guest pushed a commit to branch master
in repository vspline.

commit 68692ee107c33afe2c5d407e5d2a290e7cddc4f1
Author: Kay F. Jahnke <kfjahnke at gmail.com>
Date:   Thu Dec 22 10:24:26 2016 +0100

    consolidated opertion with thread pool, now header-only.
---
 multithread.h                   | 132 +++++++++++++++++++++-------------------
 thread_pool.cc => thread_pool.h | 116 ++++++++++++++++++-----------------
 2 files changed, 131 insertions(+), 117 deletions(-)

diff --git a/multithread.h b/multithread.h
index 446cd13..0301db7 100644
--- a/multithread.h
+++ b/multithread.h
@@ -93,6 +93,7 @@
 #include <queue>
 #include <condition_variable>
 #include <vspline/common.h>
+#include <vspline/thread_pool.h>
 
 namespace vspline
 {
@@ -174,6 +175,11 @@ using shape_partition_type = partition_type < shape_range_type < dimension > > ;
 /// 'forbid' prevents that particular axis from being split. The split may succeed
 /// producing n or less ranges, and if 'shape' can't be split at all, a single range
 /// encompassing the whole of 'shape' will be returned in the result vector.
+/// TODO: with some shapes, splitting will result in subranges which aren't optimal
+/// for b-spline prefiltering (these are fastest with extents which are a multiple of
+/// the simdized data type), so we might add code to preferably use cut locations
+/// coinciding with those extents. And with small extents being split, the result
+/// becomes very inefficient for filtering.
 
 template < int dim >
 struct shape_splitter
@@ -183,8 +189,8 @@ struct shape_splitter
   typedef partition_type < range_t > partition_t ;
   
   static partition_t part ( const shape_t & shape , ///< shape to be split n-ways
-                               int n ,                    ///< intended number of chunks
-                               int forbid = -1 )          ///< axis which shouldn't be split
+                            int n ,                 ///< intended number of chunks
+                            int forbid = -1 )       ///< axis which shouldn't be split
   {
     partition_t res ;
 
@@ -258,7 +264,8 @@ struct shape_splitter
 /// of desired threads instead of a ready-made partitioning.
 
 template < int d >
-partition_type < shape_range_type<d> > partition ( shape_range_type<d> range , int nparts )
+partition_type < shape_range_type<d> >
+partition ( shape_range_type<d> range , int nparts )
 {
   if ( range[0].any() )
   {
@@ -312,34 +319,36 @@ struct joint_task
   std::mutex finished_mutex ; // to guard 'finished'
   bool finished ;             // is set to true as predicate to test by caller
   std::condition_variable finished_cv ; // the caller waits for this cv  
-  
+
+  thread_pool & tp ;
   std::vector < std::function < void() > > & taskv ; // tasks to perform
   
-  joint_task ( std::vector < std::function < void() > > & _taskv )
-  : taskv (_taskv ) ,        // individual tasks to perform
+  joint_task ( thread_pool & _tp ,
+               std::vector < std::function < void() > > & _taskv )
+  : tp ( _tp ) ,             // thread pool to use
+    taskv (_taskv ) ,        // individual tasks to perform
     crew ( _taskv.size() ) , // number of tasks
     done ( 0 ) ,             // number of done tasks
     finished ( false )       // not yet finished with all tasks
   {
     {
       // under task_mutex, fill tasks into task queue
-      std::lock_guard<std::mutex> lk ( task_mutex ) ;
+      std::lock_guard<std::mutex> lk ( tp.task_mutex ) ;
       for ( int i = 0 ; i < crew ; i++ )
       {
         // bind both the ith task and a pointer to this object to action_wrapper,
         // resulting in a functional which is then pushed to the task queue
         std::function < void() >
           action = std::bind ( action_wrapper , taskv[i] , this ) ;
-        task_queue.push ( action ) ;
+        tp.task_queue.push ( action ) ;
       }
     }
-    task_cv.notify_all() ; // alert all worker threads
+    tp.task_cv.notify_all() ; // alert all worker threads
     {
       // now wait for the last task to complete
-      std::unique_lock<std::mutex> lk ( task_mutex ) ;
+      std::unique_lock<std::mutex> lk ( tp.task_mutex ) ;
       // the predicate done==crew rejects spurious wakes
-      task_cv.wait ( lk , [&] {    return done == crew
-                                || worker_stay_alive == false ; } ) ;
+      tp.task_cv.wait ( lk , [&] { return done == crew ; } ) ;
     }
     // we're sure now that all tasks are done, so we let the caller know
     finished = true ;
@@ -365,15 +374,14 @@ void action_wrapper ( std::function < void() > action ,
   // number of actions originating from p_coordinator
   bool all_done ;
   {
-    std::lock_guard<std::mutex> lk ( task_mutex ) ;
+    std::lock_guard<std::mutex> lk ( p_coordinator->tp.task_mutex ) ;
     all_done = ( ++ (p_coordinator->done) == p_coordinator->crew ) ;
   }
   if ( all_done )
   {
     // this was the last action originating from p_coordinator
     // notify the coordinator that the joint task is now complete
-//     p_coordinator->complete_cv.notify_one() ;
-    task_cv.notify_all() ;
+    p_coordinator->tp.task_cv.notify_all() ;
   }
 }
 
@@ -399,6 +407,8 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
                   partition_type < range_type > partitioning ,
                   Types ...args )
 {
+  static thread_pool tp ; // keep a thread pool only for multithread()
+
   // get the number of ranges in the partitioning
 
   int nparts = partitioning.size() ;
@@ -414,12 +424,13 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
     taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
   }
 
-  // create the joint_task object, passing the vector of tasks.
+  // create the joint_task object, passing the thread pool and
+  // the vector of tasks.
   // The joint_task object launches the tasks and makes sure they
   // all complete before notifying on its' finished_cv condition
   // variable
 
-  joint_task jt ( taskv ) ;
+  joint_task jt ( tp , taskv ) ;
   
   {
     // wait until 'jt.finished' is true
@@ -428,6 +439,7 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
   }
 
   // now the joint task is complete
+
   return nparts ;
 }
 
@@ -446,59 +458,57 @@ int multithread ( void (*pfunc) ( range_type , Types... ) ,
   // partition the range and update nparts in case the partitioning produced
   // less ranges than we initially asked for. Rely on the presence of a function
   // partition() which can process range_type:
-  // currently ups the number of partitions to 8-fold, which seems to be a
-  // good overall compromise.
 
-  partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
+  partition_type < range_type > partitioning = partition ( range , nparts ) ;
   nparts = partitioning.size() ;
   
   return multithread ( pfunc , partitioning , args... ) ;
 }
 
-/// multithread_async works like multithread, but instead of waiting
-/// for the joint task to complete, it immediately returns the joint_task
-/// object (beware, this has to be deleted in the calling code)
-
-template < class range_type , class ...Types >
-joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
-                                 partition_type < range_type > partitioning ,
-                                 Types ...args )
-{
-  int nparts = partitioning.size() ;
-  std::vector < std::function < void() > > taskv ( nparts ) ;
-  for ( int s = 0 ; s < nparts ; s++ )
-  {
-    taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
-  }
-  // here we don't wait for the tasks to complete but simply return
-  // a pointer to the joint task object, passing control to the caller.
-  return new joint_task ( taskv ) ;
-}
-
-/// overload of multithread_async with automatic range partitioning
-
-template < class range_type , class ...Types >
-joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
-                                 int nparts ,
-                                 range_type range ,
-                                 Types ...args )
-{
-  partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
-  nparts = partitioning.size() ;
-  
-  return multithread_async ( pfunc , partitioning , args... ) ;
-}
+// /// multithread_async works like multithread, but instead of waiting
+// /// for the joint task to complete, it immediately returns the joint_task
+// /// object (beware, this has to be deleted in the calling code)
+// 
+// template < class range_type , class ...Types >
+// joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+//                                  partition_type < range_type > partitioning ,
+//                                  Types ...args )
+// {
+//   int nparts = partitioning.size() ;
+//   std::vector < std::function < void() > > taskv ( nparts ) ;
+//   for ( int s = 0 ; s < nparts ; s++ )
+//   {
+//     taskv[s] = std::bind ( pfunc , partitioning[s] , args... ) ;
+//   }
+//   // here we don't wait for the tasks to complete but simply return
+//   // a pointer to the joint task object, passing control to the caller.
+//   return new joint_task ( taskv ) ;
+// }
+// 
+// /// overload of multithread_async with automatic range partitioning
+// 
+// template < class range_type , class ...Types >
+// joint_task * multithread_async ( void (*pfunc) ( range_type , Types... ) ,
+//                                  int nparts ,
+//                                  range_type range ,
+//                                  Types ...args )
+// {
+//   partition_type < range_type > partitioning = partition ( range , nparts ) ; // * 8 ) ;
+//   nparts = partitioning.size() ;
+//   
+//   return multithread_async ( pfunc , partitioning , args... ) ;
+// }
 
 /// simple routine to run a single task asynchronously
 
-template < class ...Types >
-void run_async ( void (*pfunc) ( Types... ) ,
-                 Types ...args )
-{
-  auto task = std::bind ( pfunc , args... ) ;
-  std::lock_guard<std::mutex> lk ( task_mutex ) ;
-  task_queue.push ( task ) ;
-}
+// template < class ...Types >
+// void run_async ( void (*pfunc) ( Types... ) ,
+//                  Types ...args )
+// {
+//   auto task = std::bind ( pfunc , args... ) ;
+//   std::lock_guard<std::mutex> lk ( task_mutex ) ;
+//   task_queue.push ( task ) ;
+// }
 
 
 // some apply type routines, preliminary
diff --git a/thread_pool.cc b/thread_pool.h
similarity index 70%
rename from thread_pool.cc
rename to thread_pool.h
index 3eb049b..a917def 100644
--- a/thread_pool.cc
+++ b/thread_pool.h
@@ -33,12 +33,6 @@
 ///
 /// \brief provides a thread pool for vspline's multithread() routine
 ///
-/// This code needs to be compiled separately. It is used by vspline's
-/// multithread() routines and has to be linked with any code using them.
-///
-/// a typical compile would go like this:
-///
-/// clang++ -c -std=c++11 -o thread_pool.o -O3 thread_pool.cc
 
 #include <thread>
 #include <mutex>
@@ -48,30 +42,50 @@
 
 namespace vspline
 {
-  // task queue, mutex and condition variable
+  
+/// code to run a worker thread
+/// We use a thread pool of worker threads. These threads have a very 
+/// simple cycle: They try and obtain a task (std::function<void()>). 
+/// If there is one to be had, it is invoked, otherwise they wait on
+/// task_cv.
+  
+class thread_pool
+{
+  // used to switch off the worker threads at program termination
+
+  bool stay_alive = true ;
+
+  /// the thread pool itself is held in this variable
+
+  std::vector < std::thread * > pool ;
+  
+  /// the threads are started, stopped and destroyed by this object
+
+  int nthreads ;
+
+public:
+
+  // mutex and condition variable
   // for interaction with the thread pool
 
   std::mutex task_mutex ;
-  std::queue < std::function < void() > > task_queue ;
   std::condition_variable task_cv ;
+  std::queue < std::function < void() > > task_queue ;
 
-  // used to switch off the worker threads at program termination
-
-  bool worker_stay_alive = true ;
-
-  // we need a thread pool of worker threads. These threads have a very 
-  // simple cycle: They try and obtain a task (std::function<void()>). 
-  // If there is one to be had, it is invoked, otherwise they wait on
-  // task_cv.
+private:
   
   void worker_thread()
   {
     std::function < void() > task ;
 
-    while ( worker_stay_alive ) // if stay_alive is set to false, die
+    while ( true )
     {
       // under task_mutex, try and obtain a task
       std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
+
+      if ( ! stay_alive )
+        break ;
+
       int sz = task_queue.size() ;
       if ( sz )
       {
@@ -86,63 +100,53 @@ namespace vspline
         // got a task, perform it, then try for another one
         task() ;
       }
-      else if ( worker_stay_alive )
+      else
       {
         // no luck. wait.
         task_cv.wait ( task_lock ) ; // simply wait, spurious alert is okay
       }
-      else
-      {
-        task_lock.unlock() ;
-        break ;
-      }
       // start next cycle, either after having completed a job
       // or after having been woken by an alert
     }
   }
 
-  /// the thread pool itself is held in this variable
-
-  std::vector < std::thread * > thread_pool ;
+public:
   
-  /// the threads are started, stopped and destroyed by this object
-
-  struct manage_thread_pool
+  thread_pool ( int _nthreads = 2 * std::thread::hardware_concurrency() )
+  : nthreads ( _nthreads )
   {
-    const int nthreads ;
-
-    manage_thread_pool ( int _nthreads = 2 * std::thread::hardware_concurrency() )
-    : nthreads ( _nthreads )
-    {
-      for ( int t = 0 ; t < nthreads ; t++ )
-      thread_pool.push_back ( new std::thread ( worker_thread ) ) ;
-    }
+    std::function < void() > wf = std::bind ( &thread_pool::worker_thread , this ) ; 
+    for ( int t = 0 ; t < nthreads ; t++ )
+    pool.push_back ( new std::thread ( wf ) ) ;
+  }
     
-    void kill_thread_pool()
+  ~thread_pool()
+  {
+    // under task_mutex, set stay_alive to false
+    
+    std::unique_lock<std::mutex> task_lock ( task_mutex ) ;
+    stay_alive = false ;
+    task_lock.unlock() ;
+    
+    // wake all inactive worker threads,
+    // join all worker threads once they are finished
+
+    task_cv.notify_all() ;
+
+    for ( auto threadp : pool )
     {
-      worker_stay_alive = false ;
-      task_cv.notify_all() ;
-      for ( auto threadp : thread_pool )
-      {
-        threadp->join() ;
-        delete threadp ;
-      }
+      threadp->join() ;
     }
     
-    ~manage_thread_pool()
+    // once all are joined, delete their std::thread object
+
+    for ( auto threadp : pool )
     {
-      if ( worker_stay_alive )
-        kill_thread_pool() ;
+      delete threadp ;
     }
-  } ;
-  
-  // of which we have precisely one static instance here:
+  }
+} ;
 
-  manage_thread_pool _thread_pool_manager ;
 
-  void kill_thread_pool()
-  {
-    _thread_pool_manager.kill_thread_pool() ;
-  }
 } ; // end of namespace vspline
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/vspline.git



More information about the debian-science-commits mailing list