[Pkg-running-devel] [antpm] 17/48: lqueue3: add lqueue3_bg

Kristof Ralovich ralovich-guest at moszumanska.debian.org
Mon Aug 11 10:10:32 UTC 2014


This is an automated email from the git hooks/post-receive script.

ralovich-guest pushed a commit to branch upstream
in repository antpm.

commit 3a77f08aad2a7c8e503eec0ce1ae6b25679fc556
Author: RALOVICH, Kristof <tade60 at freemail.hu>
Date:   Wed Mar 26 12:48:52 2014 +0100

    lqueue3: add lqueue3_bg
    
    implements push consumer, with event dispatch in background thread
---
 src/lqueue.hpp | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/src/lqueue.hpp b/src/lqueue.hpp
index ad064e5..c2223e7 100644
--- a/src/lqueue.hpp
+++ b/src/lqueue.hpp
@@ -244,3 +244,80 @@ public:
 };
 
 
+/// implements push consumer, with event dispatch in background thread
+template < class DataType>
+class lqueue3_bg : public lqueue2<DataType>
+{
+public:
+  typedef boost::function<bool (DataType&)>     Listener;
+  typedef boost::function<bool (std::vector<DataType>&)> Listener2;
+  typedef lqueue2<DataType>                              Super;
+
+  struct ListenerProc
+  {
+    void operator() (lqueue3_bg* This)
+    {
+      This->eventLoop();
+    }
+  };
+
+
+  lqueue3_bg()
+    : stop(false)
+  {
+    th_listener.reset( new boost::thread(lp, this) );
+  }
+
+  ~lqueue3_bg()
+  {
+    kill();
+  }
+
+  void
+  kill()
+  {
+    stop = true;
+    if(th_listener.get())
+      th_listener->join();
+  }
+
+  void
+  setOnDataArrivedCallback(Listener2 l)
+  {
+    mCallback = l;
+  }
+
+public:
+  void eventLoop()
+  {
+    while(!stop)
+    {
+      boost::unique_lock<boost::mutex> lock(Super::m_mtx);
+
+      boost::posix_time::time_duration td = boost::posix_time::milliseconds(2000);
+      if(!Super::m_pushEvent.timed_wait(lock, td)) // will automatically and atomically unlock mutex while it waits
+      {
+        //std::cout << "no event before timeout\n";
+        continue;
+      }
+
+      if(Super::m_q.empty())
+        continue; // spurious wakeup
+      size_t s = Super::m_q.size();
+      std::vector<DataType> v(s);
+      for(size_t i = 0; i < s; i++)
+      {
+        v[i] = Super::m_q.front();
+        Super::m_q.pop_front();
+      }
+      if(mCallback)
+        /*bool rv =*/ mCallback(v);
+    }
+  }
+
+protected:
+  ListenerProc lp;
+  boost::scoped_ptr<boost::thread> th_listener;
+  volatile bool stop;
+  Listener2 mCallback;
+};

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-running/antpm.git



More information about the Pkg-running-devel mailing list