[Aptitude-svn-commit] r4089 - in branches/aptitude-0.3/aptitude: .
src/generic
Daniel Burrows
dburrows at costa.debian.org
Thu Sep 15 01:23:38 UTC 2005
Author: dburrows
Date: Thu Sep 15 01:23:35 2005
New Revision: 4089
Modified:
branches/aptitude-0.3/aptitude/ChangeLog
branches/aptitude-0.3/aptitude/src/generic/resolver_manager.cc
branches/aptitude-0.3/aptitude/src/generic/resolver_manager.h
Log:
Add support to the manager for running computations in the background.
Modified: branches/aptitude-0.3/aptitude/ChangeLog
==============================================================================
--- branches/aptitude-0.3/aptitude/ChangeLog (original)
+++ branches/aptitude-0.3/aptitude/ChangeLog Thu Sep 15 01:23:35 2005
@@ -1,5 +1,12 @@
2005-09-14 Daniel Burrows <dburrows at debian.org>
+ * src/resolver_manager.cc, src/resolver_manager.h:
+
+ Add coarse-grained threading support: the manager can start a
+ background thread for computations; thread-safety is ensured by
+ completely stopping the thread before performing dangerous
+ mutations.
+
* src/generic/immset.h:
Fix a silly error in the implementation of clone(): pass the
Modified: branches/aptitude-0.3/aptitude/src/generic/resolver_manager.cc
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/resolver_manager.cc (original)
+++ branches/aptitude-0.3/aptitude/src/generic/resolver_manager.cc Thu Sep 15 01:23:35 2005
@@ -28,8 +28,21 @@
#include <sigc++/functors/mem_fun.h>
+std::string ResolverManagerThreadClashException::errmsg() const
+{
+ return "Internal error: attempt to run two simultaneous resolvers";
+}
+
+// NB: we need a recursive mutex because some routines can be called
+// either by other routines of the class (already have a mutex lock)
+// or by the user (don't have a mutex lock); I could sidestep this
+// with some clever magic, but there's no point unless it turns out to
+// be a bottleneck.
resolver_manager::resolver_manager(aptitudeDepCache *_cache)
- :cache(_cache), resolver(0), selected_solution(0), out_of_time(false)
+ :cache(_cache), resolver(0), selected_solution(0), out_of_time(false),
+ out_of_solutions(false), background_resolver_active(false),
+ resolver_suspend_count(0), resolver_thread(NULL),
+ mutex(threads::mutex::attr(PTHREAD_MUTEX_RECURSIVE_NP))
{
cache->pre_package_state_changed.connect(sigc::mem_fun(this, &resolver_manager::discard_resolver));
cache->package_state_changed.connect(sigc::mem_fun(this, &resolver_manager::maybe_create_resolver));
@@ -41,29 +54,174 @@
resolver_manager::~resolver_manager()
{
- delete resolver;
+ discard_resolver();
+
+ delete continuation;
for(unsigned int i = 0; i < solutions.size(); ++i)
delete solutions[i];
}
+resolver_manager::background_continuation::~background_continuation()
+{
+}
+
+class resolver_manager::background_suspender
+{
+ resolver_manager &m;
+ /** A lock on the manager's main mutex; used as a way of reminding
+ * myself to take that lock first.
+ */
+ threads::mutex::lock &l;
+public:
+ background_suspender(resolver_manager &_m,
+ threads::mutex::lock &_l)
+ :m(_m), l(_l)
+ {
+ if(m.resolver_suspend_count == 0)
+ m.stop_background_resolver();
+ ++m.resolver_suspend_count;
+ }
+
+ ~background_suspender()
+ {
+ --m.resolver_suspend_count;
+ if(m.resolver_suspend_count == 0)
+ m.restart_background_resolver();
+ }
+};
+
+// This assumes that background_resolver_active is empty when it
+// starts (see restart_background_resolver)
+//
+// TODO: merge with the foreground stuff?
+void resolver_manager::background_thread_execution(int max_steps,
+ int sol_num)
+{
+ background_resolver_active.put(true);
+
+ try
+ {
+ aptitude_resolver::solution *sol = do_get_solution(sol_num);
+
+ continuation->success(*sol);
+
+ assert(background_resolver_active.take());
+ background_resolver_active.put(false);
+ }
+ // If the computation was interrupted, just terminate silently
+ // without clearing the background_resolver_active field.
+ catch(InterruptedException)
+ {
+ assert(background_resolver_active.take());
+ background_resolver_active.put(true);
+ }
+ catch(NoMoreSolutions)
+ {
+ continuation->no_more_solutions();
+
+ assert(background_resolver_active.take());
+ background_resolver_active.put(false);
+ }
+ catch(NoMoreTime)
+ {
+ continuation->no_more_time();
+
+ assert(background_resolver_active.take());
+ background_resolver_active.put(false);
+ }
+ // Other escaping exceptions should blow up the program, so don't
+ // worry about the state of background_resolver_active here.
+}
+
+// Need this because sigc slots aren't threadsafe :-(
+struct resolver_manager::background_thread_bootstrap
+{
+ int max_steps;
+ int sol_num;
+ resolver_manager &m;
+public:
+ background_thread_bootstrap(int _max_steps,
+ int _sol_num,
+ resolver_manager &_m)
+ :max_steps(_max_steps), sol_num(_sol_num), m(_m)
+ {
+ }
+
+ void operator()()
+ {
+ m.background_thread_execution(max_steps, sol_num);
+ }
+};
+
+void resolver_manager::restart_background_resolver()
+{
+ threads::mutex::lock l(mutex);
+
+ if(background_resolver_active.take())
+ {
+ assert(resolver_thread == NULL);
+ resolver_thread = new threads::thread(background_thread_bootstrap(aptcfg->FindI(PACKAGE "::ProblemResolver::StepLimit", 5000), background_target, *this));
+ }
+ else
+ background_resolver_active.put(false);
+}
+
+void resolver_manager::stop_background_resolver()
+{
+ threads::mutex::lock l(mutex);
+
+ if(background_resolver_active.take())
+ {
+ background_resolver_active.put(true);
+ assert(resolver_thread != NULL);
+ resolver->cancel_solver();
+ resolver_thread->join();
+ }
+ else
+ background_resolver_active.put(false);
+}
+
void resolver_manager::maybe_create_resolver()
{
+ threads::mutex::lock l(mutex);
+
if(resolver == NULL && cache->BrokenCount() > 0)
create_resolver();
}
void resolver_manager::discard_resolver()
{
+ threads::mutex::lock l(mutex);
+
+ stop_background_resolver();
+ // Now we know no background resolver is running, and we've locked
+ // out the other methods of this object, so this is guaranteed to be
+ // safe.
+ bool b;
+ assert(background_resolver_active.try_take(b));
+ background_resolver_active.put(false);
+
delete resolver;
- solutions.clear();
+
+ {
+ threads::mutex::lock l2(solutions_mutex);
+ solutions.clear();
+ }
+
resolver=NULL;
- out_of_time=false;
+
+ out_of_time.take();
+ out_of_time.put(false);
+
+ out_of_solutions.take();
+ out_of_solutions.put(false);
}
void resolver_manager::create_resolver()
{
- assert(resolver==NULL);
+ threads::mutex::lock l(mutex);
+ assert(resolver == NULL);
// NOTE: the performance of the resolver is highly sensitive to
// these settings; choosing bad ones can result in hitting
@@ -106,6 +264,9 @@
void resolver_manager::set_debug(bool activate)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver_exists());
resolver->set_debug(activate);
@@ -113,21 +274,32 @@
bool resolver_manager::resolver_exists() const
{
+ threads::mutex::lock l(mutex);
+
return resolver != NULL;
}
unsigned int resolver_manager::generated_solution_count() const
{
+ threads::mutex::lock l(mutex);
+ threads::mutex::lock l2(solutions_mutex);
+
return solutions.size();
}
bool resolver_manager::solution_generation_complete() const
{
+ threads::mutex::lock l(mutex);
+
+ // FIXME: does this need a lock in the resolver?
+ // background_suspender bs(*this, l);
return resolver->exhausted();
}
bool resolver_manager::solutions_exhausted() const
{
+ threads::mutex::lock l(mutex);
+
if(!resolver_exists())
return true;
else
@@ -137,62 +309,168 @@
bool resolver_manager::solutions_at_start() const
{
+ threads::mutex::lock l(mutex);
+
if(!resolver_exists())
return true;
else
return selected_solution == 0;
}
-const aptitude_resolver::solution &resolver_manager::get_solution(unsigned int solution_num)
+aptitude_resolver::solution *resolver_manager::do_get_solution(unsigned int solution_num)
{
+ threads::mutex::lock sol_l(solutions_mutex);
+ if(solution_num < solutions.size())
+ return solutions[solution_num];
+
while(solution_num >= solutions.size())
{
+ sol_l.release();
+
try
{
- solutions.push_back(new aptitude_resolver::solution(resolver->find_next_solution(aptcfg->FindI(PACKAGE "::ProblemResolver::StepLimit", 5000))));
- out_of_time = false;
+ generic_solution<aptitude_universe> sol = resolver->find_next_solution(aptcfg->FindI(PACKAGE "::ProblemResolver::StepLimit", 5000));
+
+ sol_l.acquire();
+ solutions.push_back(new aptitude_resolver::solution(sol.clone()));
+ sol_l.release();
+
+ out_of_time.take();
+ out_of_time.put(false);
}
catch(NoMoreTime)
{
- out_of_time = true;
+ out_of_time.take();
+ out_of_time.put(true);
throw NoMoreTime();
}
catch(NoMoreSolutions)
{
- selected_solution_changed();
+ out_of_time.take();
+ out_of_time.put(false);
+ out_of_solutions.take();
+ out_of_solutions.put(true);
throw NoMoreSolutions();
}
}
- return *solutions[solution_num];
+ return solutions[solution_num];
+}
+
+const aptitude_resolver::solution &resolver_manager::get_solution(unsigned int solution_num)
+{
+ threads::mutex::lock l(mutex);
+
+ background_suspender bs(*this, l);
+
+ return *do_get_solution(solution_num);
}
+
const aptitude_resolver::solution &resolver_manager::get_current_solution()
{
+ threads::mutex::lock l(mutex);
+
assert(resolver);
- if(out_of_time)
+ bool oot = out_of_time.take();
+ out_of_time.put(oot);
+
+ // Only throw NoMoreTime when we're trying to generate a new
+ // solution.
+ threads::mutex::lock sol_l(solutions_mutex);
+ oot = oot && selected_solution >= solutions.size();
+ sol_l.release();
+
+ if(oot)
throw NoMoreTime();
else
{
+ sol_l.acquire();
unsigned int start_size = solutions.size();
+ sol_l.release();
+
const aptitude_resolver::solution &rval = get_solution(selected_solution);
+
+ sol_l.acquire();
if(start_size != solutions.size())
- selected_solution_changed();
+ {
+ sol_l.release();
+ selected_solution_changed();
+ }
return rval;
}
}
+void resolver_manager::get_solution_background(unsigned int solution_num,
+ background_continuation *k)
+{
+ threads::mutex::lock l(mutex);
+
+ threads::mutex::lock sol_l(solutions_mutex);
+ if(solution_num < solutions.size())
+ {
+ k->success(*solutions[solution_num]);
+ return;
+ }
+ sol_l.release();
+
+
+ bool active = background_resolver_active.take();
+ background_resolver_active.put(active);
+
+ if(active)
+ throw ResolverManagerThreadClashException();
+ else
+ {
+ // Prime the background thread:
+ background_resolver_active.take();
+ background_resolver_active.put(true);
+
+ background_target = solution_num;
+ continuation = k;
+ restart_background_resolver();
+ }
+}
+
+void resolver_manager::get_current_solution_background(background_continuation *k)
+{
+ threads::mutex::lock l(mutex);
+
+ assert(resolver);
+
+ bool oot = out_of_time.take();
+ out_of_time.put(oot);
+
+ // Only throw NoMoreTime when we're trying to generate a new
+ // solution.
+ threads::mutex::lock sol_l(solutions_mutex);
+ oot = oot && selected_solution >= solutions.size();
+ sol_l.release();
+
+ if(oot)
+ k->no_more_time();
+ else
+ get_solution_background(selected_solution, k);
+}
+
void resolver_manager::reject_version(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->reject_version(ver);
+ restart_background_resolver();
}
void resolver_manager::unreject_version(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->unreject_version(ver);
@@ -202,6 +480,7 @@
bool resolver_manager::is_rejected(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
assert(resolver);
return resolver->is_rejected(ver);
@@ -209,6 +488,9 @@
void resolver_manager::mandate_version(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->mandate_version(ver);
@@ -216,6 +498,9 @@
void resolver_manager::unmandate_version(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->unmandate_version(ver);
@@ -225,6 +510,7 @@
bool resolver_manager::is_mandatory(const aptitude_resolver_version &ver)
{
+ threads::mutex::lock l(mutex);
assert(resolver);
return resolver->is_mandatory(ver);
@@ -232,6 +518,9 @@
void resolver_manager::harden_dep(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->harden(dep);
@@ -239,6 +528,9 @@
void resolver_manager::unharden_dep(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->unharden(dep);
@@ -248,6 +540,7 @@
bool resolver_manager::is_hardened(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
assert(resolver);
return resolver->is_hardened(dep);
@@ -255,6 +548,9 @@
void resolver_manager::force_break_dep(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->force_break(dep);
@@ -262,6 +558,9 @@
void resolver_manager::unforce_break_dep(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver);
resolver->unforce_break(dep);
@@ -271,6 +570,7 @@
bool resolver_manager::is_forced_broken(const aptitude_resolver_dep &dep)
{
+ threads::mutex::lock l(mutex);
assert(resolver);
return resolver->is_forced_broken(dep);
@@ -278,12 +578,16 @@
void resolver_manager::select_next_solution()
{
+ threads::mutex::lock l(mutex);
+
++selected_solution;
selected_solution_changed();
}
void resolver_manager::select_previous_solution()
{
+ threads::mutex::lock l(mutex);
+
if(selected_solution == 0)
throw NoMoreSolutions();
else
@@ -293,12 +597,16 @@
const aptitude_resolver::solution &resolver_manager::next_solution()
{
+ threads::mutex::lock l(mutex);
+
select_next_solution();
return get_current_solution();
}
const aptitude_resolver::solution &resolver_manager::previous_solution()
{
+ threads::mutex::lock l(mutex);
+
select_previous_solution();
return get_current_solution();
}
@@ -307,6 +615,9 @@
const pkgCache::VerIterator &ver,
int score)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
assert(resolver_exists());
assert(resolver->fresh());
@@ -314,8 +625,11 @@
score);
}
-void resolver_manager::dump(ostream &out) const
+void resolver_manager::dump(ostream &out)
{
+ threads::mutex::lock l(mutex);
+ background_suspender bs(*this, l);
+
if(!resolver_exists())
return;
Modified: branches/aptitude-0.3/aptitude/src/generic/resolver_manager.h
==============================================================================
--- branches/aptitude-0.3/aptitude/src/generic/resolver_manager.h (original)
+++ branches/aptitude-0.3/aptitude/src/generic/resolver_manager.h Thu Sep 15 01:23:35 2005
@@ -20,12 +20,16 @@
// A higher-level resolver interface. This code is responsible for
// maintaining a list of previously observed solutions, for passing
// certain actions on to the underlying resolver (protecting users
-// from having to actually import the whole resolver definition), and
-// for managing the resolver in the face of cache reloads and resets.
+// from having to actually import the whole resolver definition), for
+// managing the resolver in the face of cache reloads and resets, and
+// for managing threaded access to the resolver.
#ifndef RESOLVER_MANAGER_H
#define RESOLVER_MANAGER_H
+#include "exception.h"
+#include "threads.h"
+
#include <apt-pkg/pkgcache.h>
#include <sigc++/signal.h>
@@ -40,18 +44,25 @@
template<typename PackageUniverse> class generic_solution;
class aptitude_resolver;
-namespace threads
+/** An exception thrown when the user tries to start two threads at
+ * once.
+ */
+class ResolverManagerThreadClashException : public Exception
{
- class thread;
+public:
+ std::string errmsg() const;
};
/** Manages a resolver for a single cache object. When broken
* packages arise, a new resolver is created; whenever the state of a
* package changes, the resolver is deleted and reset. While a
* resolver is active, users of this class can "select" a particular
- * solution, then "generate" it. Generating a solution is a
- * potentially slow operation and support for pushing that action
- * into a background thread will be provided in the future.
+ * solution, then "generate" it.
+ *
+ * Solutions can be generated in a background thread, but of course
+ * only one background thread may be running at a time. The
+ * solutions returned should only be accessed from one thread at a
+ * time unless you clone() them.
*
* Note: of course it would also be possible to simply query the
* manager for the Nth solution; however, using a selection pointer
@@ -60,6 +71,32 @@
*/
class resolver_manager : public sigc::trackable
{
+public:
+ /** This class represents the continuation of get_solution() in a
+ * background thread. See get_background_solution() for details.
+ */
+ class background_continuation
+ {
+ public:
+ virtual ~background_continuation();
+
+ /** Invoked when a solution has been successfully generated. */
+ virtual void success(const generic_solution<aptitude_universe> &) = 0;
+ /** Invoked when all solutions have been exhausted (corresponds to
+ * the NoMoreSolutions exception).
+ */
+ virtual void no_more_solutions() = 0;
+
+ /** Invoked when time has expired. (corresponds to NoMoreTime) */
+ virtual void no_more_time() = 0;
+
+ /** Invoked when the solver was interrupted. (corresponds to
+ * InterruptedException)
+ */
+ virtual void interrupted() = 0;
+ };
+
+private:
/** The cache file on which this manager operates. */
aptitudeDepCache *cache;
@@ -72,6 +109,12 @@
*/
std::vector<generic_solution<aptitude_universe> *> solutions;
+ /** A lock for the list of solutions; used to allow the background
+ * thread to immediately post results without taking the big class
+ * lock (since that might be taken by stop_background_resolver())
+ */
+ mutable threads::mutex solutions_mutex;
+
/** The index of the currently selected solution. */
unsigned int selected_solution;
@@ -81,11 +124,88 @@
* This is used so that the return value of get_current_solution()
* is stable.
*/
- bool out_of_time;
+ threads::box<bool> out_of_time;
+
+ /** If \b true, a find_next_solution() call will terminate with
+ * NoMoreSolutions. (this may be the case even if it is \b false)
+ */
+ threads::box<bool> out_of_solutions;
+
+ /** If \b true, a background thread should be active to calculate a
+ * resolver solution. This is a box to eliminate a potential race
+ * between the very end of the background thread and the 'start a
+ * background thread' routine.
+ */
+ threads::box<bool> background_resolver_active;
+
+ /** If background_resolver_active is \b true, then this is the
+ * solution number which it is attempting to calculate.
+ */
+ unsigned int background_target;
+
+ /** Used to manage the background thread semi-automatically; counts
+ * how many times it was suspended by the stack-based
+ * background_suspend class.
+ */
+ int resolver_suspend_count;
+
+ /** If background_resolver_active is \b true, this is a callback
+ * object to be used in handling the result of the background
+ * thread's computation.
+ */
+ background_continuation *continuation;
+
+ /** The thread in which a background resolver is running, or \b NULL
+ * if none is.
+ */
+ threads::thread *resolver_thread;
+
+ /** This lock is used to serialize all accesses to this object,
+ * except background_get_solution().
+ */
+ mutable threads::mutex mutex;
void discard_resolver();
void create_resolver();
+ /** A class that bootstraps the routine below. */
+ class background_thread_bootstrap;
+ friend class background_thread_bootstrap;
+
+ /** A class that stops the background thread when it's created, and
+ * restarts it when it's destroyed. If background_resolver_active
+ * is set to \b false in the meantime, the resolver won't be
+ * restarted.
+ */
+ class background_suspender;
+ friend class background_suspender;
+
+ /** Low-level code to get a solution; it does not take the global
+ * lock, does not stop a background thread, and may run either in
+ * the foreground or in the background. It is called by
+ * background_thread_execution and get_solution.
+ */
+ generic_solution<aptitude_universe> *do_get_solution(unsigned int solution_number);
+
+ /** The actual background thread.
+ *
+ * \param max_steps the limit of this thread's search
+ * \param sol_num the number of solutions to be examined by this
+ * thread.
+ */
+ void background_thread_execution(int max_steps, int sol_num);
+
+ /** The caller should hold the mutex; if no resolver thread is
+ * currently running and background_resolver_active is \b true,
+ * then a new resolver will be started.
+ */
+ void restart_background_resolver();
+
+ /** The caller should hold the mutex; stops any running resolver
+ * thread, but leaves background_resolver_active set.
+ */
+ void stop_background_resolver();
+
/** Create a resolver if necessary. */
void maybe_create_resolver();
public:
@@ -122,10 +242,16 @@
* the list; will continue a search even if it ran out of time
* previously.
*
+ * If solution_num < generated_solution_count, it is always safe to
+ * call this from a foreground thread; otherwise, an exception will
+ * be thrown if a background resolver exists.
+ *
* \throw NoMoreSolutions if the list of solutions is exhausted
* \throw NoMoreTime if time is exhausted while searching for
* the solution (time here is counted separately
* at each step).
+ * \throw ResolverManagerThreadClashException if a new solution
+ * would be generated and a background thread exists.
*/
const generic_solution<aptitude_universe> &get_solution(unsigned int solution_num);
@@ -135,10 +261,44 @@
*
* \throw NoMoreSolutions if the list of solutions is exhausted
* \throw NoMoreTime if time is exhausted while searching for the solution.
+ * \throw ResolverManagerThreadClashException if a new solution
+ * would be generated and a background thread exists.
*/
const generic_solution<aptitude_universe> &get_current_solution();
+ /** As get_solution, but run in a background thread if necessary.
+ *
+ * \param solution_num the solution to retrieve
+ *
+ * \param k a background_continuation object; when the background
+ * computation is finished, a method corresponding to its result
+ * will be invoked on continuation. This method may be invoked in
+ * either the foreground thread or the background thread, so it
+ * should be threadsafe.
+ *
+ * k is owned by this object and will be deleted at its discretion.
+ *
+ * \throw ResolverManagerThreadClashException if a background
+ * resolver already exists.
+ */
+ void get_solution_background(unsigned int solution_num,
+ background_continuation *k);
+
+ /** As get_current_solution, but run in a background thread if
+ * necessary. Updates to the selected solution will not alter the
+ * solution being computed by the background thread.
+ *
+ * \param solution_num the solution to retrieve
+ *
+ * \param k the continuation of the background process; see
+ * get_background_solution.
+ *
+ * \throw ResolverManagerThreadClashException if a background
+ * resolver already exists.
+ */
+ void get_current_solution_background(background_continuation *k);
+
/** If \b true, all solutions have been generated. */
bool solution_generation_complete() const;
@@ -260,7 +420,7 @@
/** If a resolver exists, write its state (including scores, etc)
* to the given stream.
*/
- void dump(std::ostream &out) const;
+ void dump(std::ostream &out);
/** This signal is emitted when the selected solution changes, or
* when the user takes an action that might change the number of
More information about the Aptitude-svn-commit
mailing list