[Aptitude-svn-commit] r4032 - in branches/aptitude-0.3/aptitude: . src/generic tests

Daniel Burrows dburrows at costa.debian.org
Thu Sep 1 20:17:59 UTC 2005


Author: dburrows
Date: Thu Sep  1 20:17:56 2005
New Revision: 4032

Added:
   branches/aptitude-0.3/aptitude/src/generic/event_queue.h
Modified:
   branches/aptitude-0.3/aptitude/ChangeLog
   branches/aptitude-0.3/aptitude/src/generic/Makefile.am
   branches/aptitude-0.3/aptitude/tests/test_threads.cc
Log:
Write a simple event queue abstraction.

Modified: branches/aptitude-0.3/aptitude/ChangeLog
==============================================================================
--- branches/aptitude-0.3/aptitude/ChangeLog	(original)
+++ branches/aptitude-0.3/aptitude/ChangeLog	Thu Sep  1 20:17:56 2005
@@ -1,3 +1,9 @@
+2005-09-01  Daniel Burrows  <dburrows at debian.org>
+
+	* src/generic/event_queue.h, tests/test_threads.cc:
+
+	  Write a simple event queue abstraction.
+
 2005-08-31  Daniel Burrows  <dburrows at debian.org>
 
 	* src/generic/threads.cc, src/generic/threads.h, test/test_threads.cc:

Modified: branches/aptitude-0.3/aptitude/src/generic/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/Makefile.am	(original)
+++ branches/aptitude-0.3/aptitude/src/generic/Makefile.am	Thu Sep  1 20:17:56 2005
@@ -29,6 +29,7 @@
 	apt_undo_group.cc\
 	config_signal.h \
 	config_signal.cc\
+	event_queue.h	\
 	exception.h	\
 	infer_reason.h  \
 	infer_reason.cc \

Added: branches/aptitude-0.3/aptitude/src/generic/event_queue.h
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/event_queue.h	Thu Sep  1 20:17:56 2005
@@ -0,0 +1,131 @@
+// event_queue.h
+//
+//   Copyright (C) 2005 Daniel Burrows
+//
+//   This program is free software; you can redistribute it and/or
+//   modify it under the terms of the GNU General Public License as
+//   published by the Free Software Foundation; either version 2 of
+//   the License, or (at your option) any later version.
+//
+//   This program is distributed in the hope that it will be useful,
+//   but WITHOUT ANY WARRANTY; without even the implied warranty of
+//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+//   General Public License for more details.
+//
+//   You should have received a copy of the GNU General Public License
+//   along with this program; see the file COPYING.  If not, write to
+//   the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+//   Boston, MA 02111-1307, USA.
+
+#ifndef EVENT_QUEUE_H
+#define EVENT_QUEUE_H
+
+#include "threads.h"
+
+#include <deque>
+
+namespace threads
+{
+  /** A simple unbounded communications channel suitable for use as,
+   *  eg, an event queue.  Writers never block (except as necessary to
+   *  maintain consistency), but readers block while the queue is
+   *  empty.  If there are multiple readers, they will receive results
+   *  in an arbitrary order.
+   *
+   *  This implementation is safe and flexible, but not terribly
+   *  efficient.  For instance, readers and writers block each other
+   *  out (other approaches can avoid this unless the queue is empty).
+   *  In aptitude it's used for the global event queue, which doesn't
+   *  get all that many deliveries, so it should be good enough.  Just
+   *  don't use it to stream bits off a network connection and then
+   *  complain I didn't warn you!
+   */
+  template<typename T>
+  class event_queue
+  {
+    std::deque<T> q;   // pending values: put() appends at the back, get()/try_get() remove from the front
+
+    condition c;       // signalled by put(); get() waits on it until q is non-empty
+    mutable mutex m;   // locked by every member that touches q; mutable so the const empty() can lock it
+
+    struct not_empty   // predicate handed to condition::wait() by get()
+    {
+      const std::deque<T> &q;   // refers to the owning queue's deque, so it sees the live contents
+    public:   // redundant in a struct (members are already public)
+      not_empty(const std::deque<T> &_q)
+	:q(_q)
+      {
+      }
+
+      bool operator()() const   // true once there is at least one value to read
+      {
+	return !q.empty();
+      }
+    };
+
+    event_queue(const event_queue &other);              // private and not defined in this header -- presumably to forbid copying
+    event_queue &operator=(const event_queue &other);   // likewise for assignment
+  public:
+    /** Create an empty queue. */
+    event_queue()
+    {
+    }
+
+    ~event_queue()
+    {
+    }
+
+    /** Append a copy of \a t to the back of the queue and signal the condition. */
+    void put(const T &t)
+    {
+      mutex::lock l(m);   // held for the rest of this scope (assumes mutex::lock unlocks on destruction -- see threads.h)
+
+      q.push_back(t);
+      c.wake_one();   // presumably wakes a single reader blocked in get(), if any -- confirm in threads.h
+    }
+
+    /** Remove and return the value at the front of the queue, waiting while the queue is empty. */
+    T get()
+    {
+      mutex::lock l(m);
+
+      c.wait(l, not_empty(q));   // presumably returns with l held once not_empty(q) is true -- confirm in threads.h
+      T rval = q.front();        // copy the front value out before it is removed
+      q.pop_front();
+
+      return rval;
+    }
+
+    /** Retrieve a single value from the event queue if the queue is
+     *  non-empty.  Unlike get(), this does not wait on the condition.
+     *
+     *  \param out the location in which to store the retrieved value
+     *  \return \b true iff a value was retrieved.
+     */
+    bool try_get(T &out)
+    {
+      mutex::lock l(m);
+
+      if(q.empty())
+	return false;   // out is not assigned on this path
+      else
+	{
+	  out = q.front();
+	  q.pop_front();
+	  return true;
+	}
+    }
+
+    /** Return \b true if the event queue is currently empty. */
+    bool empty() const
+    {
+      // Not sure the lock is required here, but it makes things a bit
+      // safer in case the STL is thread-unsafe in weird ways.
+      mutex::lock l(m);
+      bool rval = q.empty();   // value observed while the lock was held
+      return rval;
+    }
+  };
+}
+
+#endif

