[Aptitude-svn-commit] r4031 - in branches/aptitude-0.3/aptitude: . src/generic tests

Daniel Burrows dburrows at costa.debian.org
Thu Sep 1 05:29:07 UTC 2005


Author: dburrows
Date: Thu Sep  1 05:29:03 2005
New Revision: 4031

Added:
   branches/aptitude-0.3/aptitude/src/generic/threads.cc
   branches/aptitude-0.3/aptitude/src/generic/threads.h
   branches/aptitude-0.3/aptitude/tests/test_threads.cc
Modified:
   branches/aptitude-0.3/aptitude/ChangeLog
   branches/aptitude-0.3/aptitude/src/generic/Makefile.am
   branches/aptitude-0.3/aptitude/tests/Makefile.am
Log:
Add a threading abstraction layer.

Modified: branches/aptitude-0.3/aptitude/ChangeLog
==============================================================================
--- branches/aptitude-0.3/aptitude/ChangeLog	(original)
+++ branches/aptitude-0.3/aptitude/ChangeLog	Thu Sep  1 05:29:03 2005
@@ -1,5 +1,12 @@
 2005-08-31  Daniel Burrows  <dburrows at debian.org>
 
+	* src/generic/threads.cc, src/generic/threads.h, tests/test_threads.cc:
+
+	  Add some generic code that wraps pthreads in a less error-prone
+	  interface, based on boost::threads (didn't just use
+	  boost::threads because of the unstable interface and the extra
+	  dependency it would introduce).
+
 	* src/generic/immset.h:
 
 	  When extracting and removing the minimum element of a tree,

Modified: branches/aptitude-0.3/aptitude/src/generic/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/Makefile.am	(original)
+++ branches/aptitude-0.3/aptitude/src/generic/Makefile.am	Thu Sep  1 05:29:03 2005
@@ -47,6 +47,8 @@
 	tags.cc		\
 	tasks.h		\
 	tasks.cc	\
+	threads.cc	\
+	threads.h	\
 	undo.h		\
 	undo.cc		\
 	util.h		\

Added: branches/aptitude-0.3/aptitude/src/generic/threads.cc
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/threads.cc	Thu Sep  1 05:29:03 2005
@@ -0,0 +1,72 @@
+// threads.cc
+//
+//   Copyright (C) 2005 Daniel Burrows
+//
+//   This program is free software; you can redistribute it and/or
+//   modify it under the terms of the GNU General Public License as
+//   published by the Free Software Foundation; either version 2 of
+//   the License, or (at your option) any later version.
+//
+//   This program is distributed in the hope that it will be useful,
+//   but WITHOUT ANY WARRANTY; without even the implied warranty of
+//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+//   General Public License for more details.
+//
+//   You should have received a copy of the GNU General Public License
+//   along with this program; see the file COPYING.  If not, write to
+//   the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+//   Boston, MA 02111-1307, USA.
+
+#include "threads.h"
+
+#include "util.h"
+
+#include "../aptitude.h"
+
+#include <errno.h>
+
+namespace threads
+{
+  // Describe a thread-creation failure.  The text goes through _(), so
+  // unlike the other messages in this file it is marked for translation.
+  std::string ThreadCreateException::errmsg() const
+  { return _("Not enough resources to create thread"); }
+
+  /** Build the message for a failed join.
+   *
+   *  \param error the nonzero code returned by pthread_join()
+   */
+  ThreadJoinException::ThreadJoinException(int error)
+  {
+    std::string msg;
+
+    // Deliberately untranslated as these refer to errors that are
+    // internal to the program; the user can't do anything about them.
+    switch(error)
+      {
+      case ESRCH:   msg = "Invalid thread ID."; break;
+      case EINVAL:  msg = "Thread previously detached or joined"; break;
+      case EDEADLK: msg = "Deadlock (attempt to self-join)"; break;
+      // Any other code used to leave the detail text empty.
+      default:      msg = ssprintf("Unknown error %d", error); break;
+      }
+
+    reason = ssprintf("Unable to join thread: %s", msg.c_str());
+  }
+
+  // Return the text assembled by the constructor from the
+  // pthread_join() error code.
+  std::string ThreadJoinException::errmsg() const
+  { return reason; }
+
+  std::string ConditionNotLockedException::errmsg() const
+  {
+    return "Attempt to wait on a condition with a non-locked mutex";
+  }
+
+  std::string DoubleLockException::errmsg() const
+  {
+    return "Mutex double-locked";
+  }
+}
+

