[SCM] WebKit Debian packaging branch, debian/experimental, updated. upstream/1.3.3-10851-g50815da
dpranke at chromium.org
dpranke at chromium.org
Wed Dec 22 17:56:55 UTC 2010
The following commit has been merged in the debian/experimental branch:
commit 5c6ea7c082016c1880aa45632b3d5c13b3ca0bed
Author: dpranke at chromium.org <dpranke at chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Date: Fri Dec 3 03:41:43 2010 +0000
2010-12-02 Dirk Pranke <dpranke at chromium.org>
Reviewed by Tony Chang.
nrwt multiprocessing - move logic back into run_webkit_tests
This change moves a bunch of logic that I had put into
message_broker back into run_webkit_tests, in a slightly
different format. WorkerMessageBroker needed to become less aware of
the logic the TestRunner class uses, and more generic.
Eventually the MessageBroker will only do generic messaging and
thread/process-pooling, and (almost) all of the
run-webkit-tests-specific logic will be moved to
run_webkit_tests.py and dump_render_tree_thread.py.
The biggest changes are that the Broker can now start a single
worker, but the responsibility for starting all of them is pushed
back to the TestRunner (Manager), and the logic for checking if
the threads are done or wedged is moved back to TestRunner. We
also remove WorkerMessageBroker.cleanup (not needed) and
cancel_workers (they have to be cancelled individually).
The message_broker is now encapsulated inside
TestRunner._run_tests(); it only needs to exist while actually
running the tests.
Also, delete a bunch of tests in message_broker_unittest that no
longer make much sense.
This patch depends on bug 50372.
https://bugs.webkit.org/show_bug.cgi?id=50374
* Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
* Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
* Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
* Scripts/webkitpy/layout_tests/run_webkit_tests.py:
* Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@73231 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/WebKitTools/ChangeLog b/WebKitTools/ChangeLog
index d675077..2de59e7 100644
--- a/WebKitTools/ChangeLog
+++ b/WebKitTools/ChangeLog
@@ -1,3 +1,42 @@
+2010-12-02 Dirk Pranke <dpranke at chromium.org>
+
+ Reviewed by Tony Chang.
+
+ nrwt multiprocessing - move logic back into run_webkit_tests
+
+ This change moves a bunch of logic that I had put into
+ message_broker back into run_webkit_tests, in a slightly
+ different format. WorkerMessageBroker needed to become less aware of
+ the logic the TestRunner class uses, and more generic.
+ Eventually the MessageBroker will only do generic messaging and
+ thread/process-pooling, and (almost) all of the
+ run-webkit-tests-specific logic will be moved to
+ run_webkit_tests.py and dump_render_tree_thread.py.
+
+ The biggest changes are that the Broker can now start a single
+ worker, but the responsibility for starting all of them is pushed
+ back to the TestRunner (Manager), and the logic for checking if
+ the threads are done or wedged is moved back to TestRunner. We
+ also remove WorkerMessageBroker.cleanup (not needed) and
+ cancel_workers (they have to be cancelled individually).
+
+ The message_broker is now encapsulated inside
+ TestRunner._run_tests(); it only needs to exist while actually
+ running the tests.
+
+ Also, delete a bunch of tests in message_broker_unittest that no
+ longer make much sense.
+
+ This patch depends on bug 50372.
+
+ https://bugs.webkit.org/show_bug.cgi?id=50374
+
+ * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
+ * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
+ * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
2010-12-02 Hayato Ito <hayato at chromium.org>
Reviewed by Eric Seidel.
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
index 285b2e7..bc3d336 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
@@ -64,114 +64,88 @@ class _WorkerMessageBroker(object):
def __init__(self, port, options):
self._port = port
self._options = options
- self._num_workers = int(self._options.child_processes)
- # This maps worker names to their TestShellThread objects.
+ # This maps worker_names to TestShellThreads
self._threads = {}
- def start_workers(self, test_runner):
- """Starts up the pool of workers for running the tests.
-
- Args:
- test_runner: a handle to the manager/TestRunner object
- """
- self._test_runner = test_runner
- for worker_number in xrange(self._num_workers):
- thread = self.start_worker(worker_number)
- self._threads[thread.name()] = thread
- return self._threads.values()
-
- def start_worker(self, worker_number):
- # FIXME: Replace with something that isn't a thread.
+ def start_worker(self, test_runner, worker_number):
+ """Start a worker with the given index number.
+
+ Returns the actual TestShellThread object."""
+ # FIXME: Remove dependencies on test_runner.
+ # FIXME: Replace with something that isn't a thread, and return
+ # the name of the worker, not the thread itself. We need to return
+ # the thread itself for now to allow TestRunner to access the object
+ # directly to read shared state.
+ thread = dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number, test_runner._current_filename_queue,
+ test_runner._result_queue)
+ self._threads[thread.name()] = thread
# Note: Don't start() the thread! If we did, it would actually
# create another thread and start executing it, and we'd no longer
# be single-threaded.
- return dump_render_tree_thread.TestShellThread(self._port,
- self._options, worker_number,
- self._test_runner._current_filename_queue,
- self._test_runner._result_queue)
+ return thread
- def run_message_loop(self):
- """Loop processing messages until done."""
- raise NotImplementedError
+ def cancel_worker(self, worker_name):
+ """Attempt to cancel a worker (best-effort). The worker may still be
+ running after this call returns."""
+ self._threads[worker_name].cancel()
- def cancel_workers(self):
- """Cancel/interrupt any workers that are still alive."""
- pass
+ def log_wedged_worker(self, worker_name):
+ """Log information about the given worker's state."""
+ raise NotImplementedError
- def cleanup(self):
- """Perform any necessary cleanup on shutdown."""
- pass
+ def run_message_loop(self, test_runner):
+ """Loop processing messages until done."""
+ # FIXME: eventually we'll need a message loop that the workers
+ # can also call.
+ raise NotImplementedError
class _InlineBroker(_WorkerMessageBroker):
- def run_message_loop(self):
+ def run_message_loop(self, test_runner):
thread = self._threads.values()[0]
- thread.run_in_main_thread(self._test_runner,
- self._test_runner._current_result_summary)
- self._test_runner.update()
+ thread.run_in_main_thread(test_runner,
+ test_runner._current_result_summary)
+
+ def log_wedged_worker(self, worker_name):
+ raise AssertionError('_InlineBroker.log_wedged_worker() called')
class _MultiThreadedBroker(_WorkerMessageBroker):
- def start_worker(self, worker_number):
- thread = _WorkerMessageBroker.start_worker(self, worker_number)
+ def start_worker(self, test_runner, worker_number):
+ thread = _WorkerMessageBroker.start_worker(self, test_runner,
+ worker_number)
+ # Unlike the base implementation, here we actually want to start
+ # the thread.
thread.start()
return thread
- def run_message_loop(self):
- # Loop through all the threads waiting for them to finish.
- some_thread_is_alive = True
- while some_thread_is_alive:
- some_thread_is_alive = False
- t = time.time()
- for thread in self._threads.values():
- exception_info = thread.exception_info()
- if exception_info is not None:
- # Re-raise the thread's exception here to make it
- # clear that testing was aborted. Otherwise,
- # the tests that did not run would be assumed
- # to have passed.
- raise exception_info[0], exception_info[1], exception_info[2]
-
- if thread.isAlive():
- some_thread_is_alive = True
- next_timeout = thread.next_timeout()
- if next_timeout and t > next_timeout:
- log_wedged_worker(thread.name(), thread.id())
- thread.clear_next_timeout()
-
- self._test_runner.update()
-
- if some_thread_is_alive:
- time.sleep(0.01)
-
- def cancel_workers(self):
- for thread in self._threads.values():
- thread.cancel()
-
-
-def log_wedged_worker(name, id):
- """Log information about the given worker state."""
- stack = _find_thread_stack(id)
- assert(stack is not None)
- _log.error("")
- _log.error("%s (tid %d) is wedged" % (name, id))
- _log_stack(stack)
- _log.error("")
-
-
-def _find_thread_stack(id):
- """Returns a stack object that can be used to dump a stack trace for
- the given thread id (or None if the id is not found)."""
- for thread_id, stack in sys._current_frames().items():
- if thread_id == id:
- return stack
- return None
-
-
-def _log_stack(stack):
- """Log a stack trace to log.error()."""
- for filename, lineno, name, line in traceback.extract_stack(stack):
- _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
- if line:
- _log.error(' %s' % line.strip())
+ def run_message_loop(self, test_runner):
+ # FIXME: Remove the dependencies on test_runner. Checking on workers
+ # should be done via a timer firing.
+ test_runner._check_on_workers()
+
+ def log_wedged_worker(self, worker_name):
+ thread = self._threads[worker_name]
+ stack = self._find_thread_stack(thread.id())
+ assert(stack is not None)
+ _log.error("")
+ _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
+ self._log_stack(stack)
+ _log.error("")
+
+ def _find_thread_stack(self, id):
+ """Returns a stack object that can be used to dump a stack trace for
+ the given thread id (or None if the id is not found)."""
+ for thread_id, stack in sys._current_frames().items():
+ if thread_id == id:
+ return stack
+ return None
+
+ def _log_stack(self, stack):
+ """Log a stack trace to log.error()."""
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
+ if line:
+ _log.error(' %s' % line.strip())
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
index c006471..d46df4c 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
@@ -42,139 +42,60 @@ from webkitpy.layout_tests import run_webkit_tests
import message_broker
+# FIXME: Boy do we need a lot more tests here ...
-class TestThread(threading.Thread):
- def __init__(self, started_queue, stopping_queue):
- threading.Thread.__init__(self)
- self._id = None
- self._started_queue = started_queue
- self._stopping_queue = stopping_queue
- self._timeout = False
- self._timeout_queue = Queue.Queue()
- self._exception_info = None
-
- def id(self):
- return self._id
-
- def name(self):
- return 'worker/0'
-
- def run(self):
- self._covered_run()
-
- def _covered_run(self):
- # FIXME: this is a separate routine to work around a bug
- # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
- self._id = thread.get_ident()
- try:
- self._started_queue.put('')
- msg = self._stopping_queue.get()
- if msg == 'KeyboardInterrupt':
- raise KeyboardInterrupt
- elif msg == 'Exception':
- raise ValueError()
- elif msg == 'Timeout':
- self._timeout = True
- self._timeout_queue.get()
- except:
- self._exception_info = sys.exc_info()
-
- def exception_info(self):
- return self._exception_info
-
- def next_timeout(self):
- if self._timeout:
- self._timeout_queue.put('done')
- return time.time() - 10
- return time.time()
-
- def clear_next_timeout(self):
- self._next_timeout = None
-
-class TestHandler(logging.Handler):
- def __init__(self, astream):
- logging.Handler.__init__(self)
- self._stream = astream
-
- def emit(self, record):
- self._stream.write(self.format(record))
-
-
-class MultiThreadedBrokerTest(unittest.TestCase):
- class MockTestRunner(object):
- def __init__(self):
- pass
-
- def __del__(self):
- pass
-
- def update(self):
- pass
-
- def run_one_thread(self, msg):
- runner = self.MockTestRunner()
- port = None
- options = mocktool.MockOptions(child_processes='1')
- starting_queue = Queue.Queue()
- stopping_queue = Queue.Queue()
- broker = message_broker._MultiThreadedBroker(port, options)
- broker._test_runner = runner
- child_thread = TestThread(starting_queue, stopping_queue)
- name = child_thread.name()
- broker._threads[name] = child_thread
- child_thread.start()
- started_msg = starting_queue.get()
- stopping_queue.put(msg)
- return broker.run_message_loop()
- def test_basic(self):
- interrupted = self.run_one_thread('')
- self.assertFalse(interrupted)
+class TestThreadStacks(unittest.TestCase):
+ class Thread(threading.Thread):
+ def __init__(self, started_queue, stopping_queue):
+ threading.Thread.__init__(self)
+ self._id = None
+ self._started_queue = started_queue
+ self._stopping_queue = stopping_queue
- def test_interrupt(self):
- self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
+ def id(self):
+ return self._id
- def test_timeout(self):
- oc = outputcapture.OutputCapture()
- oc.capture_output()
- interrupted = self.run_one_thread('Timeout')
- self.assertFalse(interrupted)
- oc.restore_output()
+ def name(self):
+ return 'worker/0'
- def test_exception(self):
- self.assertRaises(ValueError, self.run_one_thread, 'Exception')
+ def run(self):
+ self._id = thread.get_ident()
+ self._started_queue.put('')
+ msg = self._stopping_queue.get()
+ def make_broker(self):
+ options = mocktool.MockOptions()
+ return message_broker._MultiThreadedBroker(port=None,
+ options=options)
-class Test(unittest.TestCase):
def test_find_thread_stack_found(self):
+ broker = self.make_broker()
id, stack = sys._current_frames().items()[0]
- found_stack = message_broker._find_thread_stack(id)
+ found_stack = broker._find_thread_stack(id)
self.assertNotEqual(found_stack, None)
def test_find_thread_stack_not_found(self):
- found_stack = message_broker._find_thread_stack(0)
+ broker = self.make_broker()
+ found_stack = broker._find_thread_stack(0)
self.assertEqual(found_stack, None)
def test_log_wedged_worker(self):
+ broker = self.make_broker()
oc = outputcapture.OutputCapture()
oc.capture_output()
- logger = message_broker._log
- astream = array_stream.ArrayStream()
- handler = TestHandler(astream)
- logger.addHandler(handler)
starting_queue = Queue.Queue()
stopping_queue = Queue.Queue()
- child_thread = TestThread(starting_queue, stopping_queue)
+ child_thread = TestThreadStacks.Thread(starting_queue, stopping_queue)
child_thread.start()
+ broker._threads[child_thread.name()] = child_thread
msg = starting_queue.get()
- message_broker.log_wedged_worker(child_thread.name(),
- child_thread.id())
+ broker.log_wedged_worker(child_thread.name())
stopping_queue.put('')
child_thread.join(timeout=1.0)
- self.assertFalse(astream.empty())
self.assertFalse(child_thread.isAlive())
oc.restore_output()
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
index 0b11d9b..dd84788 100755
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
@@ -228,6 +228,15 @@ def summarize_unexpected_results(port_obj, expectations, result_summary,
return results
+class WorkerState(object):
+ """A class for the TestRunner/manager to use to track the current state
+ of the workers."""
+ def __init__(self, name, number, thread):
+ self.name = name
+ self.number = number
+ self.thread = thread
+
+
class TestRunner:
"""A class for managing running a series of tests on a series of layout
test files."""
@@ -240,19 +249,20 @@ class TestRunner:
# in DumpRenderTree.
DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
- def __init__(self, port, options, printer, message_broker):
+ def __init__(self, port, options, printer):
"""Initialize test runner data structures.
Args:
port: an object implementing port-specific
options: a dictionary of command line options
printer: a Printer object to record updates to.
- message_broker: object used to communicate with workers.
"""
self._port = port
self._options = options
self._printer = printer
- self._message_broker = message_broker
+
+ # This maps worker names to the state we are tracking for each of them.
+ self._workers = {}
# disable wss server. need to install pyOpenSSL on buildbots.
# self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -586,33 +596,39 @@ class TestRunner:
result_summary: summary object to populate with the results
"""
+ self._workers = {}
+
self._printer.print_update('Sharding tests ...')
num_workers = self._num_workers()
test_lists = self._shard_tests(file_list,
num_workers > 1 and not self._options.experimental_fully_parallel)
+
+ broker = message_broker.get(self._port, self._options)
+ self._message_broker = broker
+
filename_queue = Queue.Queue()
for item in test_lists:
filename_queue.put(item)
self._printer.print_update('Starting %s ...' %
grammar.pluralize('worker', num_workers))
- message_broker = self._message_broker
self._current_filename_queue = filename_queue
self._current_result_summary = result_summary
- if not self._options.dry_run:
- threads = message_broker.start_workers(self)
- else:
- threads = []
+ for worker_number in xrange(num_workers):
+ thread = broker.start_worker(self, worker_number)
+ w = WorkerState(thread.name(), worker_number, thread)
+ self._workers[thread.name()] = w
self._printer.print_update("Starting testing ...")
keyboard_interrupted = False
if not self._options.dry_run:
try:
- message_broker.run_message_loop()
+ broker.run_message_loop(self)
except KeyboardInterrupt:
_log.info("Interrupted, exiting")
- message_broker.cancel_workers()
+ for worker_name in self._workers.keys():
+ broker.cancel_worker(worker_name)
keyboard_interrupted = True
except:
# Unexpected exception; don't try to clean up workers.
@@ -620,21 +636,50 @@ class TestRunner:
raise
thread_timings, test_timings, individual_test_timings = \
- self._collect_timing_info(threads)
+ self._collect_timing_info(self._workers)
+ self._message_broker = None
return (keyboard_interrupted, thread_timings, test_timings,
individual_test_timings)
- def update(self):
- self.update_summary(self._current_result_summary)
-
- def _collect_timing_info(self, threads):
+ def _check_on_workers(self):
+ """Returns True iff all the workers have either completed or wedged."""
+
+ # Loop through all the threads waiting for them to finish.
+ some_thread_is_alive = True
+ while some_thread_is_alive:
+ some_thread_is_alive = False
+ t = time.time()
+ for worker in self._workers.values():
+ thread = worker.thread
+ exception_info = thread.exception_info()
+ if exception_info is not None:
+ # Re-raise the thread's exception here to make it
+ # clear that testing was aborted. Otherwise,
+ # the tests that did not run would be assumed
+ # to have passed.
+ raise exception_info[0], exception_info[1], exception_info[2]
+
+ if thread.isAlive():
+ some_thread_is_alive = True
+ next_timeout = thread.next_timeout()
+ if next_timeout and t > next_timeout:
+ self._message_broker.log_wedged_worker(worker.name)
+ thread.clear_next_timeout()
+
+ self.update_summary(self._current_result_summary)
+
+ if some_thread_is_alive:
+ time.sleep(0.01)
+
+ def _collect_timing_info(self, workers):
test_timings = {}
individual_test_timings = []
thread_timings = []
- for thread in threads:
- thread_timings.append({'name': thread.getName(),
+ for w in workers.values():
+ thread = w.thread
+ thread_timings.append({'name': thread.name(),
'num_tests': thread.get_num_tests(),
'total_time': thread.get_total_time()})
test_timings.update(thread.get_test_group_timing_stats())
@@ -1006,8 +1051,7 @@ class TestRunner:
result_summary):
"""Prints the run times for slow, timeout and crash tests.
Args:
- individual_test_timings: List of dump_render_tree_thread.TestStats
- for all tests.
+ individual_test_timings: List of TestStats for all tests.
result_summary: summary object for test run
"""
# Reverse-sort by the time spent in DumpRenderTree.
@@ -1295,13 +1339,11 @@ def run(port, options, args, regular_output=sys.stderr,
printer.cleanup()
return 0
- broker = message_broker.get(port, options)
-
# We wrap any parts of the run that are slow or likely to raise exceptions
# in a try/finally to ensure that we clean up the logging configuration.
num_unexpected_results = -1
try:
- test_runner = TestRunner(port, options, printer, broker)
+ test_runner = TestRunner(port, options, printer)
test_runner._print_config()
printer.print_update("Collecting tests ...")
@@ -1330,7 +1372,6 @@ def run(port, options, args, regular_output=sys.stderr,
_log.debug("Testing completed, Exit status: %d" %
num_unexpected_results)
finally:
- broker.cleanup()
printer.cleanup()
return num_unexpected_results
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
index 0ae9a09..d325a1c 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
@@ -489,7 +489,7 @@ class TestRunnerTest(unittest.TestCase):
mock_port.filename_to_uri = lambda name: name
runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
- printer=Mock(), message_broker=Mock())
+ printer=Mock())
expected_html = u"""<html>
<head>
<title>Layout Test Results (time)</title>
@@ -507,7 +507,7 @@ class TestRunnerTest(unittest.TestCase):
# Test that _shard_tests in run_webkit_tests.TestRunner really
# put the http tests first in the queue.
runner = TestRunnerWrapper(port=Mock(), options=Mock(),
- printer=Mock(), message_broker=Mock())
+ printer=Mock())
test_list = [
"LayoutTests/websocket/tests/unicode.htm",
--
WebKit Debian packaging
More information about the Pkg-webkit-commits
mailing list