[Aptitude-svn-commit] r4032 - in branches/aptitude-0.3/aptitude: .
src/generic tests
Daniel Burrows
dburrows at costa.debian.org
Thu Sep 1 20:17:59 UTC 2005
Author: dburrows
Date: Thu Sep 1 20:17:56 2005
New Revision: 4032
Added:
branches/aptitude-0.3/aptitude/src/generic/event_queue.h
Modified:
branches/aptitude-0.3/aptitude/ChangeLog
branches/aptitude-0.3/aptitude/src/generic/Makefile.am
branches/aptitude-0.3/aptitude/tests/test_threads.cc
Log:
Write a simple event queue abstraction.
Modified: branches/aptitude-0.3/aptitude/ChangeLog
==============================================================================
--- branches/aptitude-0.3/aptitude/ChangeLog (original)
+++ branches/aptitude-0.3/aptitude/ChangeLog Thu Sep 1 20:17:56 2005
@@ -1,3 +1,9 @@
+2005-09-01 Daniel Burrows <dburrows at debian.org>
+
+ * src/generic/event_queue.h, tests/test_threads.cc:
+
+ Write a simple event queue abstraction.
+
2005-08-31 Daniel Burrows <dburrows at debian.org>
* src/generic/threads.cc, src/generic/threads.h, test/test_threads.cc:
Modified: branches/aptitude-0.3/aptitude/src/generic/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/Makefile.am (original)
+++ branches/aptitude-0.3/aptitude/src/generic/Makefile.am Thu Sep 1 20:17:56 2005
@@ -29,6 +29,7 @@
apt_undo_group.cc\
config_signal.h \
config_signal.cc\
+ event_queue.h \
exception.h \
infer_reason.h \
infer_reason.cc \
Added: branches/aptitude-0.3/aptitude/src/generic/event_queue.h
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/event_queue.h Thu Sep 1 20:17:56 2005
@@ -0,0 +1,131 @@
+// event_queue.h
+//
+// Copyright (C) 2005 Daniel Burrows
+//
+// This program is free software; you can redistribute it and/or
+// modify it under the terms of the GNU General Public License as
+// published by the Free Software Foundation; either version 2 of
+// the License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to
+// the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+// Boston, MA 02111-1307, USA.
+
+#ifndef EVENT_QUEUE_H
+#define EVENT_QUEUE_H
+
+#include "threads.h"
+
+#include <deque>
+
+namespace threads
+{
+ /** An unbounded, thread-safe FIFO channel, suitable for use as
+  *  (e.g.) an event queue.
+  *
+  *  put() never waits for a reader; it blocks only for as long as
+  *  it takes to acquire the internal mutex.  get() blocks for as
+  *  long as the queue is empty.  When several threads read
+  *  concurrently, the order in which they receive values is
+  *  arbitrary.
+  *
+  *  The implementation favors safety and simplicity over speed: one
+  *  mutex guards every operation, so readers and writers always
+  *  exclude one another (other designs only need that while the
+  *  queue is empty).  In aptitude it backs the global event queue,
+  *  which sees few deliveries, so this should be good enough.  Do
+  *  not use it for high-volume streaming (such as reading bytes off
+  *  a network connection).
+  */
+ template<typename T>
+ class event_queue
+ {
+   // The pending values, oldest at the front.
+   std::deque<T> q;
+
+   // Signalled by put() whenever a value has been appended.
+   condition c;
+   // Guards q; mutable so that empty() can lock it while const.
+   mutable mutex m;
+
+   /** Predicate handed to condition::wait(): true as soon as the
+    *  referenced deque holds at least one element.
+    */
+   struct not_empty
+   {
+     const std::deque<T> &items;
+
+     not_empty(const std::deque<T> &_items)
+       :items(_items)
+     {
+     }
+
+     bool operator()() const
+     {
+       return !items.empty();
+     }
+   };
+
+   // Copying is disabled: both operations are private and are not
+   // defined in this header.
+   event_queue(const event_queue &other);
+   event_queue &operator=(const event_queue &other);
+ public:
+   /** Construct a queue holding no values. */
+   event_queue()
+   {
+   }
+
+   ~event_queue()
+   {
+   }
+
+   /** Append a copy of \b t to the back of the queue and wake one
+    *  waiting reader.
+    */
+   void put(const T &t)
+   {
+     mutex::lock guard(m);
+
+     q.push_back(t);
+     c.wake_one();
+   }
+
+   /** Remove and return the value at the front of the queue,
+    *  blocking until one is available.
+    */
+   T get()
+   {
+     mutex::lock guard(m);
+
+     // Wait until the predicate reports that q has an element.
+     c.wait(guard, not_empty(q));
+
+     T head = q.front();
+     q.pop_front();
+
+     return head;
+   }
+
+   /** Non-blocking variant of get().
+    *
+    *  \param out receives the value at the front of the queue, if
+    *             there is one; it is left untouched otherwise.
+    *  \return \b true iff a value was stored in \b out.
+    */
+   bool try_get(T &out)
+   {
+     mutex::lock guard(m);
+
+     if(q.empty())
+       return false;
+
+     out = q.front();
+     q.pop_front();
+     return true;
+   }
+
+   /** Return \b true if the queue held no values at the moment of
+    *  the call.
+    */
+   bool empty() const
+   {
+     // Take the lock even for this read-only query: it guards
+     // against the container being modified by another thread while
+     // it is being inspected.
+     mutex::lock guard(m);
+     return q.empty();
+   }
+ };
+}
+
+#endif
Modified: branches/aptitude-0.3/aptitude/tests/test_threads.cc
==============================================================================
--- branches/aptitude-0.3/aptitude/tests/test_threads.cc (original)
+++ branches/aptitude-0.3/aptitude/tests/test_threads.cc Thu Sep 1 20:17:56 2005
@@ -19,6 +19,7 @@
#include <cppunit/extensions/HelperMacros.h>
+#include <src/generic/event_queue.h>
#include <src/generic/threads.h>
#include <iostream>
@@ -28,6 +29,7 @@
CPPUNIT_TEST_SUITE(TestThreads);
CPPUNIT_TEST(testBox);
+ CPPUNIT_TEST(testEventQueue);
CPPUNIT_TEST(testAutoDetach);
CPPUNIT_TEST_SUITE_END();
@@ -89,6 +91,55 @@
}
}
+ /** Functor run by each writer thread in testEventQueue(): pushes
+  *  the pairs (id, 0), (id, 1), ..., (id, n-1) onto a shared event
+  *  queue, in that order.
+  */
+ class event_queue_write_thread
+ {
+   // The queue being written to; held by reference, not owned.
+   threads::event_queue<std::pair<int, int> > &eq;
+
+   // Identifier stored in the first slot of every message.
+   int id;
+   // Number of messages to send.
+   int n;
+ public:
+   event_queue_write_thread(threads::event_queue<std::pair<int, int> > &queue,
+                            int writer_id, int count)
+     :eq(queue), id(writer_id), n(count)
+   {
+   }
+
+   /** Send all n messages, numbered consecutively from 0. */
+   void operator()() const
+   {
+     int seq = 0;
+
+     while(seq < n)
+       {
+         const std::pair<int, int> msg(id, seq);
+         eq.put(msg);
+         ++seq;
+       }
+   }
+ };
+
+ /** Exercise threads::event_queue with many concurrent writers and
+  *  a single reader.
+  *
+  *  thread_count writer threads each push thread_limit messages of
+  *  the form (writer id, sequence number).  This thread reads every
+  *  message back and checks that the messages from any one writer
+  *  arrive with consecutive sequence numbers starting at 0.  It then
+  *  joins the writers and checks that the queue has been drained.
+  */
+ void testEventQueue()
+ {
+   const int thread_count = 100;
+   const int thread_limit = 1000;
+
+   threads::event_queue<std::pair<int, int> > eq;
+
+   std::auto_ptr<threads::thread> writers[thread_count];
+   // Sequence number of the last message seen from each writer; -1
+   // means that nothing has been received from it yet.
+   int last_thread_msg[thread_count];
+
+   for(int i = 0; i < thread_count; ++i)
+     last_thread_msg[i] = -1;
+
+   for(int i = 0; i < thread_count; ++i)
+     writers[i] = std::auto_ptr<threads::thread>(new threads::thread(event_queue_write_thread(eq, i, thread_limit)));
+
+   for(int i = 0; i < thread_count * thread_limit; ++i)
+     {
+       std::pair<int, int> next = eq.get();
+
+       // Validate the writer id before using it as an array index,
+       // so that a corrupted message fails the test instead of
+       // reading and writing out of bounds.
+       CPPUNIT_ASSERT(next.first >= 0 && next.first < thread_count);
+
+       // CPPUNIT_ASSERT_EQUAL takes (expected, actual): the expected
+       // sequence number is one past the last one seen from this
+       // writer.
+       CPPUNIT_ASSERT_EQUAL(last_thread_msg[next.first] + 1, next.second);
+       last_thread_msg[next.first] = next.second;
+     }
+
+   for(int i = 0; i < thread_count; ++i)
+     writers[i]->join();
+
+   // Every message that was written has been read back.
+   CPPUNIT_ASSERT(eq.empty());
+ }
+
struct do_nothing
{
public:
More information about the Aptitude-svn-commit
mailing list