Added: branches/aptitude-0.3/aptitude/src/generic/threads.h
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/src/generic/threads.h	Thu Sep  1 05:29:03 2005
@@ -0,0 +1,564 @@
+// threads.h                                              -*-c++-*-
+//
+//   Copyright (C) 2005 Daniel Burrows
+//
+//   This program is free software; you can redistribute it and/or
+//   modify it under the terms of the GNU General Public License as
+//   published by the Free Software Foundation; either version 2 of
+//   the License, or (at your option) any later version.
+//
+//   This program is distributed in the hope that it will be useful,
+//   but WITHOUT ANY WARRANTY; without even the implied warranty of
+//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+//   General Public License for more details.
+//
+//   You should have received a copy of the GNU General Public License
+//   along with this program; see the file COPYING.  If not, write to
+//   the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+//   Boston, MA 02111-1307, USA.
+//
+// A simple thread wrapper library.  I'm not using the existing ones
+// in order to keep aptitude's dependency count low (as long as I
+// don't need too much out of it, this should be fairly
+// simple..right?).  The API was inspired by that of boost::threads.
+
+#ifndef THREADS_H
+#define THREADS_H
+
+#include "exception.h"
+
+namespace threads
+{
+  /** Common base of every exception thrown by this thread library,
+   *  so a single handler can catch all of them.
+   */
+  class ThreadException : public Exception {};
+
+  /** Thrown by the thread constructor when pthread_create() fails;
+   *  according to pthread_create(3), this only occurs if there
+   *  aren't enough system resources to create a thread.
+   */
+  struct ThreadCreateException : public ThreadException
+  {
+    std::string errmsg() const;
+  };
+
+  /** Join failure. */
+  class ThreadJoinException : public ThreadException
+  {
+    std::string reason;
+  public:
+    ThreadJoinException(const int error);
+
+    std::string errmsg() const;
+  };
+
+  /** Thrown by condition::wait() when the lock object passed in is
+   *  not currently locked, i.e. its get_locked() returns false; the
+   *  wait is not attempted in that case.
+   */
+  struct ConditionNotLockedException : public ThreadException
+  {
+    std::string errmsg() const;
+  };
+
+  /** Thrown by a lock object's acquire() when that same lock object
+   *  already holds the mutex. */
+  struct DoubleLockException : public ThreadException
+  {
+    std::string errmsg() const;
+  };
+
+  /** A thread class based on the Boost thread class.  Like the Boost
+   *  class, it is non-copyable.
+   */
+  class thread
+  {
+    pthread_t tid;
+    bool joined;
+
+    thread(const thread &other);
+    thread &operator=(const thread &other);
+
+
+
+    /** pthread entry point: \a p is the constructor's heap copy of the
+     *  thunk, which is copied locally and freed before being run. */
+    template<typename F>
+    static void *bootstrap(void *p)
+    {
+      F *heap_thunk = static_cast<F *>(p);
+      F thunk(*heap_thunk);
+      delete heap_thunk;
+      thunk();
+      return 0;
+    }
+
+  public:
+    /** Creation attributes for a thread: a thin wrapper around a
+     *  pthread_attr_t.  The return codes of the pthread_attr_*()
+     *  calls made here are not checked.
+     */
+    class attr
+    {
+      pthread_attr_t attrs;
+
+      // thread's constructor hands attrs to pthread_create().
+      friend class thread;
+    public:
+      /** Initialise attrs with pthread_attr_init(). */
+      attr() { pthread_attr_init(&attrs); }
+
+      ~attr() { pthread_attr_destroy(&attrs); }
+
+      // All attributes except detach state can be manipulated (detach
+      // state is left at PTHREAD_CREATE_JOINABLE).
+
+      /** Forwards to pthread_attr_setinheritsched(). */
+      void set_inherit_sched(int i)
+      { pthread_attr_setinheritsched(&attrs, i); }
+
+      /** Forwards to pthread_attr_getinheritsched(). */
+      int get_inherit_sched() const
+      {
+	int inherit;
+	pthread_attr_getinheritsched(&attrs, &inherit);
+	return inherit;
+      }
+
+      /** Forwards to pthread_attr_setschedparam(). */
+      void set_sched_param(const sched_param &sp)
+      { pthread_attr_setschedparam(&attrs, &sp); }
+
+      /** Forwards to pthread_attr_getschedparam(). */
+      sched_param get_sched_param() const
+      {
+	sched_param param;
+	pthread_attr_getschedparam(&attrs, &param);
+	return param;
+      }
+
+      /** Forwards to pthread_attr_setschedpolicy(). */
+      void set_sched_policy(int p)
+      { pthread_attr_setschedpolicy(&attrs, p); }
+
+      /** Forwards to pthread_attr_getschedpolicy(). */
+      int get_sched_policy() const
+      {
+	int policy;
+	pthread_attr_getschedpolicy(&attrs, &policy);
+	return policy;
+      }
+
+      /** Forwards to pthread_attr_setscope(). */
+      void set_scope(int p)
+      { pthread_attr_setscope(&attrs, p); }
+
+      /** Forwards to pthread_attr_getscope(). */
+      int get_scope() const
+      {
+	int scope;
+	pthread_attr_getscope(&attrs, &scope);
+	return scope;
+      }
+    };
+
+    /** Create a new thread.
+     *
+     *  \param thunk a function object of no parameters; a copy of it
+     *  is invoked in the new thread.  Must be copyable.
+     *  \param a the attributes with which to create the new thread.
+     *
+     *  \throws ThreadCreateException if pthread_create() fails.
+     */
+    template<typename F>
+    thread(const F &thunk, const attr &a = attr())
+      :joined(false)
+    {
+      // bootstrap<F> frees this heap copy once the thread is running.
+      F *heap_thunk = new F(thunk);
+      if(pthread_create(&tid, &a.attrs, &thread::bootstrap<F>, heap_thunk) == 0)
+	return;
+
+      // The thread never started, so the copy has to be freed here.
+      delete heap_thunk;
+      throw ThreadCreateException();
+    }
+
+    /** Detach the thread unless join() has already succeeded; a
+     *  joined thread needs no further cleanup.
+     */
+    ~thread()
+    { if(!joined) pthread_detach(tid); }
+
+    /** Wait for this thread to finish; a no-op once a join succeeded
+     *  (re-joining an ID is undefined).  Throws ThreadJoinException. */
+    void join()
+    {
+      if(joined) return;
+      const int rval = pthread_join(tid, NULL);
+      if(rval != 0)
+	throw ThreadJoinException(rval);
+      joined = true;
+    }
+  };
+
+  class condition;
+
+  // The mutex abstraction
+  class mutex
+  {
+  public:
+    class lock;
+    class try_lock;
+
+  private:
+    pthread_mutex_t m;
+
+    friend class lock;
+    friend class try_lock;
+
+    // Conditions need to look inside mutexes and locks to find the
+    // real mutex object so the underlying thread library can do an
+    // atomic unlock-and-wait.
+    friend class condition;
+
+    mutex(const mutex &other);
+    mutex &operator=(const mutex &other);
+  public:
+    /** Represents a lock on a mutex.  Can be released and re-asserted
+     *  as desired; when the lock goes out of scope, it will
+     *  automatically be released if necessary.
+     */
+    class lock
+    {
+      mutex &parent;
+
+      bool locked;
+
+      // condition::wait() reaches parent.m through the lock it is given.
+      friend class condition;
+
+      // Private, so that lock objects cannot be copied.
+      lock(const lock &other);
+      lock &operator=(const lock &other);
+    public:
+      lock(mutex &_parent)
+	:parent(_parent), locked(false)
+      {
+	acquire();
+      }
+
+      /** Lock the associated mutex, blocking until it is available.
+       *  \throws DoubleLockException if this object already holds it. */
+      void acquire()
+      {
+	if(locked)
+	  throw DoubleLockException();
+
+	pthread_mutex_lock(&parent.m);
+	locked = true;
+      }
+
+      /** Unlock the associated mutex; does nothing if this object
+       *  does not hold it (unlocking an unheld mutex is undefined).
+       */
+      void release()
+      {
+	if(!locked)
+	  return;
+	pthread_mutex_unlock(&parent.m);
+	locked = false;
+      }
+
+      bool get_locked() const { return locked; }
+
+      ~lock() { release(); }
+    };
+
+    /** Represents a non-blocking lock on a mutex; check get_locked()
+     *  to learn whether the mutex was actually obtained. */
+    class try_lock
+    {
+      mutex &parent;
+
+      bool locked;
+
+      friend class condition;
+
+      try_lock(const try_lock &other);
+      try_lock &operator=(const try_lock &other);
+    public:
+      try_lock(mutex &_parent)
+	:parent(_parent), locked(false) // acquire() reads locked
+      {
+	acquire();
+      }
+
+      ~try_lock() { release(); }
+
+      /** Try once, without blocking, to lock the mutex.
+       *  \throws DoubleLockException if this object already holds it. */
+      void acquire()
+      {
+	if(locked)
+	  throw DoubleLockException();
+
+	// pthread_mutex_trylock() returns 0 on success, nonzero on failure.
+	locked = (pthread_mutex_trylock(&parent.m) == 0);
+      }
+
+      /** Unlock the mutex if this object holds it. */
+      void release()
+      {
+	if(!locked)
+	  return;
+	pthread_mutex_unlock(&parent.m);
+	locked = false;
+      }
+
+      bool get_locked() const { return locked; }
+    };
+
+    /** Initialise the pthread mutex; a NULL attribute pointer is
+     *  passed, and the result of pthread_mutex_init() is not checked. */
+    mutex()
+    { pthread_mutex_init(&m, NULL); }
+
+    /** Destroy the underlying pthread mutex; the result of
+     *  pthread_mutex_destroy() is not checked. */
+    ~mutex()
+    { pthread_mutex_destroy(&m); }
+  };
+
+  /** An abstraction over conditions.  When a condition variable is
+   *  destroyed, any threads that are still blocked on it are woken
+   *  up.  Like the other primitives here, it cannot be copied.
+   */
+  class condition
+  {
+    pthread_cond_t cond;
+
+    // Private: a copied pthread_cond_t is not a usable condition.
+    condition(const condition &other);
+    condition &operator=(const condition &other);
+  public:
+    condition() { pthread_cond_init(&cond, NULL); }
+
+    ~condition()
+    {
+      // Wakey wakey
+      pthread_cond_broadcast(&cond);
+      pthread_cond_destroy(&cond);
+    }
+
+    /** Wake one thread blocked in wait(), if there is one. */
+    void wake_one() { pthread_cond_signal(&cond); }
+
+    /** Wake every thread blocked in wait(). */
+    void wake_all() { pthread_cond_broadcast(&cond); }
+
+    /** Wait with the given guard (should be a lock type that is a
+     *  friend of this condition object).
+     *  \throws ConditionNotLockedException if \a l is not locked.
+     */
+    template<typename Lock>
+    void wait(const Lock &l)
+    {
+      if(!l.get_locked())
+	throw ConditionNotLockedException();
+      pthread_cond_wait(&cond, &l.parent.m);
+    }
+
+    /** Wait until the given predicate returns \b true; it is tested
+     *  before the first wait and again after each wakeup.
+     *
+     *  \throws ConditionNotLockedException if \a l is not locked.
+     */
+    template<typename Lock, typename Pred>
+    void wait(const Lock &l, Pred p)
+    {
+      if(!l.get_locked())
+	throw ConditionNotLockedException();
+      while(!p())
+	wait(l);
+    }
+  };
+
+  /** A higher-level abstraction borrowed from Concurrent Haskell,
+   *  which borrowed it from another language I forget.  This
+   *  represents a "box" that can either hold a value or be empty.
+   *  Any thread can take the current value of the box or place a new
+   *  value inside it; the attempt will block until a value is
+   *  available or the box is empty, respectively.  It's sort of a
+   *  single-element bounded communications channel.
+   *
+   *  The value in the box is stored with copying semantics.  Like the
+   *  other threading primitives, boxes are not copyable.
+   */
+  template<typename T>
+  class box
+  {
+    T val;
+    bool filled;     // true while val holds a value that can be taken
+
+    condition cond;  // waited on by blocked take()/put()/update() calls
+    mutex m;         // locked by every member function that touches val/filled
+  public:
+    /** Create an empty box (val is default-constructed). */
+    box()
+      :filled(false)
+    {
+    }
+
+    /** Create a box containing the given value. */
+    box(const T &_val)
+      :val(_val), filled(true)
+    {
+    }
+
+    /** Retrieve the current value of this box.  If the box is empty,
+     *  block until it is full.
+     */
+    T take();
+
+    /** Fill this box with a value.  If the box is full, block until
+     *  it is empty.
+     */
+    void put(const T &t);
+
+    /** If there is a value in the box, retrieve it immediately;
+     *  otherwise do nothing.
+     *
+     *  \param out the location in which the value should be stored
+     *  \return \b true iff a value was found in the box
+     */
+    bool try_take(T &out);
+
+    /** If the box is empty, place a value in it; otherwise, do
+     *  nothing.
+     *
+     *  \param t the value to place in the box
+     *
+     *  \return \b true iff the box was empty (and hence was filled
+     *  with t)
+     */
+    bool try_put(const T &t);
+
+    /** Atomically modify the contents of the box, blocking until it
+     *  is full; if an exception is thrown by the given function
+     *  object, no action will be performed.
+     */
+    template<typename Mutator>
+    void update(const Mutator &f);
+  };
+
+  /** Internal helper struct: a nullary predicate returning the
+   *  current value of the referenced bool.  Only a reference is
+   *  kept, so the bool must outlive the predicate.
+   */
+  struct bool_ref_pred
+  {
+    const bool &b;
+
+    bool_ref_pred(const bool &_b)
+      :b(_b)
+    {
+    }
+
+    bool operator()() const { return b; }
+  };
+
+  /** Internal helper struct: a nullary predicate returning the
+   *  negation of the referenced bool's current value.  Only a
+   *  reference is kept, so the bool must outlive the predicate.
+   */
+  struct not_bool_ref_pred
+  {
+    const bool &b;
+
+    not_bool_ref_pred(const bool &_b)
+      :b(_b)
+    {
+    }
+
+    bool operator()() const { return !b; }
+  };
+
+  /** Block until the box is full, then empty it and return its value.
+   *  Every waiter is woken afterwards: threads blocked in put() wait
+   *  on this same condition and must learn that the box is empty.
+   */
+  template<typename T>
+  T box<T>::take()
+  {
+    mutex::lock l(m);
+    cond.wait(l, bool_ref_pred(filled));
+    assert(filled);
+    // Copy explicitly so that the copy is made while l is still held.
+    T rval = val;
+    filled = false;
+    cond.wake_all();
+    return rval;
+  }
+
+  /** Non-blocking take(): if the box is full, empty it into \a out,
+   *  wake the waiters (a put() may be blocked) and return \b true. */
+  template<typename T>
+  bool box<T>::try_take(T &out)
+  {
+    mutex::lock l(m);
+    if(!filled)
+      return false;
+
+    out = val;
+    filled = false;
+    cond.wake_all();
+    return true;
+  }
+
+  /** Block until the box is empty, then store \a new_val.  wake_all()
+   *  is used because wake_one() could pick another blocked put(). */
+  template<typename T>
+  void box<T>::put(const T &new_val)
+  {
+    mutex::lock l(m);
+    cond.wait(l, not_bool_ref_pred(filled));
+    val = new_val;  // may throw, so do it before marking the box full
+    filled = true;
+    cond.wake_all();
+  }
+
+  /** Non-blocking put(): if the box is empty, store \a new_val, wake
+   *  the waiters and return \b true; otherwise return \b false. */
+  template<typename T>
+  bool box<T>::try_put(const T &new_val)
+  {
+    mutex::lock l(m);
+    if(filled)
+      return false;
+
+    val = new_val;
+    filled = true;
+    // wake_one() might choose a thread blocked in put(), not take().
+    cond.wake_all();
+    return true;
+  }
+
+  /** Wait until the box is full, then replace its value with
+   *  \a f(value); if \a f throws, the old value is kept. */
+  template<typename T>
+  template<typename Mutator>
+  void box<T>::update(const Mutator &f)
+  {
+    // The parameter was named m, which hid the mutex member below.
+    mutex::lock l(m);
+    cond.wait(l, bool_ref_pred(filled));
+    T new_val = f(val);
+    val = new_val;
+    cond.wake_one();
+  }
+}
+
+#endif
+