Modified: branches/aptitude-0.3/aptitude/tests/test_threads.cc
==============================================================================
--- branches/aptitude-0.3/aptitude/tests/test_threads.cc	(original)
+++ branches/aptitude-0.3/aptitude/tests/test_threads.cc	Thu Sep  1 20:17:56 2005
@@ -19,6 +19,7 @@
 
 #include <cppunit/extensions/HelperMacros.h>
 
+#include <src/generic/event_queue.h>
 #include <src/generic/threads.h>
 
 #include <iostream>
@@ -28,6 +29,7 @@
   CPPUNIT_TEST_SUITE(TestThreads);
 
   CPPUNIT_TEST(testBox);
+  CPPUNIT_TEST(testEventQueue);
   CPPUNIT_TEST(testAutoDetach);
 
   CPPUNIT_TEST_SUITE_END();
@@ -89,6 +91,55 @@
       }
   }
 
+  class event_queue_write_thread   // functor used as a writer thread body by testEventQueue()
+  {
+    threads::event_queue<std::pair<int, int> > &eq;   // queue shared with the reading test; held by reference
+
+    int id, n;   // id: tag placed in pair.first; n: number of values this writer puts
+  public:
+    event_queue_write_thread(threads::event_queue<std::pair<int, int> > &_eq,
+			     int _id, int _n)
+      :eq(_eq), id(_id), n(_n)
+    {
+    }
+
+    void operator()() const   // puts (id, 0), (id, 1), ..., (id, n-1) onto eq, in that order
+    {
+      for(int i = 0; i < n; ++i)
+	eq.put(std::pair<int, int>(id, i));
+    }
+  };
+
+  void testEventQueue()   // many writers, one reader: checks each writer's values come out of the queue in order
+  {
+    const int thread_count = 100;    // number of writer threads started below
+    const int thread_limit = 1000;   // number of values each writer puts
+
+    threads::event_queue<std::pair<int, int> > eq;
+
+    std::auto_ptr<threads::thread> writers[thread_count];
+    int last_thread_msg[thread_count];   // indexed by writer id: last pair.second read from that writer
+
+    for(int i = 0; i < thread_count; ++i)
+      last_thread_msg[i] = -1;   // -1 means nothing read yet, so the first expected value is 0
+
+    for(int i = 0; i < thread_count; ++i)
+      writers[i] = std::auto_ptr<threads::thread>(new threads::thread(event_queue_write_thread(eq, i, thread_limit)));
+
+    for(int i = 0; i < thread_count * thread_limit; ++i)   // read exactly as many values as the writers put in total
+      {
+	std::pair<int, int> next = eq.get();   // (writer id, sequence number)
+
+	CPPUNIT_ASSERT_EQUAL(next.second-1, last_thread_msg[next.first]);   // must directly follow that writer's previous value
+	last_thread_msg[next.first] = next.second;
+      }
+
+    for(int i = 0; i < thread_count; ++i)
+      writers[i]->join();
+
+    CPPUNIT_ASSERT(eq.empty());   // every value put was read by the loop above, so nothing should remain
+  }
+
   struct do_nothing
   {
   public:



More information about the Aptitude-svn-commit mailing list