[tbb] 07/64: Imported Upstream version 2.0r020

Graham Inggs ginggs at moszumanska.debian.org
Mon Jul 3 12:27:36 UTC 2017


This is an automated email from the git hooks/post-receive script.

ginggs pushed a commit to branch master
in repository tbb.

commit da7a2b13ed8b032c52eb4045bac8ef8fc46859ad
Author: Graham Inggs <ginggs at debian.org>
Date:   Mon Jul 3 14:13:25 2017 +0200

    Imported Upstream version 2.0r020
---
 build/windows.icl.inc                     |   5 +-
 examples/test_all/fibonacci/Fibonacci.cpp |  11 +-
 include/tbb/cache_aligned_allocator.h     |   5 +-
 include/tbb/machine/linux_common.h        |  95 ++++++
 include/tbb/machine/linux_em64t.h         |   5 +-
 include/tbb/machine/linux_ia32.h          |   5 +-
 include/tbb/machine/linux_itanium.h       |   5 +-
 include/tbb/pipeline.h                    |  12 +-
 include/tbb/scalable_allocator.h          |   2 +-
 include/tbb/tbb_machine.h                 |  15 +
 src/tbb/concurrent_queue.cpp              | 539 +++++++++++++++++++++++++++++-
 src/tbb/concurrent_vector.cpp             |   1 +
 src/tbb/gate.h                            |  39 ++-
 src/tbb/pipeline.cpp                      |  84 ++---
 src/tbb/queuing_mutex.cpp                 |   3 -
 src/tbb/queuing_rw_mutex.cpp              |   3 -
 src/tbb/task.cpp                          | 170 ++++++----
 src/tbbmalloc/MemoryAllocator.cpp         |   5 +-
 src/test/test_allocator.h                 |   6 +
 src/test/test_assembly.cpp                |  10 +
 src/test/test_malloc_pure_c.c             |  11 +-
 src/test/test_pipeline.cpp                |  27 +-
 src/test/test_yield.cpp                   |   2 +-
 23 files changed, 905 insertions(+), 155 deletions(-)

diff --git a/build/windows.icl.inc b/build/windows.icl.inc
index 6eb1f76..96c762f 100644
--- a/build/windows.icl.inc
+++ b/build/windows.icl.inc
@@ -67,7 +67,7 @@ INCLUDE_KEY = /I
 DEFINE_KEY = /D
 OUTPUT_KEY = /Fe
 OUTPUTOBJ_KEY = /Fo
-WARNING_KEY = 
+WARNING_KEY =
 DYLIB_KEY = /DLL
 
 ifeq (em64t,$(arch))
@@ -91,6 +91,9 @@ ifeq ($(VCCOMPAT_FLAG),)
         VCCOMPAT_FLAG := $(if $(findstring vc8, $(VCVERSION)),/Qvc8)
 endif
 ifeq ($(VCCOMPAT_FLAG),)
+        VCCOMPAT_FLAG := $(if $(findstring vc9, $(VCVERSION)),/Qvc9)
+endif
+ifeq ($(VCCOMPAT_FLAG),)
         $(error VC version not detected correctly: $(VCVERSION) )
 endif
 export VCCOMPAT_FLAG
diff --git a/examples/test_all/fibonacci/Fibonacci.cpp b/examples/test_all/fibonacci/Fibonacci.cpp
index 5d50020..8db4453 100644
--- a/examples/test_all/fibonacci/Fibonacci.cpp
+++ b/examples/test_all/fibonacci/Fibonacci.cpp
@@ -333,7 +333,7 @@ value ParallelQueueFib(int n)
 
 //! filter to fills queue
 class InputFilter: public filter {
-    atomic<int> N; //< index of Fibonacci number
+    atomic<int> N; //< index of Fibonacci number minus 1
 public:
     concurrent_queue<Matrix2x2> Queue;
     //! fill filter arguments
@@ -344,7 +344,7 @@ public:
         int n = --N;
         if(n <= 0) return 0;
         Queue.push( Matrix1110 );
-        return n == 1? 0 : &Queue; // one less multiplications
+        return &Queue;
     }
 };
 //! filter to process queue