Modified: branches/aptitude-0.3/aptitude/tests/Makefile.am
==============================================================================
--- branches/aptitude-0.3/aptitude/tests/Makefile.am	(original)
+++ branches/aptitude-0.3/aptitude/tests/Makefile.am	Thu Sep  1 05:29:03 2005
@@ -22,4 +22,5 @@
 	test_resolver.cc \
 	test_setset.cc \
 	test_tags.cc \
+	test_threads.cc \
 	test_wtree.cc

Added: branches/aptitude-0.3/aptitude/tests/test_threads.cc
==============================================================================
--- (empty file)
+++ branches/aptitude-0.3/aptitude/tests/test_threads.cc	Thu Sep  1 05:29:03 2005
@@ -0,0 +1,106 @@
+// test_threads.cc
+//
+//   Copyright (C) 2005 Daniel Burrows
+//
+//   This program is free software; you can redistribute it and/or
+//   modify it under the terms of the GNU General Public License as
+//   published by the Free Software Foundation; either version 2 of
+//   the License, or (at your option) any later version.
+//
+//   This program is distributed in the hope that it will be useful,
+//   but WITHOUT ANY WARRANTY; without even the implied warranty of
+//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+//   General Public License for more details.
+//
+//   You should have received a copy of the GNU General Public License
+//   along with this program; see the file COPYING.  If not, write to
+//   the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+//   Boston, MA 02111-1307, USA.
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <src/generic/threads.h>
+
+#include <iostream>
+
+class TestThreads : public CppUnit::TestFixture
+{
+  CPPUNIT_TEST_SUITE(TestThreads);
+
+  CPPUNIT_TEST(testBox);
+  CPPUNIT_TEST(testAutoDetach);
+
+  CPPUNIT_TEST_SUITE_END();
+
+public:
+  // Ye Olde Addition Thread: a thunk that increments the shared box
+  // n times, each time by taking the value and putting back value+1.
+  struct add_thread
+  {
+    int n;
+    threads::box<int> &count;
+
+    add_thread(int _n, threads::box<int> &_count)
+      :n(_n), count(_count)
+    {}
+
+    void operator()() const
+    {
+      for(int remaining = n; remaining > 0; --remaining)
+	count.put(count.take()+1);
+    }
+  };
+
+  // Start with an emptied box, let thread_count threads each perform
+  // thread_limit increments on it, and check the final total.
+  void do_testBox()
+  {
+    const int thread_count = 50;
+    const int thread_limit = 1000;
+    threads::box<int> b(100);
+    CPPUNIT_ASSERT_EQUAL(100, b.take());
+
+    // The box is empty now, so every writer blocks in take().
+    std::auto_ptr<threads::thread> writers[thread_count];
+    for(int i = 0; i<thread_count; ++i)
+      writers[i].reset(new threads::thread(add_thread(thread_limit, b)));
+
+    int foo;
+    CPPUNIT_ASSERT(!b.try_take(foo));
+
+    // Seed the count at -1; this is what releases the writers.
+    CPPUNIT_ASSERT(b.try_put(-1));
+    for(int i = 0; i<thread_count; ++i)
+      writers[i]->join();
+
+    CPPUNIT_ASSERT_EQUAL(thread_count*thread_limit-1, b.take());
+  }
+
+  // Run do_testBox(); if it throws an Exception, print the message
+  // to stderr (std::endl also flushes it) and re-throw so that the
+  // failure still propagates to the caller.
+  void testBox()
+  {
+    try { do_testBox(); }
+    catch(const Exception &e)
+      {
+	std::cerr << "Caught exception in testBox: " << e.errmsg() << std::endl;
+	throw;
+      }
+  }
+
+  // A thunk with an empty body, for tests that only need a thread to
+  // exist.
+  struct do_nothing
+  {
+    void operator()() const
+    {}
+  };
+
+  // The extra parentheses stop "t(do_nothing())" parsing as a function
+  // declaration, so that a thread is really created and auto-detached.
+  void testAutoDetach()
+  { threads::thread t((do_nothing())); }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(TestThreads);



More information about the Aptitude-svn-commit mailing list