[Aptitude-svn-commit] r4031 - in branches/aptitude-0.3/aptitude: .
src/generic tests
Daniel Burrows
dburrows at costa.debian.org
Thu Sep 1 05:29:07 UTC 2005
Author: dburrows
Date: Thu Sep 1 05:29:03 2005
New Revision: 4031
Added:
branches/aptitude-0.3/aptitude/src/generic/threads.cc
branches/aptitude-0.3/aptitude/src/generic/threads.h
branches/aptitude-0.3/aptitude/tests/test_threads.cc
Modified:
branches/aptitude-0.3/aptitude/ChangeLog
branches/aptitude-0.3/aptitude/src/generic/Makefile.am
branches/aptitude-0.3/aptitude/tests/Makefile.am
Log:
Add a threading abstraction layer.
Modified: branches/aptitude-0.3/aptitude/ChangeLog
==============================================================================
--- branches/aptitude-0.3/aptitude/ChangeLog (original)
+++ branches/aptitude-0.3/aptitude/ChangeLog Thu Sep 1 05:29:03 2005
@@ -1,5 +1,12 @@
2005-08-31 Daniel Burrows <dburrows at debian.org>
+ * src/generic/threads.cc, src/generic/threads.h, test/test_threads.cc:
+
+ Add some generic code that wraps pthreads in a less error-prone
+ interface, based on boost::threads (didn't just use
+ boost::threads because of the unstable interface and the extra
+ dependency it would introduce).
+
* src/generic/immset.h:
When extracting and removing the minimum element of a tree,
Modified: branches/aptitude-0.3/aptitude/src/generic/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/Makefile.am (original)
+++ branches/aptitude-0.3/aptitude/src/generic/Makefile.am Thu Sep 1 05:29:03 2005
@@ -47,6 +47,8 @@
tags.cc \
tasks.h \
tasks.cc \
+ threads.cc \
+ threads.h \
undo.h \
undo.cc \
util.h \
Added: branches/aptitude-0.3/aptitude/src/generic/threads.cc
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/threads.cc Thu Sep 1 05:29:03 2005
@@ -0,0 +1,72 @@
+// threads.cc
+//
+// Copyright (C) 2005 Daniel Burrows
+//
+// This program is free software; you can redistribute it and/or
+// modify it under the terms of the GNU General Public License as
+// published by the Free Software Foundation; either version 2 of
+// the License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to
+// the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+// Boston, MA 02111-1307, USA.
+
+#include "threads.h"
+
+#include "util.h"
+
+#include "../aptitude.h"
+
+#include <errno.h>
+
+namespace threads
+{
+ std::string ThreadCreateException::errmsg() const
+ {
+ return _("Not enough resources to create thread");
+ }
+
+ ThreadJoinException::ThreadJoinException(int error)
+ {
+ std::string msg;
+
+ // Deliberately untranslated as these refer to errors that are
+ // internal to the program; the user can't do anything about them.
+ switch(error)
+ {
+ case ESRCH:
+ msg = "Invalid thread ID.";
+ break;
+ case EINVAL:
+ msg = "Thread previously detached or joined";
+ break;
+ case EDEADLK:
+ msg = "Deadlock (attempt to self-join)";
+ break;
+ }
+
+ reason = ssprintf("Unable to join thread: %s", msg.c_str());
+ }
+
+ std::string ThreadJoinException::errmsg() const
+ {
+ return reason;
+ }
+
+ std::string ConditionNotLockedException::errmsg() const
+ {
+ return "Attempt to wait on a condition with a non-locked mutex";
+ }
+
+ std::string DoubleLockException::errmsg() const
+ {
+ return "Mutex double-locked";
+ }
+}
+
Added: branches/aptitude-0.3/aptitude/src/generic/threads.h
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/threads.h Thu Sep 1 05:29:03 2005
@@ -0,0 +1,564 @@
+// threads.h -*-c++-*-
+//
+// Copyright (C) 2005 Daniel Burrows
+//
+// This program is free software; you can redistribute it and/or
+// modify it under the terms of the GNU General Public License as
+// published by the Free Software Foundation; either version 2 of
+// the License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to
+// the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+// Boston, MA 02111-1307, USA.
+//
+// A simple thread wrapper library. I'm not using the existing ones
+// in order to keep aptitude's dependency count low (as long as I
+// don't need too much out of it, this should be fairly
+// simple..right?). The API was inspired by that of boost::threads.
+
+#ifndef THREADS_H
+#define THREADS_H
+
+#include "exception.h"
+
+namespace threads
+{
+ /** Base exception class for thread errors. */
+ class ThreadException : public Exception
+ {
+ };
+
+ /** Creation failure; according to pthread_create(3), this only occurs
+ * if there aren't enough system resources to create a thread.
+ */
+ class ThreadCreateException : public ThreadException
+ {
+ public:
+ std::string errmsg() const;
+ };
+
+ /** Join failure. */
+ class ThreadJoinException : public ThreadException
+ {
+ std::string reason;
+ public:
+ ThreadJoinException(const int error);
+
+ std::string errmsg() const;
+ };
+
+ /** Thrown when code tries to wait on a condition and passes a
+ * released lock in.
+ */
+ class ConditionNotLockedException : public ThreadException
+ {
+ public:
+ std::string errmsg() const;
+ };
+
+ /** Thrown when a mutex is locked twice. */
+ class DoubleLockException : public ThreadException
+ {
+ public:
+ std::string errmsg() const;
+ };
+
+ /** A thread class based on the Boost thread class. Like the Boost
+ * class, it is non-copyable.
+ */
+ class thread
+ {
+ pthread_t tid;
+ bool joined;
+
+ thread(const thread &other);
+ thread &operator=(const thread &other);
+
+
+
+ template<typename F>
+ static void *bootstrap(void *p)
+ {
+ F thunk(*((F *) p));
+
+ delete ((F *) p);
+
+ thunk();
+
+ return 0;
+ }
+
+ public:
+ class attr
+ {
+ pthread_attr_t attrs;
+
+ friend class thread;
+ public:
+ attr()
+ {
+ pthread_attr_init(&attrs);
+ }
+
+ // All attributes except detach state can be manipulated (detach
+ // state is left at PTHREAD_CREATE_JOINABLE).
+
+ void set_inherit_sched(int i)
+ {
+ pthread_attr_setinheritsched(&attrs, i);
+ }
+
+ int get_inherit_sched() const
+ {
+ int rval;
+ pthread_attr_getinheritsched(&attrs, &rval);
+ return rval;
+ }
+
+ void set_sched_param(const sched_param &sp)
+ {
+ pthread_attr_setschedparam(&attrs, &sp);
+ }
+
+ sched_param get_sched_param() const
+ {
+ sched_param rval;
+ pthread_attr_getschedparam(&attrs, &rval);
+ return rval;
+ }
+
+ void set_sched_policy(int p)
+ {
+ pthread_attr_setschedpolicy(&attrs, p);
+ }
+
+ int get_sched_policy() const
+ {
+ int rval;
+ pthread_attr_getschedpolicy(&attrs, &rval);
+ return rval;
+ }
+
+ void set_scope(int p)
+ {
+ pthread_attr_setscope(&attrs, p);
+ }
+
+ int get_scope() const
+ {
+ int rval;
+ pthread_attr_getscope(&attrs, &rval);
+ return rval;
+ }
+
+ ~attr()
+ {
+ pthread_attr_destroy(&attrs);
+ }
+ };
+
+ /** Create a new thread.
+ *
+ * \param thunk a function object of no parameters that should
+ * be invoked to start this thread. Must be copyable.
+ *
+ * \param a the attributes with which to create the new thread.
+ */
+ template<typename F>
+ thread(const F &thunk, const attr &a = attr())
+ :joined(false)
+ {
+ // Create a thunk on the heap to pass to the new thread.
+ F *tmp = new F(thunk);
+
+ if(pthread_create(&tid, &a.attrs, &thread::bootstrap<F>, tmp) != 0)
+ {
+ delete tmp;
+
+ throw ThreadCreateException();
+ }
+ }
+
+ ~thread()
+ {
+ if(!joined)
+ pthread_detach(tid);
+ }
+
+ /** Wait for this thread to finish. */
+ void join()
+ {
+ int rval = pthread_join(tid, NULL);
+
+ if(rval != 0)
+ throw ThreadJoinException(rval);
+ else
+ joined = true;
+ }
+ };
+
+ class condition;
+
+ // The mutex abstraction
+ class mutex
+ {
+ public:
+ class lock;
+ class try_lock;
+
+ private:
+ pthread_mutex_t m;
+
+ friend class lock;
+ friend class try_lock;
+
+ // Conditions need to look inside mutexes and locks to find the
+ // real mutex object so the underlying thread library can do an
+ // atomic unlock-and-wait.
+ friend class condition;
+
+ mutex(const mutex &other);
+ mutex &operator=(const mutex &other);
+ public:
+ /** Represents a lock on a mutex. Can be released and re-asserted
+ * as desired; when the lock goes out of scope, it will
+ * automatically be released if necessary.
+ */
+ class lock
+ {
+ mutex &parent;
+
+ bool locked;
+
+ friend class condition;
+
+ lock(const lock &other);
+ lock &operator=(const lock &other);
+ public:
+ lock(mutex &_parent)
+ :parent(_parent), locked(false)
+ {
+ acquire();
+ }
+
+ /** Lock the associated mutex. */
+ void acquire()
+ {
+ if(locked)
+ throw DoubleLockException();
+
+ pthread_mutex_lock(&parent.m);
+ locked = true;
+ }
+
+ /** Unlock the associated mutex. */
+ void release()
+ {
+ pthread_mutex_unlock(&parent.m);
+ locked = false;
+ }
+
+ bool get_locked() const
+ {
+ return locked;
+ }
+
+ ~lock()
+ {
+ if(locked)
+ pthread_mutex_unlock(&parent.m);
+ }
+ };
+
+ /** Represents a non-blocking lock on a mutex. */
+ class try_lock
+ {
+ mutex &parent;
+
+ bool locked;
+
+ friend class condition;
+
+ try_lock(const try_lock &other);
+ try_lock &operator=(const try_lock &other);
+ public:
+ try_lock(mutex &_parent)
+ :parent(_parent)
+ {
+ acquire();
+ }
+
+ ~try_lock()
+ {
+ if(locked)
+ pthread_mutex_unlock(&parent.m);
+ }
+
+ void acquire()
+ {
+ if(locked)
+ throw DoubleLockException();
+
+ locked = pthread_mutex_trylock(&parent.m);
+ }
+
+ void release()
+ {
+ pthread_mutex_unlock(&parent.m);
+ locked = false;
+ }
+
+ bool get_locked() const
+ {
+ return locked;
+ }
+ };
+
+ mutex()
+ {
+ pthread_mutex_init(&m, NULL);
+ }
+
+ ~mutex()
+ {
+ pthread_mutex_destroy(&m);
+ }
+ };
+
+ /** A abstraction over conditions. When a condition variable is
+ * destroyed, any threads that are still blocked on it are woken
+ * up.
+ */
+ class condition
+ {
+ pthread_cond_t cond;
+ public:
+ condition()
+ {
+ pthread_cond_init(&cond, NULL);
+ }
+
+ ~condition()
+ {
+ // Wakey wakey
+ pthread_cond_broadcast(&cond);
+ pthread_cond_destroy(&cond);
+ }
+
+ void wake_one()
+ {
+ pthread_cond_signal(&cond);
+ }
+
+ void wake_all()
+ {
+ pthread_cond_broadcast(&cond);
+ }
+
+ /** Wait with the given guard (should be a lock type that is a
+ * friend of this condition object).
+ */
+ template<typename Lock>
+ void wait(const Lock &l)
+ {
+ if(!l.get_locked())
+ throw ConditionNotLockedException();
+
+ pthread_cond_wait(&cond, &l.parent.m);
+ }
+
+ /** Wait until the given predicate returns \b true. */
+ template<typename Lock, typename Pred>
+ void wait(const Lock &l, Pred p)
+ {
+ if(!l.get_locked())
+ throw ConditionNotLockedException();
+
+ while(!p())
+ wait(l);
+ }
+ };
+
+ /** A higher-level abstraction borrowed from Concurrent Haskell,
+ * which borrowed it from another language I forget. This
+ * represents a "box" that can either hold a value or be empty.
+ * Any thread can take the current value of the box or place a new
+ * value inside it; the attempt will block until a value is
+ * available or the box is empty, respectively. It's sort of a
+ * single-element bounded communications channel.
+ *
+ * The value in the box is stored with copying semantics. Like the
+ * other threading primitives, boxes are not copyable.
+ */
+ template<typename T>
+ class box
+ {
+ T val;
+ bool filled;
+
+ condition cond;
+ mutex m;
+ public:
+ /** Create an empty box. */
+ box()
+ :filled(false)
+ {
+ }
+
+ /** Create a box containing the given value. */
+ box(const T &_val)
+ :val(_val), filled(true)
+ {
+ }
+
+ /** Retrieve the current value of this box. If the box is empty,
+ * block until it is full.
+ */
+ T take();
+
+ /** Fill this box with a value. If the box is full, block until
+ * it is empty.
+ */
+ void put(const T &t);
+
+ /** If there is a value in the box, retrieve it immediately;
+ * otherwise do nothing.
+ *
+ * \param out the location in which the value should be stored
+ * \return \b true iff a value was found in the box
+ */
+ bool try_take(T &out);
+
+ /** If the box is empty, place a value in it; otherwise, do
+ * nothing.
+ *
+ * \param t the value to place in the box
+ *
+ * \return \b true iff the box was empty (and hence was filled
+ * with t)
+ */
+ bool try_put(const T &t);
+
+ /** Atomically modify the contents of the box; if an exception is
+ * thrown by the given function object, no action will be
+ * performed.
+ */
+ template<typename Mutator>
+ void box::update(const Mutator &m);
+ };
+
+ /** Internal helper struct. */
+ struct bool_ref_pred
+ {
+ const bool &b;
+ public:
+ bool_ref_pred(const bool &_b)
+ :b(_b)
+ {
+ }
+
+ bool operator()() const
+ {
+ return b;
+ }
+ };
+
+ /** Internal helper struct. */
+ struct not_bool_ref_pred
+ {
+ const bool &b;
+ public:
+ not_bool_ref_pred(const bool &_b)
+ :b(_b)
+ {
+ }
+
+ bool operator()() const
+ {
+ return !b;
+ }
+ };
+
+ template<typename T>
+ T box<T>::take()
+ {
+ mutex::lock l(m);
+
+ cond.wait(l, bool_ref_pred(filled));
+
+ assert(filled);
+
+ filled = false;
+
+ // Interesting question: does l get released before or after the
+ // copy? To be safe, I explicitly copy before I return.
+ T rval = val;
+ return rval;
+ }
+
+ template<typename T>
+ bool box<T>::try_take(T &out)
+ {
+ mutex::lock l(m);
+
+ if(filled)
+ {
+ filled = false;
+ out = val;
+ return true;
+ }
+ else
+ return false;
+ }
+
+ template<typename T>
+ void box<T>::put(const T &new_val)
+ {
+ mutex::lock l(m);
+
+ cond.wait(l, not_bool_ref_pred(filled));
+
+ filled = true;
+ val = new_val;
+ cond.wake_one();
+ }
+
+ template<typename T>
+ bool box<T>::try_put(const T &new_val)
+ {
+ mutex::lock l(m);
+
+ if(!filled)
+ {
+ filled = true;
+ val = new_val;
+ cond.wake_one();
+ return true;
+ }
+ else
+ return false;
+ }
+
+ template<typename T>
+ template<typename Mutator>
+ void box<T>::update(const Mutator &m)
+ {
+ mutex::lock l(m);
+
+ cond.wait(l, bool_ref_pred(filled));
+
+ T new_val = m(val);
+
+ val = new_val;
+ cond.wake_one();
+ }
+}
+
+#endif
+
Modified: branches/aptitude-0.3/aptitude/tests/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/tests/Makefile.am (original)
+++ branches/aptitude-0.3/aptitude/tests/Makefile.am Thu Sep 1 05:29:03 2005
@@ -22,4 +22,5 @@
test_resolver.cc \
test_setset.cc \
test_tags.cc \
+ test_threads.cc \
test_wtree.cc
Added: branches/aptitude-0.3/aptitude/tests/test_threads.cc
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/tests/test_threads.cc Thu Sep 1 05:29:03 2005
@@ -0,0 +1,106 @@
+// test_threads.cc
+//
+// Copyright (C) 2005 Daniel Burrows
+//
+// This program is free software; you can redistribute it and/or
+// modify it under the terms of the GNU General Public License as
+// published by the Free Software Foundation; either version 2 of
+// the License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; see the file COPYING. If not, write to
+// the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+// Boston, MA 02111-1307, USA.
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <src/generic/threads.h>
+
+#include <iostream>
+
+class TestThreads : public CppUnit::TestFixture
+{
+ CPPUNIT_TEST_SUITE(TestThreads);
+
+ CPPUNIT_TEST(testBox);
+ CPPUNIT_TEST(testAutoDetach);
+
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ // Ye Olde Addition Thread
+ struct add_thread
+ {
+ int n;
+ threads::box<int> &count;
+ public:
+ add_thread(int _n, threads::box<int> &_count)
+ :n(_n), count(_count)
+ {
+ }
+
+ void operator()() const
+ {
+ for(int i = 0; i < n; ++i)
+ count.put(count.take()+1);
+ }
+ };
+
+ void do_testBox()
+ {
+ const int thread_count = 50;
+ const int thread_limit = 1000;
+
+ threads::box<int> b(100);
+
+ CPPUNIT_ASSERT_EQUAL(100, b.take());
+
+ std::auto_ptr<threads::thread> writers[thread_count];
+
+ for(int i = 0; i<thread_count; ++i)
+ writers[i] = std::auto_ptr<threads::thread>(new threads::thread(add_thread(thread_limit, b)));
+
+ int foo;
+ CPPUNIT_ASSERT(!b.try_take(foo));
+
+ CPPUNIT_ASSERT(b.try_put(-1));
+
+ for(int i = 0; i<thread_count; ++i)
+ writers[i]->join();
+
+ CPPUNIT_ASSERT_EQUAL(thread_count*thread_limit-1, b.take());
+ }
+
+ void testBox()
+ {
+ try
+ {
+ do_testBox();
+ }
+ catch(const Exception &e)
+ {
+ std::cerr << "Caught exception in testBox: " << e.errmsg() << std::endl;
+ throw;
+ }
+ }
+
+ struct do_nothing
+ {
+ public:
+ void operator()() const
+ {
+ }
+ };
+
+ void testAutoDetach()
+ {
+ threads::thread t(do_nothing());
+ }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(TestThreads);
More information about the Aptitude-svn-commit
mailing list