[SCM] WebKit Debian packaging branch, debian/experimental, updated. upstream/1.3.3-9427-gc2be6fc
dpranke at chromium.org
dpranke at chromium.org
Wed Dec 22 16:30:41 UTC 2010
The following commit has been merged in the debian/experimental branch:
commit 70654d4c170ea3bd85bcb668ff068b101792873e
Author: dpranke at chromium.org <dpranke at chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Date: Wed Nov 24 21:57:09 2010 +0000
2010-11-24 Dirk Pranke <dpranke at chromium.org>
Reviewed by Tony Chang.
This patch implements the first part of the manager side of the
Broker objects - it handles creating threads, waiting for them
to complete, and running a single-threaded loop as well.
https://bugs.webkit.org/show_bug.cgi?id=49779
* Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
* Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
* Scripts/webkitpy/layout_tests/run_webkit_tests.py:
* Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
git-svn-id: http://svn.webkit.org/repository/webkit/trunk@72698 268f45cc-cd09-0410-ab3c-d52691b4dbfc
diff --git a/WebKitTools/ChangeLog b/WebKitTools/ChangeLog
index bafefa4..7a89f33 100644
--- a/WebKitTools/ChangeLog
+++ b/WebKitTools/ChangeLog
@@ -1,3 +1,18 @@
+2010-11-24 Dirk Pranke <dpranke at chromium.org>
+
+ Reviewed by Tony Chang.
+
+ This patch implements the first part of the manager side of the
+ Broker objects - it handles creating threads, waiting for them
+ to complete, and running a single-threaded loop as well.
+
+ https://bugs.webkit.org/show_bug.cgi?id=49779
+
+ * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
+ * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+ * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
2010-11-24 Mihai Parparita <mihaip at chromium.org>
Reviewed by David Levin.
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py
index 1bac7ef..38cf6c3 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py
@@ -244,7 +244,7 @@ class WatchableThread(threading.Thread):
class TestShellThread(WatchableThread):
- def __init__(self, port, options, worker_number,
+ def __init__(self, port, options, worker_number, worker_name,
filename_list_queue, result_queue):
"""Initialize all the local state for this DumpRenderTree thread.
@@ -252,6 +252,7 @@ class TestShellThread(WatchableThread):
port: interface to port-specific hooks
options: command line options argument from optparse
worker_number: identifier for a particular worker thread.
+ worker_name: for logging.
filename_list_queue: A thread safe Queue class that contains lists
of tuples of (filename, uri) pairs.
result_queue: A thread safe Queue class that will contain
@@ -261,7 +262,7 @@ class TestShellThread(WatchableThread):
self._port = port
self._options = options
self._worker_number = worker_number
- self._name = 'worker-%d' % worker_number
+ self._name = worker_name
self._filename_list_queue = filename_list_queue
self._result_queue = result_queue
self._filename_list = []
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
index 13951c0..e520a9c 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
@@ -28,25 +28,154 @@
"""Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
-The model we use is that of a message broker - it provides a messaging
-abstraction and message loops, and handles launching threads and/or processes
-depending on the requested configuration.
+Testing is accomplished by having a manager (TestRunner) gather all of the
+tests to be run, and sending messages to a pool of workers (TestShellThreads)
+to run each test. Each worker communicates with one driver (usually
+DumpRenderTree) to run one test at a time and then compare the output against
+what we expected to get.
+
+This modules provides a message broker that connects the manager to the
+workers: it provides a messaging abstraction and message loops, and
+handles launching threads and/or processes depending on the
+requested configuration.
"""
import logging
import sys
+import time
import traceback
+import dump_render_tree_thread
_log = logging.getLogger(__name__)
-def log_wedged_thread(id):
- """Log information about the given thread state."""
+def get(port, options):
+ """Return an instance of a WorkerMessageBroker."""
+ worker_model = options.worker_model
+ if worker_model == 'inline':
+ return InlineBroker(port, options)
+ if worker_model == 'threads':
+ return MultiThreadedBroker(port, options)
+ raise ValueError('unsupported value for --worker-model: %s' % worker_model)
+
+
+class _WorkerState(object):
+ def __init__(self, name):
+ self.name = name
+ self.thread = None
+
+
+class WorkerMessageBroker(object):
+ def __init__(self, port, options):
+ self._port = port
+ self._options = options
+ self._num_workers = int(self._options.child_processes)
+
+ # This maps worker names to their _WorkerState values.
+ self._workers = {}
+
+ def _threads(self):
+ return tuple([w.thread for w in self._workers.values()])
+
+ def start_workers(self, test_runner):
+ """Starts up the pool of workers for running the tests.
+
+ Args:
+ test_runner: a handle to the manager/TestRunner object
+ """
+ self._test_runner = test_runner
+ for worker_number in xrange(self._num_workers):
+ worker = _WorkerState('worker-%d' % worker_number)
+ worker.thread = self._start_worker(worker_number, worker.name)
+ self._workers[worker.name] = worker
+ return self._threads()
+
+ def _start_worker(self, worker_number, worker_name):
+ raise NotImplementedError
+
+ def run_message_loop(self):
+ """Loop processing messages until done."""
+ raise NotImplementedError
+
+ def cancel_workers(self):
+ """Cancel/interrupt any workers that are still alive."""
+ pass
+
+ def cleanup(self):
+ """Perform any necessary cleanup on shutdown."""
+ pass
+
+
+class InlineBroker(WorkerMessageBroker):
+ def _start_worker(self, worker_number, worker_name):
+ # FIXME: Replace with something that isn't a thread.
+ thread = dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number, worker_name,
+ self._test_runner._current_filename_queue,
+ self._test_runner._result_queue)
+ # Note: Don't start() the thread! If we did, it would actually
+ # create another thread and start executing it, and we'd no longer
+ # be single-threaded.
+ return thread
+
+ def run_message_loop(self):
+ thread = self._threads()[0]
+ thread.run_in_main_thread(self._test_runner,
+ self._test_runner._current_result_summary)
+ self._test_runner.update()
+
+
+class MultiThreadedBroker(WorkerMessageBroker):
+ def _start_worker(self, worker_number, worker_name):
+ thread = dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number, worker_name,
+ self._test_runner._current_filename_queue,
+ self._test_runner._result_queue)
+ thread.start()
+ return thread
+
+ def run_message_loop(self):
+ threads = self._threads()
+
+ # Loop through all the threads waiting for them to finish.
+ some_thread_is_alive = True
+ while some_thread_is_alive:
+ some_thread_is_alive = False
+ t = time.time()
+ for thread in threads:
+ exception_info = thread.exception_info()
+ if exception_info is not None:
+ # Re-raise the thread's exception here to make it
+ # clear that testing was aborted. Otherwise,
+ # the tests that did not run would be assumed
+ # to have passed.
+ raise exception_info[0], exception_info[1], exception_info[2]
+
+ if thread.isAlive():
+ some_thread_is_alive = True
+ next_timeout = thread.next_timeout()
+ if next_timeout and t > next_timeout:
+ log_wedged_worker(thread.getName(), thread.id())
+ thread.clear_next_timeout()
+
+ self._test_runner.update()
+
+ if some_thread_is_alive:
+ time.sleep(0.01)
+
+ def cancel_workers(self):
+ threads = self._threads()
+ for thread in threads:
+ thread.cancel()
+
+
+def log_wedged_worker(name, id):
+ """Log information about the given worker state."""
stack = _find_thread_stack(id)
assert(stack is not None)
_log.error("")
- _log.error("Thread %d is wedged" % id)
+ _log.error("%s (tid %d) is wedged" % (name, id))
_log_stack(stack)
_log.error("")
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
index d708745..6f04fd3 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
@@ -30,25 +30,34 @@ import logging
import Queue
import sys
import thread
+import threading
import time
import unittest
from webkitpy.common import array_stream
from webkitpy.common.system import outputcapture
+from webkitpy.tool import mocktool
from webkitpy.layout_tests import run_webkit_tests
-import dump_render_tree_thread
import message_broker
-class TestThread(dump_render_tree_thread.WatchableThread):
+class TestThread(threading.Thread):
def __init__(self, started_queue, stopping_queue):
- dump_render_tree_thread.WatchableThread.__init__(self)
+ threading.Thread.__init__(self)
+ self._thread_id = None
self._started_queue = started_queue
self._stopping_queue = stopping_queue
self._timeout = False
self._timeout_queue = Queue.Queue()
+ self._exception_info = None
+
+ def id(self):
+ return self._thread_id
+
+ def getName(self):
+ return "worker-0"
def run(self):
self._covered_run()
@@ -70,12 +79,17 @@ class TestThread(dump_render_tree_thread.WatchableThread):
except:
self._exception_info = sys.exc_info()
+ def exception_info(self):
+ return self._exception_info
+
def next_timeout(self):
if self._timeout:
self._timeout_queue.put('done')
return time.time() - 10
return time.time()
+ def clear_next_timeout(self):
+ self._next_timeout = None
class TestHandler(logging.Handler):
def __init__(self, astream):
@@ -86,35 +100,39 @@ class TestHandler(logging.Handler):
self._stream.write(self.format(record))
-class WaitForThreadsToFinishTest(unittest.TestCase):
- class MockTestRunner(run_webkit_tests.TestRunner):
+class MultiThreadedBrokerTest(unittest.TestCase):
+ class MockTestRunner(object):
def __init__(self):
pass
def __del__(self):
pass
- def update_summary(self, result_summary):
+ def update(self):
pass
def run_one_thread(self, msg):
runner = self.MockTestRunner()
+ port = None
+ options = mocktool.MockOptions(child_processes='1')
starting_queue = Queue.Queue()
stopping_queue = Queue.Queue()
+ broker = message_broker.MultiThreadedBroker(port, options)
+ broker._test_runner = runner
child_thread = TestThread(starting_queue, stopping_queue)
+ broker._workers['worker-0'] = message_broker._WorkerState('worker-0')
+ broker._workers['worker-0'].thread = child_thread
child_thread.start()
started_msg = starting_queue.get()
stopping_queue.put(msg)
- threads = [child_thread]
- return runner._wait_for_threads_to_finish(threads, None)
+ return broker.run_message_loop()
def test_basic(self):
interrupted = self.run_one_thread('')
self.assertFalse(interrupted)
def test_interrupt(self):
- interrupted = self.run_one_thread('KeyboardInterrupt')
- self.assertTrue(interrupted)
+ self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
def test_timeout(self):
oc = outputcapture.OutputCapture()
@@ -137,7 +155,7 @@ class Test(unittest.TestCase):
found_stack = message_broker._find_thread_stack(0)
self.assertEqual(found_stack, None)
- def test_log_wedged_thread(self):
+ def test_log_wedged_worker(self):
oc = outputcapture.OutputCapture()
oc.capture_output()
logger = message_broker._log
@@ -151,7 +169,8 @@ class Test(unittest.TestCase):
child_thread.start()
msg = starting_queue.get()
- message_broker.log_wedged_thread(child_thread.id())
+ message_broker.log_wedged_worker(child_thread.getName(),
+ child_thread.id())
stopping_queue.put('')
child_thread.join(timeout=1.0)
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
index 1ceaee9..d78e452 100755
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
@@ -75,6 +75,7 @@ from layout_package import test_results_uploader
from webkitpy.common.system import user
from webkitpy.thirdparty import simplejson
+from webkitpy.tool import grammar
import port
@@ -239,17 +240,19 @@ class TestRunner:
# in DumpRenderTree.
DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
- def __init__(self, port, options, printer):
+ def __init__(self, port, options, printer, message_broker):
"""Initialize test runner data structures.
Args:
port: an object implementing port-specific
options: a dictionary of command line options
printer: a Printer object to record updates to.
+ message_broker: object used to communicate with workers.
"""
self._port = port
self._options = options
self._printer = printer
+ self._message_broker = message_broker
# disable wss server. need to install pyOpenSSL on buildbots.
# self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -565,34 +568,6 @@ class TestRunner:
return True
return False
- def _instantiate_dump_render_tree_threads(self, test_files,
- result_summary):
- """Instantitates and starts the TestShellThread(s).
-
- Return:
- The list of threads.
- """
- num_workers = self._num_workers()
- test_lists = self._shard_tests(test_files,
- num_workers > 1 and not self._options.experimental_fully_parallel)
- filename_queue = Queue.Queue()
- for item in test_lists:
- filename_queue.put(item)
-
- # Instantiate TestShellThreads and start them.
- threads = []
- for worker_number in xrange(num_workers):
- thread = dump_render_tree_thread.TestShellThread(self._port,
- self._options, worker_number,
- filename_queue, self._result_queue)
- if num_workers > 1:
- thread.start()
- else:
- thread.run_in_main_thread(self, result_summary)
- threads.append(thread)
-
- return threads
-
def _num_workers(self):
return int(self._options.child_processes)
@@ -610,58 +585,43 @@ class TestRunner:
in the form {filename:filename, test_run_time:test_run_time}
result_summary: summary object to populate with the results
"""
- plural = ""
- if self._num_workers() > 1:
- plural = "s"
- self._printer.print_update('Starting %s%s ...' %
- (self._port.driver_name(), plural))
- threads = self._instantiate_dump_render_tree_threads(file_list,
- result_summary)
- self._printer.print_update("Starting testing ...")
- keyboard_interrupted = self._wait_for_threads_to_finish(threads,
- result_summary)
- (thread_timings, test_timings, individual_test_timings) = \
- self._collect_timing_info(threads)
+ self._printer.print_update('Sharding tests ...')
+ num_workers = self._num_workers()
+ test_lists = self._shard_tests(file_list,
+ num_workers > 1 and not self._options.experimental_fully_parallel)
+ filename_queue = Queue.Queue()
+ for item in test_lists:
+ filename_queue.put(item)
- return (keyboard_interrupted, thread_timings, test_timings,
- individual_test_timings)
+ self._printer.print_update('Starting %s ...' %
+ grammar.pluralize('worker', num_workers))
+ message_broker = self._message_broker
+ self._current_filename_queue = filename_queue
+ self._current_result_summary = result_summary
+ threads = message_broker.start_workers(self)
- def _wait_for_threads_to_finish(self, threads, result_summary):
+ self._printer.print_update("Starting testing ...")
keyboard_interrupted = False
try:
- # Loop through all the threads waiting for them to finish.
- some_thread_is_alive = True
- while some_thread_is_alive:
- some_thread_is_alive = False
- t = time.time()
- for thread in threads:
- exception_info = thread.exception_info()
- if exception_info is not None:
- # Re-raise the thread's exception here to make it
- # clear that testing was aborted. Otherwise,
- # the tests that did not run would be assumed
- # to have passed.
- raise exception_info[0], exception_info[1], exception_info[2]
-
- if thread.isAlive():
- some_thread_is_alive = True
- next_timeout = thread.next_timeout()
- if (next_timeout and t > next_timeout):
- message_broker.log_wedged_thread(thread.id())
- thread.clear_next_timeout()
-
- self.update_summary(result_summary)
-
- if some_thread_is_alive:
- time.sleep(0.01)
-
+ message_broker.run_message_loop()
except KeyboardInterrupt:
+ _log.info("Interrupted, exiting")
+ message_broker.cancel_workers()
keyboard_interrupted = True
- for thread in threads:
- thread.cancel()
+ except:
+ # Unexpected exception; don't try to clean up workers.
+ _log.info("Exception raised, exiting")
+ raise
- return keyboard_interrupted
+ thread_timings, test_timings, individual_test_timings = \
+ self._collect_timing_info(threads)
+
+ return (keyboard_interrupted, thread_timings, test_timings,
+ individual_test_timings)
+
+ def update(self):
+ self.update_summary(self._current_result_summary)
def _collect_timing_info(self, threads):
test_timings = {}
@@ -1326,11 +1286,13 @@ def run(port, options, args, regular_output=sys.stderr,
printer.cleanup()
return 0
+ broker = message_broker.get(port, options)
+
# We wrap any parts of the run that are slow or likely to raise exceptions
# in a try/finally to ensure that we clean up the logging configuration.
num_unexpected_results = -1
try:
- test_runner = TestRunner(port, options, printer)
+ test_runner = TestRunner(port, options, printer, broker)
test_runner._print_config()
printer.print_update("Collecting tests ...")
@@ -1359,6 +1321,7 @@ def run(port, options, args, regular_output=sys.stderr,
_log.debug("Testing completed, Exit status: %d" %
num_unexpected_results)
finally:
+ broker.cleanup()
printer.cleanup()
return num_unexpected_results
@@ -1595,9 +1558,8 @@ def parse_args(args=None):
help="Number of DumpRenderTrees to run in parallel."),
# FIXME: Display default number of child processes that will run.
optparse.make_option("--worker-model", action="store",
- default="threads",
- help="controls worker model. Valid values are "
- "'inline' and 'threads' (default)."),
+ default="threads", help=("controls worker model. Valid values are "
+ "'inline' and 'threads' (default).")),
optparse.make_option("--experimental-fully-parallel",
action="store_true", default=False,
help="run all tests in parallel"),
@@ -1653,13 +1615,7 @@ def parse_args(args=None):
old_run_webkit_tests_compat)
option_parser = optparse.OptionParser(option_list=option_list)
- options, args = option_parser.parse_args(args)
-
- if options.worker_model not in ('inline', 'threads'):
- option_parser.error("unsupported value for --worker-model: %s" %
- options.worker_model)
-
- return options, args
+ return option_parser.parse_args(args)
def main():
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
index 09a0c71..b3c8861 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
@@ -103,8 +103,8 @@ def logging_run(extra_args=None, port_obj=None, tests_included=False):
'websocket/tests',
'failures/expected/*'])
+ oc = outputcapture.OutputCapture()
try:
- oc = outputcapture.OutputCapture()
oc.capture_output()
options, parsed_args = run_webkit_tests.parse_args(args)
user = MockUser()
@@ -226,7 +226,7 @@ class MainTest(unittest.TestCase):
def test_keyboard_interrupt(self):
# Note that this also tests running a test marked as SKIP if
# you specify it explicitly.
- self.assertRaises(KeyboardInterrupt, passing_run,
+ self.assertRaises(KeyboardInterrupt, logging_run,
['failures/expected/keyboard.html'], tests_included=True)
def test_last_results(self):
@@ -378,11 +378,11 @@ class MainTest(unittest.TestCase):
self.assertTrue(passing_run(['--worker-model', 'threads']))
def test_worker_model__processes(self):
- self.assertRaises(SystemExit, logging_run,
+ self.assertRaises(ValueError, logging_run,
['--worker-model', 'processes'])
def test_worker_model__unknown(self):
- self.assertRaises(SystemExit, logging_run,
+ self.assertRaises(ValueError, logging_run,
['--worker-model', 'unknown'])
MainTest = skip_if(MainTest, sys.platform == 'cygwin' and compare_version(sys, '2.6')[0] < 0, 'new-run-webkit-tests tests hang on Cygwin Python 2.5.2')
@@ -466,7 +466,8 @@ class TestRunnerTest(unittest.TestCase):
mock_port.relative_test_filename = lambda name: name
mock_port.filename_to_uri = lambda name: name
- runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(), printer=Mock())
+ runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
+ printer=Mock(), message_broker=Mock())
expected_html = u"""<html>
<head>
<title>Layout Test Results (time)</title>
@@ -483,7 +484,8 @@ class TestRunnerTest(unittest.TestCase):
def test_shard_tests(self):
# Test that _shard_tests in run_webkit_tests.TestRunner really
# put the http tests first in the queue.
- runner = TestRunnerWrapper(port=Mock(), options=Mock(), printer=Mock())
+ runner = TestRunnerWrapper(port=Mock(), options=Mock(),
+ printer=Mock(), message_broker=Mock())
test_list = [
"LayoutTests/websocket/tests/unicode.htm",
--
WebKit Debian packaging
More information about the Pkg-webkit-commits
mailing list