@@ -365,7 +365,7 @@ public:
 //! Root function
 value ParallelPipeFib(int n)
 {
-    InputFilter input( n );
+    InputFilter input( n-1 );
     MultiplyFilter process;
     // Create the pipeline
     pipeline pipeline;
@@ -373,6 +373,7 @@ value ParallelPipeFib(int n)
     pipeline.add_filter( input ); // first
     pipeline.add_filter( process ); // second
 
+    input.Queue.push( Matrix1110 );
     // Run the pipeline
     pipeline.run( n ); // must be larger then max threads number
     pipeline.clear(); // do not forget clear the pipeline
@@ -460,7 +461,7 @@ struct FibTask: public task {
     //! Execute task
     /*override*/ task* execute() {
         // Using Lucas' formula here
-        if( second_phase ) { // childs finished
+        if( second_phase ) { // children finished
             sum = n&1 ? x*x + y*y : x*x - y*y;
             return NULL;
         }
@@ -468,7 +469,7 @@ struct FibTask: public task {
             sum = n!=0;
             return NULL;
         } else {
-            recycle_as_continuation();  // repeat this task when childs finish
+            recycle_as_continuation();  // repeat this task when children finish
             second_phase = true; // mark second phase
             FibTask& a = *new( allocate_child() ) FibTask( n/2 + 1, x );
             FibTask& b = *new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y );
diff --git a/include/tbb/cache_aligned_allocator.h b/include/tbb/cache_aligned_allocator.h
index 08beaa4..93a01c7 100644
--- a/include/tbb/cache_aligned_allocator.h
+++ b/include/tbb/cache_aligned_allocator.h
@@ -92,8 +92,9 @@ public:
     const_pointer address(const_reference x) const {return &x;}
     
     //! Allocate space for n objects, starting on a cache/sector line.
-    pointer allocate( size_type n, void* hint=0 ) {
-        return pointer(internal::NFS_Allocate( n, sizeof(T), hint ));
+    pointer allocate( size_type n, const void* hint=0 ) {
+        // The "hint" argument is always ignored in NFS_Allocate thus const_cast shouldn't hurt
+        return pointer(internal::NFS_Allocate( n, sizeof(T), const_cast<void*>(hint) ));
     }
 
     //! Free block of memory that starts on a cache line
diff --git a/include/tbb/machine/linux_common.h b/include/tbb/machine/linux_common.h
new file mode 100644
index 0000000..c2b824e
--- /dev/null
+++ b/include/tbb/machine/linux_common.h
@@ -0,0 +1,95 @@
+/*
+    Copyright 2005-2008 Intel Corporation.  All Rights Reserved.
+
+    This file is part of Threading Building Blocks.
+
+    Threading Building Blocks is free software; you can redistribute it
+    and/or modify it under the terms of the GNU General Public License
+    version 2 as published by the Free Software Foundation.
+
+    Threading Building Blocks is distributed in the hope that it will be
+    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with Threading Building Blocks; if not, write to the Free Software
+    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+
+    As a special exception, you may use this file as part of a free software
+    library without restriction.  Specifically, if other files instantiate
+    templates or use macros or inline functions from this file, or you compile
+    this file and link it with other files to produce an executable, this
+    file does not by itself cause the resulting executable to be covered by
+    the GNU General Public License.  This exception does not however
+    invalidate any other reasons why the executable file might be covered by
+    the GNU General Public License.
+*/
+
+#ifndef __TBB_machine_H
+#error Do not include this file directly; include tbb_machine.h instead
+#endif
+
+#include <stdint.h>
+#include <unistd.h>
+#include <sched.h>
+
+// Definition of __TBB_Yield()
+#define __TBB_Yield()  sched_yield()
+
+/* Futex definitions */
+#include <sys/syscall.h>
+
+#if defined(SYS_futex)
+
+#define __TBB_USE_FUTEX 1
+#include <limits.h>
+#include <errno.h>
+// Unfortunately, some versions of Linux do not have a header that defines FUTEX_WAIT and FUTEX_WAKE.
+
+#ifdef FUTEX_WAIT
+#define __TBB_FUTEX_WAIT FUTEX_WAIT
+#else
+#define __TBB_FUTEX_WAIT 0
+#endif
+
+#ifdef FUTEX_WAKE
+#define __TBB_FUTEX_WAKE FUTEX_WAKE
+#else
+#define __TBB_FUTEX_WAKE 1
+#endif
+
+#ifndef __TBB_ASSERT
+#error machine specific headers must be included after tbb_stddef.h
+#endif
+
+namespace tbb {
+
+namespace internal {
+
+inline int futex_wait( void *futex, int comparand ) {
+    int r = ::syscall( SYS_futex,futex,__TBB_FUTEX_WAIT,comparand,NULL,NULL,0 );
+#if TBB_DO_ASSERT
+    int e = errno;
+    __TBB_ASSERT( r==0||r==EWOULDBLOCK||(r==-1&&(e==EAGAIN||e==EINTR)), "futex_wait failed." );
+#endif /* TBB_DO_ASSERT */
+    return r;
+}
+
+inline int futex_wakeup_one( void *futex ) {
+    int r = ::syscall( SYS_futex,futex,__TBB_FUTEX_WAKE,1,NULL,NULL,0 );
+    __TBB_ASSERT( r==0||r==1, "futex_wakeup_one: more than one thread woken up?" );
+    return r;
+}
+
+inline int futex_wakeup_all( void *futex ) {
+    int r = ::syscall( SYS_futex,futex,__TBB_FUTEX_WAKE,INT_MAX,NULL,NULL,0 );
+    __TBB_ASSERT( r>=0, "futex_wakeup_all: error in waking up threads" );
+    return r;
+}
+
+} /* namespace internal */
+
+} /* namespace tbb */
+
+#endif /* SYS_futex */
diff --git a/include/tbb/machine/linux_em64t.h b/include/tbb/machine/linux_em64t.h
index 753e7f2..9e2d9c7 100644
--- a/include/tbb/machine/linux_em64t.h
+++ b/include/tbb/machine/linux_em64t.h
@@ -30,9 +30,7 @@
 #error Do not include this file directly; include tbb_machine.h instead
 #endif
 
-#include <stdint.h>
-#include <unistd.h>
-#include <sched.h>
+#include "linux_common.h"
 
 #define __TBB_WORDSIZE 8
 #define __TBB_BIG_ENDIAN 0
@@ -135,7 +133,6 @@ static inline void __TBB_machine_pause( int32_t delay ) {
 #define __TBB_AtomicOR(P,V) __TBB_machine_or(P,V)
 
 // Definition of other functions
-#define __TBB_Yield()  sched_yield()
 #define __TBB_Pause(V) __TBB_machine_pause(V)
 #define __TBB_Log2(V)    __TBB_machine_lg(V)
 
diff --git a/include/tbb/machine/linux_ia32.h b/include/tbb/machine/linux_ia32.h
index 4c17d9a..61d7c68 100644
--- a/include/tbb/machine/linux_ia32.h
+++ b/include/tbb/machine/linux_ia32.h
@@ -30,9 +30,7 @@
 #error Do not include this file directly; include tbb_machine.h instead
 #endif
 
-#include <stdint.h>
-#include <unistd.h>
-#include <sched.h>
+#include "linux_common.h"
 
 #define __TBB_WORDSIZE 4
 #define __TBB_BIG_ENDIAN 0
@@ -194,7 +192,6 @@ inline void __TBB_machine_store_with_release(volatile T &location, V value) {
 #undef __TBB_FetchAndStore8
 
 // Definition of other functions
-#define __TBB_Yield()  sched_yield()
 #define __TBB_Pause(V) __TBB_machine_pause(V)
 #define __TBB_Log2(V)    __TBB_machine_lg(V)
 
diff --git a/include/tbb/machine/linux_itanium.h b/include/tbb/machine/linux_itanium.h
index 1a50257..6757921 100644
--- a/include/tbb/machine/linux_itanium.h
+++ b/include/tbb/machine/linux_itanium.h
@@ -30,9 +30,7 @@
 #error Do not include this file directly; include tbb_machine.h instead
 #endif
 
-#include <stdint.h>
-#include <unistd.h>
-#include <sched.h>
+#include "linux_common.h"
 #include <ia64intrin.h>
 
 #define __TBB_WORDSIZE 8
@@ -173,7 +171,6 @@ inline T __TBB_load_with_acquire_via_explicit_fence(const volatile T& location)
 #define __TBB_LockByte(P)    __TBB_machine_lockbyte(P)
 
 // Definition of other utility functions
-#define __TBB_Yield()  sched_yield()
 #define __TBB_Pause(V) __TBB_machine_pause(V)
 #define __TBB_Log2(V)  __TBB_machine_lg(V)
 
diff --git a/include/tbb/pipeline.h b/include/tbb/pipeline.h
index ce17a38..41f5f74 100644
--- a/include/tbb/pipeline.h
+++ b/include/tbb/pipeline.h
@@ -29,8 +29,8 @@
 #ifndef __TBB_pipeline_H 
 #define __TBB_pipeline_H 
 
+#include "atomic.h"
 #include "task.h"
-#include "spin_mutex.h"
 #include <cstddef>
 
 namespace tbb {
@@ -42,6 +42,7 @@ class filter;
 namespace internal {
 
 typedef unsigned long Token;
+typedef long tokendiff_t;
 class stage_task;
 class ordered_buffer;
 
@@ -117,17 +118,16 @@ private:
     //! task who's reference count is used to determine when all stages are done.
     empty_task* end_counter;
 
-    //! Mutex protecting token_counter and end_of_input.
-    spin_mutex input_mutex;
+    //! Number of idle tokens waiting for input stage.
+    atomic<internal::Token> input_tokens;
 
     //! Number of tokens created so far.
     internal::Token token_counter;
 
     //! False until fetch_input returns NULL.
     bool end_of_input;
-    
-    //! Attempt to fetch a new input item and put it in the pipeline.
-    /** "self" is used only for sake of providing the contextual "this" for task::allocate_child_of. */
+
+    //! Not used, but retained to satisfy old export files.
     void inject_token( task& self );
 };
 
diff --git a/include/tbb/scalable_allocator.h b/include/tbb/scalable_allocator.h
index bac5fda..47e72a1 100644
--- a/include/tbb/scalable_allocator.h
+++ b/include/tbb/scalable_allocator.h
@@ -96,7 +96,7 @@ public:
     const_pointer address(const_reference x) const {return &x;}
 
     //! Allocate space for n objects, starting on a cache/sector line.
-    pointer allocate( size_type n, void* /*hint*/ =0 ) {
+    pointer allocate( size_type n, const void* /*hint*/ =0 ) {
         return static_cast<pointer>( scalable_malloc( n * sizeof(value_type) ) );
     }
 
diff --git a/include/tbb/tbb_machine.h b/include/tbb/tbb_machine.h
index 7e7fae6..d034842 100644
--- a/include/tbb/tbb_machine.h
+++ b/include/tbb/tbb_machine.h
@@ -134,6 +134,21 @@ public:
             __TBB_Yield();
         }
     }
+
+    // pause for a few times and then return false immediately.
+    bool bounded_pause() {
+        if( count<=LOOPS_BEFORE_YIELD ) {
+            __TBB_Pause(count);
+            // Pause twice as long the next time.
+            count*=2;
+            return true;
+        } else {
+            __TBB_Pause(8);
+            count+=8;
+            return (count<64);
+        }
+    }
+
     void reset() {
         count = 1;
     }
diff --git a/src/tbb/concurrent_queue.cpp b/src/tbb/concurrent_queue.cpp
index c1257b1..da0116c 100644
--- a/src/tbb/concurrent_queue.cpp
+++ b/src/tbb/concurrent_queue.cpp
@@ -26,16 +26,26 @@
     the GNU General Public License.
 */
 
-#include "tbb/concurrent_queue.h"
+#include <cstring>
+#include <cstdio>
+#include "tbb/tbb_machine.h"
 #include "tbb/cache_aligned_allocator.h"
 #include "tbb/spin_mutex.h"
 #include "tbb/atomic.h"
 #include "tbb_misc.h"
-#include <cstring>
-#include <stdio.h>
+#include "tbb/concurrent_queue.h"
+#include "itt_notify.h"
+
+#define __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE 1
 
 #define RECORD_EVENTS 0
 
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+#if !_WIN32&&!_WIN64&&!__TBB_USE_FUTEX
+#define __TBB_USE_PTHREAD_CONDWAIT 1
+#include <pthread.h>
+#endif
+#endif
 
 namespace tbb {
 
@@ -106,6 +116,19 @@ class concurrent_queue_rep {
 public:
     typedef size_t ticket;
 
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+
+#if _WIN32||_WIN64
+    typedef HANDLE cq_cond_t;
+#elif __TBB_USE_FUTEX
+    typedef int cq_cond_t;
+#else /* including MacOS */
+    typedef pthread_cond_t  cq_cond_t;
+    typedef pthread_mutex_t cq_mutex_t;
+#endif
+
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
+
 private:
     friend struct micro_queue;
 
@@ -121,13 +144,52 @@ public:
         return k*phi%n_queue;
     }
 
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+
+    atomic<ticket> head_counter;
+#if !__TBB_USE_PTHREAD_CONDWAIT
+    atomic<size_t> n_waiting_consumers;
+    cq_cond_t  var_wait_for_items;
+    char pad1[NFS_MaxLineSize-sizeof(size_t)-sizeof(atomic<size_t>)-sizeof(cq_cond_t)];
+#else /* including MacOS */
+    size_t n_waiting_consumers;
+    cq_cond_t  var_wait_for_items;
+    cq_mutex_t mtx_items_avail;
+    char pad1[NFS_MaxLineSize-sizeof(size_t)-sizeof(atomic<size_t>)-sizeof(cq_cond_t)-sizeof(cq_mutex_t)];
+#endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+
+    atomic<ticket> tail_counter;
+#if !__TBB_USE_PTHREAD_CONDWAIT
+    atomic<size_t> n_waiting_producers;
+    cq_cond_t  var_wait_for_slots;
+    char pad2[NFS_MaxLineSize-sizeof(size_t)-sizeof(atomic<size_t>)-sizeof(cq_cond_t)];
+#else /* including MacOS */
+    size_t n_waiting_producers;
+    cq_cond_t  var_wait_for_slots;
+    cq_mutex_t mtx_slots_avail;
+    char pad2[NFS_MaxLineSize-sizeof(ticket)-sizeof(atomic<size_t>)-sizeof(cq_cond_t)-sizeof(cq_mutex_t)];
+#endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+
+#else /* !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
+
     atomic<ticket> head_counter;
-    char pad1[NFS_MaxLineSize-sizeof(size_t)];
+    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
 
     atomic<ticket> tail_counter;
-    char pad2[NFS_MaxLineSize-sizeof(ticket)];
+    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
+
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
+
     micro_queue array[n_queue];    
 
+#if !__TBB_USE_PTHREAD_CONDWAIT
+    ptrdiff_t size_to_use;
+    atomic<size_t> nthreads_in_transition;
+    ptrdiff_t nthreads_to_read_size;
+#define __TBB_INVALID_QSIZE (concurrent_queue_rep::infinite_capacity)
+    static const ptrdiff_t thread_woken = -1;
+#endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+
     micro_queue& choose( ticket k ) {
         // The formula here approximates LRU in a cache-oblivious way.
         return array[index(k)];
@@ -169,7 +231,9 @@ void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base
         } else {
             p = tail_page;
         }
+        ITT_NOTIFY( sync_acquired, p );
         base.copy_item( *p, index, item );
+        ITT_NOTIFY( sync_releasing, p );
         // If no exception was thrown, mark item as present.
         p->mask |= uintptr(1)<<index;
     } 
@@ -187,7 +251,15 @@ bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
         pop_finalizer finalizer( *this, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? &p : NULL ); 
         if( p.mask & uintptr(1)<<index ) {
             success = true;
+#if DO_ITT_NOTIFY
+            if( ((intptr_t)dst&1) ) {
+                dst = (void*) ((intptr_t)dst&~1);
+                ITT_NOTIFY( sync_acquired, dst );
+            }
+#endif
+            ITT_NOTIFY( sync_acquired, head_page );
             base.assign_and_destroy_item( dst, p, index );
+            ITT_NOTIFY( sync_releasing, head_page );
         }
     }
     return success;
@@ -215,6 +287,33 @@ concurrent_queue_base::concurrent_queue_base( size_t item_size ) {
     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
     memset(my_rep,0,sizeof(concurrent_queue_rep));
     this->item_size = item_size;
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+#if _WIN32||_WIN64
+    my_rep->size_to_use = __TBB_INVALID_QSIZE;
+    my_rep->var_wait_for_items = CreateEvent( NULL, FALSE, FALSE, NULL);
+    my_rep->var_wait_for_slots = CreateEvent( NULL, FALSE, FALSE, NULL);
+#elif __TBB_USE_FUTEX
+    my_rep->size_to_use = __TBB_INVALID_QSIZE;
+    // do nothing extra
+#else /* __TBB_USE_PTHREAD_CONDWAIT; including MacOS */
+    // initialize pthread_mutex_t, and pthread_cond_t
+
+    // use default mutexes
+    pthread_mutexattr_t m_attr;
+    pthread_mutexattr_init( &m_attr );
+    pthread_mutexattr_setprotocol( &m_attr, PTHREAD_PRIO_INHERIT );
+    pthread_mutex_init( &my_rep->mtx_items_avail, &m_attr );
+    pthread_mutex_init( &my_rep->mtx_slots_avail, &m_attr );
+    pthread_mutexattr_destroy( &m_attr );
+
+    pthread_condattr_t c_attr;
+    pthread_condattr_init( &c_attr );
+    pthread_cond_init( &my_rep->var_wait_for_items, &c_attr );
+    pthread_cond_init( &my_rep->var_wait_for_slots, &c_attr );
+    pthread_condattr_destroy( &c_attr );
+
+#endif
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
 }
 
 concurrent_queue_base::~concurrent_queue_base() {
@@ -225,15 +324,28 @@ concurrent_queue_base::~concurrent_queue_base() {
         if( tp!=NULL )
             delete tp;
     }
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+# if _WIN32||_WIN64
+    CloseHandle( my_rep->var_wait_for_items );
+    CloseHandle( my_rep->var_wait_for_slots );
+#endif
+# if __TBB_USE_PTHREAD_CONDWAIT
+    pthread_mutex_destroy( &my_rep->mtx_items_avail );
+    pthread_mutex_destroy( &my_rep->mtx_slots_avail );
+    pthread_cond_destroy( &my_rep->var_wait_for_items );
+    pthread_cond_destroy( &my_rep->var_wait_for_slots );
+# endif
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
     cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
 }
 
 void concurrent_queue_base::internal_push( const void* src ) {
     concurrent_queue_rep& r = *my_rep;
+#if !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
     concurrent_queue_rep::ticket k  = r.tail_counter++;
     ptrdiff_t e = my_capacity;
     if( e<concurrent_queue_rep::infinite_capacity ) {
-        ExponentialBackoff backoff;
+        AtomicBackoff backoff;
         for(;;) {
             if( (ptrdiff_t)(k-r.head_counter)<e ) break;
             backoff.pause();
@@ -241,61 +353,452 @@ void concurrent_queue_base::internal_push( const void* src ) {
         }
     } 
     r.choose(k).push(src,k,*this);
+#else /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
+
+#if DO_ITT_NOTIFY
+    bool sync_prepare_done = false;
+#endif
+
+# if !__TBB_USE_PTHREAD_CONDWAIT
+    concurrent_queue_rep::ticket k = r.tail_counter;
+    bool in_transition = false;
+    ptrdiff_t e = my_capacity;
+    if( e<concurrent_queue_rep::infinite_capacity ) {
+        AtomicBackoff backoff;
+        for( ;; ) {
+            while( (ptrdiff_t)(k-r.head_counter)>=e ) {
+#if DO_ITT_NOTIFY
+                if( !sync_prepare_done ) {
+                     ITT_NOTIFY( sync_prepare, &sync_prepare_done );
+                     sync_prepare_done = true;
+                }
+#endif
+                if( !backoff.bounded_pause() ) {
+                    // queue is full.  go to sleep.
+                    r.n_waiting_producers++;
+
+                    // i created the mess. so I am the one who better clean it up.
+                    // and if someone else did not clean it up yet.
+                    if( in_transition ) {
+                        in_transition = false;
+                        __TBB_ASSERT( r.nthreads_in_transition>0, NULL );
+                        --r.nthreads_in_transition; // atomic decrement
+                    }
+                    
+                    if( (ptrdiff_t)(k-r.head_counter)>=e ) {
+#if _WIN32||_WIN64
+                        WaitForSingleObject( r.var_wait_for_slots, INFINITE );
+#elif __TBB_USE_FUTEX
+                        futex_wait( &r.var_wait_for_slots, 0 );
+                        // only one thread will wake up and come here at a time
+#endif
+                        in_transition = true;
+
+                        // raise barrier
+                        backoff.reset();
+                        while ( __TBB_CompareAndSwapW( &r.nthreads_to_read_size, r.thread_woken, 0 )!=0 )
+                            backoff.pause();
+                        
+                        r.nthreads_in_transition++; // atomic increment
+
+                        // lower barrier
+                        r.nthreads_to_read_size = 0;
+                    }
+#if __TBB_USE_FUTEX
+                    r.var_wait_for_slots = 0;
+#endif
+                    --r.n_waiting_producers;
+                    k = r.tail_counter;
+                    backoff.reset();
+                }
+                e = const_cast<volatile ptrdiff_t&>(my_capacity);
+            }
+            // increment the tail counter
+            concurrent_queue_rep::ticket tk = k;
+            k = r.tail_counter.compare_and_swap( tk+1, tk );
+            if( k==tk ) {
+                if( in_transition ) {
+                    in_transition = false;
+                    __TBB_ASSERT( r.nthreads_in_transition>0, NULL );
+                    --r.nthreads_in_transition;
+                }
+                break;
+            }
+        }
+
+#if DO_ITT_NOTIFY
+        if( sync_prepare_done )
+            ITT_NOTIFY( sync_acquired, &sync_prepare_done );
+#endif
+
+        r.choose( k ).push( src, k, *this );
+
+#if _WIN32||_WIN64
+        if( r.n_waiting_consumers>0 )
+            SetEvent( r.var_wait_for_items );
+        if( r.n_waiting_producers>0 && (ptrdiff_t)(r.tail_counter-r.head_counter)<my_capacity )
+            SetEvent( r.var_wait_for_slots );
+#elif __TBB_USE_FUTEX
+        if( r.n_waiting_consumers>0 && __TBB_CompareAndSwapW( &r.var_wait_for_items,1,0 )==0 )
+            futex_wakeup_one( &r.var_wait_for_items );
+        if( r.n_waiting_producers>0 && (ptrdiff_t)(r.tail_counter-r.head_counter)<my_capacity )
+            if( __TBB_CompareAndSwapW( &r.var_wait_for_slots,1,0 )==0)
+                futex_wakeup_one( &r.var_wait_for_slots );
+#endif
+    } else {
+        // in infinite capacity, no producers would ever sleep.
+        r.choose(k).push(src,k,*this);
+
+#if _WIN32||_WIN64
+        if( r.n_waiting_consumers>0 )
+            SetEvent( r.var_wait_for_items );
+#elif __TBB_USE_FUTEX
+        if( r.n_waiting_consumers>0 && __TBB_CompareAndSwapW( &r.var_wait_for_items,1,0 )==0 )
+            futex_wakeup_one( &r.var_wait_for_items );
+#endif
+    }
+
+# else /* __TBB_USE_PTHREAD_CONDWAIT */
+
+    concurrent_queue_rep::ticket k = r.tail_counter;
+    ptrdiff_t e = my_capacity;
+    if( e<concurrent_queue_rep::infinite_capacity ) {
+        AtomicBackoff backoff;
+        for( ;; ) {
+            while( (ptrdiff_t)(k-r.head_counter)>=e ) {
+#if DO_ITT_NOTIFY
+                if( !sync_prepare_done ) {
+                    ITT_NOTIFY( sync_prepare, &sync_prepare_done );
+                    sync_prepare_done = true;
+                }
+#endif
+                if( !backoff.bounded_pause() ) {
+                    // queue is full.  go to sleep.
+
+                    pthread_mutex_lock( &r.mtx_slots_avail );
+
+                    r.n_waiting_producers++;
+
+                    while( (ptrdiff_t)(k-r.head_counter)>=e )
+                        pthread_cond_wait( &r.var_wait_for_slots, &r.mtx_slots_avail );
+
+                    --r.n_waiting_producers;
+
+                    pthread_mutex_unlock( &r.mtx_slots_avail );
+
+                    k = r.tail_counter;
+                    backoff.reset();
+                }
+                e = const_cast<volatile ptrdiff_t&>(my_capacity);
+            }
+            // increment the tail counter
+            concurrent_queue_rep::ticket tk = k;
+            k = r.tail_counter.compare_and_swap( tk+1, tk );
+            if( tk==k )
+                break;
+        }
+    }
+#if DO_ITT_NOTIFY
+    if( sync_prepare_done )
+        ITT_NOTIFY( sync_acquired, &sync_prepare_done );
+#endif
+    r.choose( k ).push( src, k, *this );
+
+    if( r.n_waiting_consumers>0 ) {
+        pthread_mutex_lock( &r.mtx_items_avail );
+        // pthread_cond_signal() wakes up 'at least' one consumer.
+        if( r.n_waiting_consumers>0 )
+            pthread_cond_signal( &r.var_wait_for_items );
+        pthread_mutex_unlock( &r.mtx_items_avail );
+    }
+
+# endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+
+#endif /* !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
 }
 
 void concurrent_queue_base::internal_pop( void* dst ) {
     concurrent_queue_rep& r = *my_rep;
+#if !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
     concurrent_queue_rep::ticket k;
     do {
         k = r.head_counter++;
     } while( !r.choose(k).pop(dst,k,*this) );
+#else /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
+
+#if DO_ITT_NOTIFY
+    bool sync_prepare_done = false;
+#endif
+# if !__TBB_USE_PTHREAD_CONDWAIT
+    concurrent_queue_rep::ticket k;
+    bool in_transition = false;
+    AtomicBackoff backoff;
+
+    do {
+        k = r.head_counter;
+        for( ;; ) {
+            while( r.tail_counter<=k ) {
+#if DO_ITT_NOTIFY
+                if( !sync_prepare_done ) {
+                    ITT_NOTIFY( sync_prepare, dst );
+                    dst = (void*) ((intptr_t)dst | 1);
+                    sync_prepare_done = true;
+                }
+#endif
+                // Queue is empty; pause and re-try a few times
+                if( !backoff.bounded_pause() ) {
+                    // it is really empty.. go to sleep
+                    r.n_waiting_consumers++;
+
+                    if( in_transition ) {
+                        in_transition = false;
+                        __TBB_ASSERT( r.nthreads_in_transition>0, NULL );
+                        --r.nthreads_in_transition;
+                    }
+                    
+                    if( r.tail_counter<=k ) {
+#if _WIN32||_WIN64
+                        WaitForSingleObject( r.var_wait_for_items, INFINITE );
+#elif __TBB_USE_FUTEX
+                        futex_wait( &r.var_wait_for_items, 0 );
+                        r.var_wait_for_items = 0;
+#endif 
+                        in_transition = true;
+
+                        // raise barrier
+                        backoff.reset();
+                        while ( __TBB_CompareAndSwapW( &r.nthreads_to_read_size, r.thread_woken, 0 )!=0 )
+                            backoff.pause();
+                        
+                        ++r.nthreads_in_transition;
+
+                        // lower barrier
+                        r.nthreads_to_read_size = 0;
+
+                        --r.n_waiting_consumers;
+                        backoff.reset();
+                        k = r.head_counter;
+                    } else {
+#if __TBB_USE_FUTEX
+                        r.var_wait_for_items = 0;
+#endif
+                        --r.n_waiting_consumers;
+                        break;
+                    }
+                }
+            }
+            // Queue had item with ticket k when we looked.  Attempt to get that item.
+            concurrent_queue_rep::ticket tk=k;
+            k = r.head_counter.compare_and_swap( tk+1, tk );
+            if( k==tk ) {
+                if( in_transition ) {
+                    in_transition = false;
+                    __TBB_ASSERT( r.nthreads_in_transition>0, NULL );
+                    --r.nthreads_in_transition;
+                }
+
+                break; // break from the middle 'for' loop
+            }
+            // Another thread snatched the item, so pause and retry.
+        }
+    } while( !r.choose(k).pop(dst,k,*this) );
+
+#if _WIN32||_WIN64
+    if( r.n_waiting_consumers>0 && (ptrdiff_t)(r.tail_counter-r.head_counter)>0 )
+        SetEvent( r.var_wait_for_items );
+    if( r.n_waiting_producers>0 )
+        SetEvent( r.var_wait_for_slots );
+#elif __TBB_USE_FUTEX
+    if( r.n_waiting_consumers>0 && (ptrdiff_t)(r.tail_counter-r.head_counter)>0 )
+        if( __TBB_CompareAndSwapW( &r.var_wait_for_items, 1, 0 )==0 )
+            futex_wakeup_one( &r.var_wait_for_items );
+    // wake up a producer..
+    if( r.n_waiting_producers>0 && __TBB_CompareAndSwapW( &r.var_wait_for_slots, 1, 0 )==0 )
+        futex_wakeup_one( &r.var_wait_for_slots );
+#endif
+
+# else /* __TBB_USE_PTHREAD_CONDWAIT */
+
+    concurrent_queue_rep::ticket k;
+    AtomicBackoff backoff;
+
+    do {
+        k = r.head_counter;
+        for( ;; ) {
+            while( r.tail_counter<=k ) {
+#if DO_ITT_NOTIFY
+                if( !sync_prepare_done ) {
+                    ITT_NOTIFY( sync_prepare, dst );
+                    dst = (void*) ((intptr_t)dst | 1);
+                    sync_prepare_done = true;
+                }
+#endif
+                // Queue is empty; pause and re-try a few times
+                if( !backoff.bounded_pause() ) {
+                    // it is really empty.. go to sleep
+
+                    pthread_mutex_lock( &r.mtx_items_avail );
+
+                    r.n_waiting_consumers++;
+
+                    if( r.tail_counter<=k ) {
+                        while( r.tail_counter<=k )
+                            pthread_cond_wait( &r.var_wait_for_items, &r.mtx_items_avail );
+
+                        --r.n_waiting_consumers;
+
+                        pthread_mutex_unlock( &r.mtx_items_avail );
+
+                        backoff.reset();
+                        k = r.head_counter;
+                    } else {
+                        --r.n_waiting_consumers;
+
+                        pthread_mutex_unlock( &r.mtx_items_avail );
+                        break;
+                    }
+                }
+            }
+            // Queue had item with ticket k when we looked.  Attempt to get that item.
+            concurrent_queue_rep::ticket tk=k;
+            k = r.head_counter.compare_and_swap( tk+1, tk );
+            if( tk==k )
+                break; // break from the middle 'for' loop
+            // Another thread snatched the item, so pause and retry.
+        }
+    } while( !r.choose(k).pop(dst,k,*this) );
+
+    if( r.n_waiting_producers>0 ) {
+        pthread_mutex_lock( &r.mtx_slots_avail );
+        if( r.n_waiting_producers>0 )
+            pthread_cond_signal( &r.var_wait_for_slots );
+        pthread_mutex_unlock( &r.mtx_slots_avail );
+    }
+
+# endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+
+#endif /* !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
 }
 
 bool concurrent_queue_base::internal_pop_if_present( void* dst ) {
     concurrent_queue_rep& r = *my_rep;
     concurrent_queue_rep::ticket k;
     do {
-        ExponentialBackoff backoff;
+        k = r.head_counter;
         for(;;) {
-            k = r.head_counter;
             if( r.tail_counter<=k ) {
                 // Queue is empty 
                 return false;
             }
             // Queue had item with ticket k when we looked.  Attempt to get that item.
-            if( r.head_counter.compare_and_swap(k+1,k)==k ) {
+            concurrent_queue_rep::ticket tk=k;
+            k = r.head_counter.compare_and_swap( tk+1, tk );
+            if( k==tk )
                 break;
-            }
-            // Another thread snatched the item, so pause and retry.
-            backoff.pause();
+            // Another thread snatched the item, so retry.
         }
     } while( !r.choose(k).pop(dst,k,*this) );
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+#if _WIN32||_WIN64
+    // wake up a producer..
+    if( r.n_waiting_producers>0 )
+        SetEvent( r.var_wait_for_slots );
+#elif __TBB_USE_FUTEX
+    if( r.n_waiting_producers>0 && __TBB_CompareAndSwapW( &r.var_wait_for_slots, 1, 0 )==0 )
+        futex_wakeup_one( &r.var_wait_for_slots );
+#else /* including MacOS */
+    if( r.n_waiting_producers>0 ) {
+        pthread_mutex_lock( &r.mtx_slots_avail );
+        if( r.n_waiting_producers>0 )
+            pthread_cond_signal( &r.var_wait_for_slots );
+        pthread_mutex_unlock( &r.mtx_slots_avail );
+    }
+#endif
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
     return true;
 }
 
 bool concurrent_queue_base::internal_push_if_not_full( const void* src ) {
     concurrent_queue_rep& r = *my_rep;
-    ExponentialBackoff backoff;
-    concurrent_queue_rep::ticket k;
+    concurrent_queue_rep::ticket k = r.tail_counter;
     for(;;) {
-        k = r.tail_counter;
         if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
             // Queue is full
             return false;
         }
         // Queue had empty slot with ticket k when we looked.  Attempt to claim that slot.
-        if( r.tail_counter.compare_and_swap(k+1,k)==k ) 
+        concurrent_queue_rep::ticket tk=k;
+        k = r.tail_counter.compare_and_swap( tk+1, tk );
+        if( k==tk )
             break;
-        // Another thread claimed the slot, so pause and retry.
-        backoff.pause();
+        // Another thread claimed the slot, so retry.
     }
     r.choose(k).push(src,k,*this);
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+#if _WIN32||_WIN64
+    // wake up a consumer..
+    if( r.n_waiting_consumers>0 )
+        SetEvent( r.var_wait_for_items );
+#elif __TBB_USE_FUTEX
+    if( r.n_waiting_consumers>0 && __TBB_CompareAndSwapW( &r.var_wait_for_items, 1, 0 )==0 )
+        futex_wakeup_one( &r.var_wait_for_items );
+#else /* including MacOS */
+    if( r.n_waiting_consumers>0 ) {
+        pthread_mutex_lock( &r.mtx_items_avail );
+        if( r.n_waiting_consumers>0 )
+            pthread_cond_signal( &r.var_wait_for_items );
+        pthread_mutex_unlock( &r.mtx_items_avail );
+    }
+#endif
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
     return true;
 }
 
 ptrdiff_t concurrent_queue_base::internal_size() const {
     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
+#if __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE
+# if !__TBB_USE_PTHREAD_CONDWAIT
+    concurrent_queue_rep& r = *my_rep;
+    ptrdiff_t sz;
+    {
+        ptrdiff_t n, nn;
+        AtomicBackoff bo;
+    restart_read_size:
+        while( (n=r.nthreads_to_read_size)==r.thread_woken ) // a woken thread is incrementing nthreads_in_transition
+            bo.pause();
+        bo.reset();
+        do {
+            nn = n;
+            n = __TBB_CompareAndSwapW( &r.nthreads_to_read_size, nn+1, nn );
+            if( n==r.thread_woken ) // I lost to a woken thread
+                goto restart_read_size;
+        } while ( n!=nn );
+
+        while( r.nthreads_in_transition>0 ) // wait until already woken threads to finish
+            bo.pause();
+
+        sz = ptrdiff_t((r.tail_counter-r.head_counter)+(r.n_waiting_producers-r.n_waiting_consumers));
+
+        n = r.nthreads_to_read_size;
+        do {
+            nn = n;
+            n = __TBB_CompareAndSwapW( &r.nthreads_to_read_size, nn-1, nn );
+        } while ( n!=nn );
+    }
+    return sz;
+#else /* __TBB_USE_PTHREAD_CONDWAIT */
+    concurrent_queue_rep& r = *my_rep;
+    pthread_mutex_lock( &r.mtx_slots_avail );
+    int nwp = r.n_waiting_producers;
+    pthread_mutex_unlock( &r.mtx_slots_avail );
+    pthread_mutex_lock( &r.mtx_items_avail );
+    int nwc = r.n_waiting_consumers;
+    pthread_mutex_unlock( &r.mtx_items_avail );
+    return ptrdiff_t((r.tail_counter-r.head_counter)+(nwp - nwc));
+#endif /* !__TBB_USE_PTHREAD_CONDWAIT */
+#else /* !__TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE || __TBB_USE_PTHREAD_CONDWAIT */
     return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter);
+#endif /* __TBB_NO_BUSY_WAIT_IN_CONCURRENT_QUEUE */
 }
 
 void concurrent_queue_base::internal_set_capacity( ptrdiff_t capacity, size_t /*item_size*/ ) {
diff --git a/src/tbb/concurrent_vector.cpp b/src/tbb/concurrent_vector.cpp
index a523e7a..1892508 100644
--- a/src/tbb/concurrent_vector.cpp
+++ b/src/tbb/concurrent_vector.cpp
@@ -31,6 +31,7 @@
 #include <stdexcept>
 #include "itt_notify.h"
 #include "tbb/task.h"
+#include <cstring>
 
 
 namespace tbb {
diff --git a/src/tbb/gate.h b/src/tbb/gate.h
index d47e0c0..b57ace5 100644
--- a/src/tbb/gate.h
+++ b/src/tbb/gate.h
@@ -29,7 +29,7 @@
 #ifndef _TBB_Gate_H
 #define _TBB_Gate_H
 
-#define IMPROVED_GATING 0
+#define IMPROVED_GATING 1
 namespace tbb {
 
 namespace internal {
@@ -44,6 +44,43 @@ public:
     void wait() {}
 };
 
+#elif __TBB_USE_FUTEX
+
+//! Implementation of Gate based on futex.
+/** Use this futex-based implementation where possible, because it is the simplest and usually fastest. */
+class Gate {
+public:
+    typedef intptr state_t;
+
+    //! Get current state of gate
+    state_t get_state() const {
+        return state;
+    }
+    //! Update state=value if state==comparand (flip==false) or state!=comparand (flip==true)
+    void try_update( intptr value, intptr comparand, bool flip=false ) {
+        __TBB_ASSERT( comparand!=0 || value!=0, "either value or comparand must be non-zero" );
+        intptr old = state;
+        if( flip ) {
+            if( old==comparand ) 
+                return;
+            comparand = old;
+        } else {
+            if( old!=comparand )
+                return;
+        }   
+        old = state.compare_and_swap( value, comparand );
+        if( old==comparand && value!=0 )
+            futex_wakeup_all( &state );
+    }
+    //! Wait for state!=0.
+    void wait() {
+        if( state==0 )
+            futex_wait( &state, 0 );
+    }
+private:
+    atomic<state_t> state;
+};
+
 #elif USE_WINTHREAD
 
 class Gate {
diff --git a/src/tbb/pipeline.cpp b/src/tbb/pipeline.cpp
index 1bed434..238988b 100644
--- a/src/tbb/pipeline.cpp
+++ b/src/tbb/pipeline.cpp
@@ -27,6 +27,7 @@
 */
 
 #include "tbb/pipeline.h"
+#include "tbb/spin_mutex.h"
 #include "tbb/cache_aligned_allocator.h"
 #include "itt_notify.h"
 
@@ -88,7 +89,7 @@ public:
                 // Trying to put token that is beyond low_token.
                 // Need to wait until low_token catches up before dispatching.
                 result = NULL;
-                __TBB_ASSERT( token>low_token, NULL );
+                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );
                 if( token-low_token>=array_size ) 
                     grow( token-low_token+1 );
                 ITT_NOTIFY( sync_releasing, this );
@@ -142,20 +143,44 @@ private:
     pipeline& my_pipeline;
     void* my_object;
     filter* my_filter;  
-    const Token my_token;
+    //! Invalid until this task has read the input.
+    Token my_token;
+    //! True if this task has not yet read the input.
+    bool my_at_start;
 public:
-    stage_task( pipeline& pipeline, Token token, filter* filter_list ) : 
+    //! Construct stage_task for first stage in a pipeline.
+    /** Such a stage has not read any input yet. */
+    stage_task( pipeline& pipeline ) : 
+        my_pipeline(pipeline), 
+        my_object(NULL),
+        my_filter(pipeline.filter_list),
+        my_at_start(true)
+    {}
+    stage_task( pipeline& pipeline, filter* filter_list ) : 
         my_pipeline(pipeline), 
         my_filter(filter_list),
-        my_token(token)
+        my_at_start(false)
     {}
     task* execute();
 };
 
 task* stage_task::execute() {
+    __TBB_ASSERT( !my_at_start || !my_object, NULL );
     my_object = (*my_filter)(my_object);
-    if( ordered_buffer* input_buffer = my_filter->input_buffer )
-        input_buffer->note_done(my_token,*this);
+    if( my_at_start ) {
+        if( my_object ) {
+            my_token = my_pipeline.token_counter++;
+            if( --my_pipeline.input_tokens>0 ) 
+                spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );
+        } else {
+            my_pipeline.end_of_input = true;
+            return NULL;
+        }
+        my_at_start = false;
+    } else {
+        if( ordered_buffer* input_buffer = my_filter->input_buffer )
+            input_buffer->note_done(my_token,*this);
+    }
     task* next = NULL;
     my_filter = my_filter->next_filter_in_pipeline; 
     if( my_filter ) {
@@ -164,7 +189,8 @@ task* stage_task::execute() {
         add_to_depth(1);
         if( ordered_buffer* input_buffer = my_filter->input_buffer ) {
             // The next filter must execute tokens in order.
-            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_token, my_filter );
+            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );
+            clone.my_token = my_token;
             clone.my_object = my_object;
             next = input_buffer->put_token(clone,my_token);
         } else {
@@ -176,41 +202,17 @@ task* stage_task::execute() {
         // The token must be injected before execute() returns, in order to prevent the
         // end_counter task's reference count from prematurely reaching 0.
         set_depth( my_pipeline.end_counter->depth()+1 ); 
-        my_pipeline.inject_token( *this );
+        if( ++my_pipeline.input_tokens==1 ) 
+            if( !my_pipeline.end_of_input ) 
+                spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );
     }
     return next;
 }
 
 } // namespace internal
 
-void pipeline::inject_token( task& self ) {
-    void* o = NULL;
-    filter* f = filter_list;
-    spin_mutex::scoped_lock lock( input_mutex );
-    if( !end_of_input ) {
-        ITT_NOTIFY(sync_acquired, this );
-        o = (*f)(NULL);
-        ITT_NOTIFY(sync_releasing, this );
-        if( o ) {
-            internal::Token token = token_counter++;
-            lock.release(); // release the lock as soon as finished updating shared fields
-
-            f = f->next_filter_in_pipeline;
-            // Successfully fetched another input object.  
-            // Create a stage_task to process it.
-            internal::stage_task* s = new( self.allocate_additional_child_of(*end_counter) ) internal::stage_task( *this, token, f );
-            s->my_object = o;
-            if( internal::ordered_buffer* input_buffer = f->input_buffer ) {
-                // The next filter must execute tokens in order.
-                s = static_cast<internal::stage_task*>(input_buffer->put_token(*s,token));
-            } 
-            if( s ) {
-                self.spawn(*s);
-            }
-        } 
-        else 
-            end_of_input = true;
-    }
+void pipeline::inject_token( task& ) {
+    __TBB_ASSERT(0,"illegal call to inject_token");
 }
 
 pipeline::pipeline() : 
@@ -220,6 +222,7 @@ pipeline::pipeline() :
     token_counter(0),
     end_of_input(false)
 {
+    input_tokens = 0;
 }
 
 pipeline::~pipeline() {
@@ -257,10 +260,11 @@ void pipeline::run( size_t max_number_of_live_tokens ) {
         if( filter_list->next_filter_in_pipeline ) {
             end_of_input = false;
             end_counter = new( task::allocate_root() ) empty_task;
-            end_counter->set_ref_count(1);
-            for( size_t i=0; i<max_number_of_live_tokens; ++i )
-                inject_token( *end_counter );
-            end_counter->wait_for_all();
+            // 2 = 1 for spawned child + 1 for wait
+            end_counter->set_ref_count(2);
+            input_tokens = internal::Token(max_number_of_live_tokens);
+            // Prime the pump with the non-waiter
+            end_counter->spawn_and_wait_for_all( *new( end_counter->allocate_child() ) internal::stage_task( *this ) );
             end_counter->destroy(*end_counter);
             end_counter = NULL;
         } else {
diff --git a/src/tbb/queuing_mutex.cpp b/src/tbb/queuing_mutex.cpp
index 6effd50..5067892 100644
--- a/src/tbb/queuing_mutex.cpp
+++ b/src/tbb/queuing_mutex.cpp
@@ -105,9 +105,6 @@ void queuing_mutex::scoped_lock::release( )
         SpinwaitWhileEq( next, (scoped_lock*)0 );
     }
     __TBB_ASSERT(next,NULL);
-    // The volatile here ensures release semantics on IPF, which are necessary
-    // so that the user's critical section sends the correct values to the next
-    // process that acquires the critical section.
     __TBB_store_with_release(next->going, 1);
 done:
     initialize();
diff --git a/src/tbb/queuing_rw_mutex.cpp b/src/tbb/queuing_rw_mutex.cpp
index 483b5f9..3b5e61b 100644
--- a/src/tbb/queuing_rw_mutex.cpp
+++ b/src/tbb/queuing_rw_mutex.cpp
@@ -253,9 +253,6 @@ void queuing_rw_mutex::scoped_lock::release( )
             acquire_internal_lock();
             queuing_rw_mutex::scoped_lock* tmp = tricky_pointer::fetch_and_store<tbb::release>(&(n->prev), NULL);
             n->state = STATE_UPGRADE_LOSER;
-            // The volatile here ensures release semantics on IPF, which is necessary
-            // so that the user's critical section sends the correct values to the next
-            // process that acquires the critical section.
             __TBB_store_with_release(n->going,1);
             unblock_or_wait_on_internal_lock(get_flag(tmp));
         } else {
diff --git a/src/tbb/task.cpp b/src/tbb/task.cpp
index a44f03e..88fd8a8 100644
--- a/src/tbb/task.cpp
+++ b/src/tbb/task.cpp
@@ -213,9 +213,12 @@ class UnpaddedArenaPrefix {
     //! Total number of slots in the arena
     const unsigned number_of_slots;
 
-    //! One more than number of workers that belong to this arena
+    //! Number of workers that belong to this arena
     const unsigned number_of_workers;
 
+    //! Number of workers still using this arena (plus one if a master is still using the arena)
+    atomic<int> gc_ref_count;
+
     //! Array of workers.
     WorkerDescriptor* worker_list;
 
@@ -237,7 +240,8 @@ protected:
 #if COUNT_TASK_NODES
         task_node_count = 0;
 #endif /* COUNT_TASK_NODES */
-        limit = number_of_workers;
+        limit = number_of_workers_;
+        gc_ref_count = number_of_workers_+1; 
     }
 };
 
@@ -256,7 +260,10 @@ public:
 struct UnpaddedArenaSlot {
     //! Holds copy of task_pool->deepest and a lock bit
     /** Computed as 2*task_pool->deepest+(is_locked).
-        I.e., the low order bit indicates whether the slot is locked. */
+        I.e., the low order bit indicates whether the slot is locked. 
+        -2 denotes an empty task pool
+        -3 denotes an unused arena slot that is locked
+        -4 denotes an unused arena slot that is unlocked */
     volatile depth_type steal_end;
     TaskPool* task_pool;
     bool owner_waits;
@@ -280,11 +287,17 @@ class Arena {
         Creates the worker threads, but does not wait for them to start. */
     static Arena* allocate_arena( unsigned number_of_slots, unsigned number_of_workers );
 
+    void free_arena() {
+        NFS_Free( &prefix() );
+    }
 
     //! Terminate worker threads
     /** Wait for worker threads to complete. */
     void terminate_workers();
 
+    //! Remove a reference to the arena, and free the arena if no references remain.
+    void remove_gc_reference();
+
 #if COUNT_TASK_NODES
     //! Returns the number of task objects "living" in worker threads
     inline intptr workers_task_node_count();
@@ -766,9 +779,10 @@ public:
     void mark_pool_full();
 
     //! Wait while pool is empty
-    void wait_while_pool_is_empty();
+    /** Return true if pool transitioned from empty to non-empty while thread was waiting. */
+    bool wait_while_pool_is_empty();
 #endif /* IMPROVED_GATING */
-                 
+
 #if TEST_ASSEMBLY_ROUTINES
     /** Defined in test_assembly.cpp */
     void test_assembly_routines();
@@ -858,9 +872,9 @@ inline void GenericScheduler::free_task( task& t ) {
         GATHER_STATISTIC(current_length+=1);
         p.next = free_list;
         free_list = &t;
-    } else if( h&is_local || p.origin ) {
+    } else if( !(h&is_local) && p.origin ) {
         free_nonlocal_small_task(t);
-    } else {  
+    } else {
         deallocate_task(t);
     }
 }
@@ -894,13 +908,13 @@ inline void GenericScheduler::mark_pool_full() {
          arena->prefix().gate.try_update( SNAPSHOT_FULL, SNAPSHOT_PERMANENTLY_OPEN, true );
 }
 
-void GenericScheduler::wait_while_pool_is_empty() {
+bool GenericScheduler::wait_while_pool_is_empty() {
     for(;;) {
         Gate::state_t snapshot = arena->prefix().gate.get_state();
         switch( snapshot ) {
             case SNAPSHOT_EMPTY:
                 arena->prefix().gate.wait();
-                return;
+                return true;
             case SNAPSHOT_FULL: {
                 // Use unique id for "busy" in order to avoid ABA problems.
                 const Gate::state_t busy = Gate::state_t(this);
@@ -922,13 +936,12 @@ void GenericScheduler::wait_while_pool_is_empty() {
                             arena->prefix().gate.try_update( SNAPSHOT_FULL, busy );
                         }
                     }
-                    return;
-                }
-                break;
+                } 
+                return false;
             }
             default:
                 // Another thread is taking a snapshot or gate is permanently open.
-                return;
+                return false;
         }
     }
 }
@@ -1024,53 +1037,61 @@ void Arena::terminate_workers() {
     __TBB_ASSERT( n>=0, "negative number of workers; casting error?" );
 #if IMPROVED_GATING
     prefix().gate.try_update( GenericScheduler::SNAPSHOT_PERMANENTLY_OPEN, GenericScheduler::SNAPSHOT_PERMANENTLY_OPEN, true );
-#else
-    prefix().gate.open();
 #endif /* IMPROVED_GATING */
     for( int i=n; --i>=0; ) {
         WorkerDescriptor& w = prefix().worker_list[i];
-        // The following while wait_for_all protects against situation where worker has not started yet.
-        // It would be more elegant to do this with condition variables, but the situation
-        // is probably rare enough in practice that it might not be worth the added complexity
-        // of condition variables.
-        ITT_NOTIFY(sync_prepare, &w.scheduler);
-        SpinwaitWhileEq( w.scheduler, (scheduler*)NULL );
+        if( w.scheduler || __TBB_CompareAndSwapW( &w.scheduler, intptr(-1), intptr(0) ) ) {
+            // Worker published itself.  Tell worker to quit.
         ITT_NOTIFY(sync_acquired, &w.scheduler);
         task* t = w.scheduler->dummy_task;
         ITT_NOTIFY(sync_releasing, &t->prefix().ref_count);
         t->prefix().ref_count = 1;
+        } else {
+            // Worker did not publish itself yet, and we have set w.scheduler to -1, 
+            // which tells the worker that it should never publish itself.
+        }
     }
-    // Wait for all workers to quit
+#if !IMPROVED_GATING
+    prefix().gate.open();
+#endif /* IMPROVED_GATING */
+    // Wait for all published workers to quit
     for( int i=n; --i>=0; ) {
         WorkerDescriptor& w = prefix().worker_list[i];
+        if( intptr(w.scheduler)!=-1 ) {
 #if USE_WINTHREAD
-        DWORD status = WaitForSingleObject( w.thread_handle, INFINITE );
-        if( status==WAIT_FAILED ) {
-            fprintf(stderr,"Arena::terminate_workers: WaitForSingleObject failed\n");
-            exit(1);
-        }
-        CloseHandle( w.thread_handle );
-        w.thread_handle = (HANDLE)0;
+            DWORD status = WaitForSingleObject( w.thread_handle, INFINITE );
+            if( status==WAIT_FAILED ) {
+                fprintf(stderr,"Arena::terminate_workers: WaitForSingleObject failed\n");
+                exit(1);
+            }
+            CloseHandle( w.thread_handle );
+            w.thread_handle = (HANDLE)0;
 #else
-        int status = pthread_join( w.thread_handle, NULL );
-        if( status )
-            handle_perror(status,"pthread_join");
+            int status = pthread_join( w.thread_handle, NULL );
+            if( status )
+                handle_perror(status,"pthread_join");
 #endif /* USE_WINTHREAD */
+        }
     }
     // All workers have quit
 #if !IMPROVED_GATING
     prefix().gate.close();
 #endif /* !IMPROVED_GATING */
-    delete[] prefix().worker_list;
-    prefix().worker_list = NULL;
 #if COUNT_TASK_NODES && !TEST_ASSEMBLY_ROUTINES
     if( prefix().task_node_count ) {
         fprintf(stderr,"warning: leaked %ld task objects\n", long(prefix().task_node_count));
     }
 #endif /* COUNT_TASK_NODES && !TEST_ASSEMBLY_ROUTINES */
-    __TBB_ASSERT( this, "attempt to free NULL Arena" );
-    prefix().~ArenaPrefix();
-    NFS_Free( &prefix() );
+    remove_gc_reference();
+}
+
+void Arena::remove_gc_reference() {
+    __TBB_ASSERT( this, "attempt to remove reference to NULL Arena" );
+    if( --prefix().gc_ref_count==0 ) {
+        delete[] prefix().worker_list;
+        prefix().~ArenaPrefix();
+        free_arena();
+    }
 }
 
 #if COUNT_TASK_NODES
@@ -1227,13 +1248,12 @@ inline void GenericScheduler::acquire_task_pool() const {
         __TBB_ASSERT( arena_slot==&dummy_slot || arena_slot==&arena->slot[dummy_slot.task_pool->prefix().arena_index], "slot ownership corrupt?" );
         __TBB_ASSERT( arena_slot->task_pool==dummy_slot.task_pool, "slot ownership corrupt?" );
         depth_type steal_end = arena_slot->steal_end;
-        if( steal_end==2*deepest && (steal_end=__TBB_CompareAndSwapW( (volatile void *)&(arena_slot->steal_end), 2*deepest+1, 2*deepest ))==2*deepest ) {
+        if( (steal_end&1)==0 && (__TBB_CompareAndSwapW( &arena_slot->steal_end, steal_end|1, steal_end )&1)==0 ) {
             // We acquired our own slot
             ITT_NOTIFY(sync_acquired, arena_slot);
             arena_slot->owner_waits = false;
             break;
         } else {
-            __TBB_ASSERT( steal_end&1, "steal_end and/or deepest corrupt?" );
             // Someone else acquired a lock, so pause and do exponential backoff.
             if( !sync_prepare_done ) {
                 // Start waiting
@@ -1416,8 +1436,13 @@ task* GenericScheduler::steal_task( UnpaddedArenaSlot& arena_slot, depth_type d
             break;
         }
     }
-    if( tp->prefix().steal_begin>=d )
+    if( tp->prefix().steal_begin>=d ) {
         tp->prefix().steal_begin = i;
+        if( i>steal_end>>1 ) {
+            // Pool is empty.  This is important information for threads taking snapshots.
+            steal_end = -2;
+        }
+    }
     // Release the task pool
     ITT_NOTIFY(sync_releasing, &arena_slot);
     arena_slot.steal_end = steal_end;
@@ -1444,7 +1469,7 @@ void CustomScheduler<SchedulerTraits>::wait_for_all( task& parent, task* child )
     if( innermost_running_task==dummy_task ) {
         // We are in the innermost task dispatch loop of a master thread.
         __TBB_ASSERT( !is_worker(), NULL );
-        // Forcefully make this loop operating on zero depth.
+        // Forcefully make this loop operate at zero depth.
         d = 0;
     } else {
         d = parent.prefix().depth+1;
@@ -1526,7 +1551,8 @@ void CustomScheduler<SchedulerTraits>::wait_for_all( task& parent, task* child )
                                 __TBB_ASSERT(assert_okay(),NULL);
                             }
                         }
-                        } break;
+                        break;
+                    }
 
                     case task::reexecute: // set by recycle_to_reexecute()
                         __TBB_ASSERT( t_next, "reexecution requires that method 'execute' return a task" );
@@ -1611,23 +1637,27 @@ void CustomScheduler<SchedulerTraits>::wait_for_all( task& parent, task* child )
             }
             // Pause, even if we are going to yield, because the yield might return immediately.
             __TBB_Pause(PauseTime);
+#if IMPROVED_GATING
+            int yield_threshold = 2*int(n);
+            if( failure_count>=yield_threshold ) {
+                __TBB_Yield();
+                if( failure_count>=yield_threshold+100 ) {
+                    if( d==0 && is_worker() && wait_while_pool_is_empty() )
+                        failure_count = 0;
+                    else
+                        failure_count = yield_threshold;
+#else
             int yield_threshold = int(n);
             if( failure_count>=yield_threshold ) {
                 if( failure_count>=2*yield_threshold ) {
                     __TBB_Yield();
-#if IMPROVED_GATING
-                    // Note: if d!=0 or !is_worker(), it is not safe to wait for a non-empty pool,
-                    // because of the polling of parent.prefix().ref_count.
-                    if( d==0 && is_worker() ) 
-                        wait_while_pool_is_empty();
-#else
                     arena->prefix().gate.wait();
-#endif /* IMPROVED_GATING */
                     failure_count = 0;
                 } else if( failure_count==yield_threshold ) {
                     // We have paused n times since last yield.
                     // Odds are that there is no other work to do.
                     __TBB_Yield();
+#endif /* IMPROVED_GATING */
                 }
             }
         }
@@ -1717,7 +1747,6 @@ void GenericScheduler::leave_arena( bool compress ) {
 }
 
 GenericScheduler* GenericScheduler::create_worker( WorkerDescriptor& w ) {
-    __TBB_ASSERT( !w.scheduler, NULL );
     unsigned n = w.arena->prefix().number_of_workers;
     WorkerDescriptor* worker_list = w.arena->prefix().worker_list;
     __TBB_ASSERT( &w >= worker_list, NULL );
@@ -1746,9 +1775,34 @@ GenericScheduler* GenericScheduler::create_worker( WorkerDescriptor& w ) {
     slot.steal_end = -2;
     slot.owner_waits = false;
 
-    // Publish worker
+#if USE_WINTHREAD
+    HANDLE cur_process = GetCurrentProcess();
+    BOOL bRes = DuplicateHandle(cur_process, GetCurrentThread(), cur_process, &w.thread_handle, 0, FALSE, DUPLICATE_SAME_ACCESS);
+    if( !bRes ) {
+        printf("ERROR: DuplicateHandle failed with status 0x%08X", GetLastError());
+        w.thread_handle = INVALID_HANDLE_VALUE;
+    }
+#else /* USE_PTHREAD */
+    w.thread_handle = pthread_self();
+#endif /* USE_PTHREAD */
+    // Attempt to publish worker
     ITT_NOTIFY(sync_releasing, &w.scheduler);
-    w.scheduler = s;
+    // Note: really need only release fence on the compare-and-swap.
+    if( __TBB_CompareAndSwapW( &w.scheduler, (intptr)s, intptr(0) )==-1 ) {
+        // Master thread has already commenced terminate_workers() and not waited for us to respond.
+        // Thus we are responsible for cleaning up ourselves.
+        s->dummy_task->prefix().ref_count = 1;
+#if USE_WINTHREAD
+        CloseHandle( w.thread_handle );
+        w.thread_handle = (HANDLE)0;
+#else /* USE_PTHREAD */
+        int status = pthread_detach( w.thread_handle );
+        if( status )
+            handle_perror(status,"pthread_detach");
+#endif /* USE_PTHREAD */
+    } else {
+        __TBB_ASSERT( w.scheduler==s, NULL );
+    }
     return s;
 }
 
@@ -1810,9 +1864,10 @@ void GenericScheduler::cleanup_worker_thread( void* arg ) {
     TBB_TRACE(("%p.cleanup_worker_thread enter\n",arg));
     GenericScheduler& s = *(GenericScheduler*)arg;
     __TBB_ASSERT( s.dummy_slot.task_pool, "cleaning up worker with missing TaskPool" );
-    //Arena* a = s.arena;
+    Arena* a = s.arena;
     __TBB_ASSERT( s.arena_slot!=&s.dummy_slot, "worker not in arena?" );
     s.free_scheduler();
+    a->remove_gc_reference();
 }
 
 //------------------------------------------------------------------------
@@ -1827,15 +1882,18 @@ void WorkerDescriptor::start_one_worker_thread() {
     if( status==0 )
         handle_perror(errno,"__beginthreadex");
     else
-        thread_handle = (HANDLE)status;
+        CloseHandle((HANDLE)status);
 #else
+    // this->thread_handle will be set from the thread function to avoid possible
+    // race with subsequent pthread_detach or pthread_join calls.
+    pthread_t handle;
     // This if is due to an Intel Compiler Bug, tracker # C70996
     // This #if should be removed as soon as the bug is fixed
 #if __APPLE__ && __TBB_x86_64
     static void *(*r)(void*) = GenericScheduler::worker_routine;
-    int status = pthread_create( &thread_handle, NULL, r, this );
+    int status = pthread_create( &handle, NULL, r, this );
 #else
-    int status = pthread_create( &thread_handle, NULL, GenericScheduler::worker_routine, this );
+    int status = pthread_create( &handle, NULL, GenericScheduler::worker_routine, this );
 #endif
     if( status )
         handle_perror(status,"pthread_create");
diff --git a/src/tbbmalloc/MemoryAllocator.cpp b/src/tbbmalloc/MemoryAllocator.cpp
index 2e686b1..aa6a77c 100644
--- a/src/tbbmalloc/MemoryAllocator.cpp
+++ b/src/tbbmalloc/MemoryAllocator.cpp
@@ -1113,12 +1113,15 @@ static inline void *mallocLargeObject (size_t size)
 
     // TODO: can the requestedSize be optimized somehow?
     size_t requestedSize = size + sizeof(LargeObjectHeader) + blockSize;
+    /* errno should be tested because some Linux versions have a known issue
+       of returning non-NULL even if there is no memory */
+    errno = 0;
 #if USE_MALLOC_FOR_LARGE_OBJECT
     unalignedArea = malloc(requestedSize);
 #else
     unalignedArea = getMemory(requestedSize);
 #endif /* USE_MALLOC_FOR_LARGE_OBJECT */
-    if (errno || !unalignedArea) {
+    if (!unalignedArea || errno) {
         /* We can't get any more memory from the OS or executive so return 0 */
         return 0;
     }
diff --git a/src/test/test_allocator.h b/src/test/test_allocator.h
index bcb2e58..9942256 100644
--- a/src/test/test_allocator.h
+++ b/src/test/test_allocator.h
@@ -102,6 +102,12 @@ void TestBasic( A& a ) {
             s[j] = PseudoRandomValue(j,k);
     }
 
+    // Test hint argument. This can't be compiled when hint is void*, It should be const void*
+    typename A::pointer a_ptr;
+    const void * const_hint = NULL;    
+    a_ptr = a.allocate (1, const_hint);    
+    a.deallocate(a_ptr, 1);
+
     // Test "a.deallocate(p,n)
     for( size_t k=0; k<100; ++k ) {
         char* s = reinterpret_cast<char*>(reinterpret_cast<void*>(array[k]));
diff --git a/src/test/test_assembly.cpp b/src/test/test_assembly.cpp
index 6d030a4..4549a20 100644
--- a/src/test/test_assembly.cpp
+++ b/src/test/test_assembly.cpp
@@ -169,7 +169,13 @@ static void TestCompareExchange() {
     for( intptr a=-10; a<10; ++a )
         for( intptr b=-10; b<10; ++b )
             for( intptr c=-10; c<10; ++c ) {
+// Workaround for a bug in GCC 4.3.0; and one more is below.
+#if __GNUC__==4&&__GNUC_MINOR__==3&&__GNUC_PATCHLEVEL__==0
+                intptr x;
+		        __TBB_store_with_release( x, a );
+#else
                 intptr x = a;
+#endif
                 intptr y = __TBB_CompareAndSwapW(&x,b,c);
                 ASSERT( y==a, NULL ); 
                 if( a==c ) 
@@ -231,7 +237,11 @@ static void TestTinyLock() {
     unsigned char flags[16];
     for( int i=0; i<16; ++i )
         flags[i] = i;
+#if __GNUC__==4&&__GNUC_MINOR__==3&&__GNUC_PATCHLEVEL__==0
+    __TBB_store_with_release( flags[8], 0 );
+#else
     flags[8] = 0;
+#endif
     __TBB_LockByte(flags[8]);
     for( int i=0; i<16; ++i )
         ASSERT( flags[i]==(i==8?1:i), NULL );
diff --git a/src/test/test_malloc_pure_c.c b/src/test/test_malloc_pure_c.c
index 06d2264..9bc71ad 100644
--- a/src/test/test_malloc_pure_c.c
+++ b/src/test/test_malloc_pure_c.c
@@ -42,9 +42,14 @@
 int main(void)
 {
     size_t i, j;
-    void *p1=NULL, *p2=NULL;
-    for( i=0; i<=1<<16; ++i)
-        scalable_free(scalable_malloc(i));
+    void *p1, *p2;
+    for( i=0; i<=1<<16; ++i) {
+        p1 = scalable_malloc(i);
+        if( !p1 )
+            printf("Warning: there should be memory but scalable_malloc returned NULL\n");
+        scalable_free(p1);
+    }
+    p1 = p2 = NULL;
     for( i=1024*1024; ; i/=2 )
     {
         scalable_free(p1);
diff --git a/src/test/test_pipeline.cpp b/src/test/test_pipeline.cpp
index 0f755dc..cda4257 100644
--- a/src/test/test_pipeline.cpp
+++ b/src/test/test_pipeline.cpp
@@ -39,6 +39,17 @@ struct Buffer {
     Buffer() : id(-1), is_busy(false) {}
 };
 
+class waiting_probe {
+    size_t check_counter;
+public:
+    waiting_probe() : check_counter(0) {}
+    bool required( ) {
+        ++check_counter;
+        return !((check_counter+1)&size_t(0x7FFF));
+    }
+    void probe( ); // defined below
+};
+
 static size_t InputCounter;
 static const size_t MaxStreamSize = 1<<12;
 //! Maximum number of filters allowed
@@ -46,6 +57,7 @@ static const int MaxFilters = 5;
 static size_t StreamSize;
 static const size_t MaxBuffer = 8;
 static bool Done[MaxFilters][MaxStreamSize];
+static waiting_probe WaitTest;
 
 class MyFilter: public tbb::filter {
     bool* const my_done;
@@ -93,8 +105,10 @@ retry:
         ASSERT( &buffer[0] <= &b, NULL );
         ASSERT( &b <= &buffer[MaxBuffer-1], NULL ); 
         next_buffer = (next_buffer+1) % MaxBuffer;
-        if( !last_filter_is_ordered && b.is_busy ) 
+        if( !last_filter_is_ordered && b.is_busy ) {
+            WaitTest.probe();
             goto retry;
+        }
         ASSERT( !b.is_busy, "premature reuse of buffer");
         b.id = int(InputCounter++);
         b.is_busy = my_number_of_filters>1;
@@ -147,6 +161,15 @@ void TestTrivialpipeline( size_t nthread, int number_of_filters ) {
 #include "tbb/task_scheduler_init.h"
 #include "harness_cpu.h"
 
+static int nthread; // knowing number of threads is necessary to call TestCPUUserTime
+
+void waiting_probe::probe( ) {
+    if( nthread==1 ) return;
+    if( Verbose ) printf("emulating wait for input\n");
+    // Test that threads sleep while no work.
+    TestCPUUserTime(nthread-1);
+}
+
 int main( int argc, char* argv[] ) {
     // Default is at least one thread.
     MinThread = 1;
@@ -157,7 +180,7 @@ int main( int argc, char* argv[] ) {
     }
 
     // Test with varying number of threads.
-    for( int nthread=MinThread; nthread<=MaxThread; ++nthread ) {
+    for( nthread=MinThread; nthread<=MaxThread; ++nthread ) {
         // Initialize TBB task scheduler
         tbb::task_scheduler_init init( nthread );
 
diff --git a/src/test/test_yield.cpp b/src/test/test_yield.cpp
index 247eb94..866fc29 100644
--- a/src/test/test_yield.cpp
+++ b/src/test/test_yield.cpp
@@ -63,7 +63,7 @@ struct RoundRobin {
     }
 };
 
-int main( long argc, char* argv[] ) {
+int main( int argc, char* argv[] ) {
     // Set defaults
     MaxThread = MinThread = 3;
     ParseCommandLine( argc, argv );

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/tbb.git



More information about the debian-science-commits mailing list