[SCM] WebKit Debian packaging branch, debian/experimental, updated. upstream/1.3.3-10851-g50815da

dpranke at chromium.org dpranke at chromium.org
Wed Dec 22 17:56:55 UTC 2010


The following commit has been merged in the debian/experimental branch:
commit 5c6ea7c082016c1880aa45632b3d5c13b3ca0bed
Author: dpranke at chromium.org <dpranke at chromium.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Date:   Fri Dec 3 03:41:43 2010 +0000

    2010-12-02  Dirk Pranke  <dpranke at chromium.org>
    
            Reviewed by Tony Chang.
    
            nrwt multiprocessing - move logic back into run_webkit_tests
    
            This change moves a bunch of logic that I had put into
            message_broker back into run_webkit_tests, in a slightly
            different format. WorkerMessageBroker needed to become less aware of
            the logic the TestRunner class uses, and more generic.
            Eventually the MessageBroker will only do generic messaging and
            thread/process-pooling, and (almost) all of the
            run-webkit-tests-specific logic will be moved to
            run_webkit_tests.py and dump_render_tree_thread.py.
    
            The biggest changes are that the Broker can now start a single
            worker, but the responsibility for starting all of them is pushed
            back to the TestRunner (Manager), and the logic for checking if
            the threads are done or wedged is moved back to TestRunner. We
            also remove WorkerMessageBroker.cleanup (not needed) and replace
            cancel_workers with cancel_worker (workers now have to be
            cancelled individually).
    
            The message_broker is now encapsulated inside
            TestRunner._run_tests(); it only needs to exist while actually
            running the tests.
    
            Also, delete a bunch of tests in message_broker_unittest that no
            longer make much sense.
    
            This patch depends on bug 50372.
    
            https://bugs.webkit.org/show_bug.cgi?id=50374
    
            * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
            * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
            * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
            * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
            * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
    
    git-svn-id: http://svn.webkit.org/repository/webkit/trunk@73231 268f45cc-cd09-0410-ab3c-d52691b4dbfc

diff --git a/WebKitTools/ChangeLog b/WebKitTools/ChangeLog
index d675077..2de59e7 100644
--- a/WebKitTools/ChangeLog
+++ b/WebKitTools/ChangeLog
@@ -1,3 +1,42 @@
+2010-12-02  Dirk Pranke  <dpranke at chromium.org>
+
+        Reviewed by Tony Chang.
+
+        nrwt multiprocessing - move logic back into run_webkit_tests
+
+        This change moves a bunch of logic that I had put into
+        message_broker back into run_webkit_tests, in a slightly
+        different format. WorkerMessageBroker needed to become less aware of
+        the logic the TestRunner class uses, and more generic.
+        Eventually the MessageBroker will only do generic messaging and
+        thread/process-pooling, and (almost) all of the
+        run-webkit-tests-specific logic will be moved to
+        run_webkit_tests.py and dump_render_tree_thread.py.
+        
+        The biggest changes are that the Broker can now start a single
+        worker, but the responsibility for starting all of them is pushed
+        back to the TestRunner (Manager), and the logic for checking if
+        the threads are done or wedged is moved back to TestRunner. We
+        also remove WorkerMessageBroker.cleanup (not needed) and
+        cancel_workers (they have to be cancelled individually).
+        
+        The  message_broker is now encapsulated inside
+        TestRunner._run_tests(); it only needs to exist while actually
+        running the tests.
+
+        Also, delete a bunch of tests in message_broker_unittest that no
+        longer make much sense.
+
+        This patch depends on bug 50372.
+
+        https://bugs.webkit.org/show_bug.cgi?id=50374
+
+        * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
+        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
+        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
+
 2010-12-02  Hayato Ito  <hayato at chromium.org>
 
         Reviewed by Eric Seidel.
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
index 285b2e7..bc3d336 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
@@ -64,114 +64,88 @@ class _WorkerMessageBroker(object):
     def __init__(self, port, options):
         self._port = port
         self._options = options
-        self._num_workers = int(self._options.child_processes)
 
-        # This maps worker names to their TestShellThread objects.
+        # This maps worker_names to TestShellThreads
         self._threads = {}
 
-    def start_workers(self, test_runner):
-        """Starts up the pool of workers for running the tests.
-
-        Args:
-            test_runner: a handle to the manager/TestRunner object
-        """
-        self._test_runner = test_runner
-        for worker_number in xrange(self._num_workers):
-            thread = self.start_worker(worker_number)
-            self._threads[thread.name()] = thread
-        return self._threads.values()
-
-    def start_worker(self, worker_number):
-        # FIXME: Replace with something that isn't a thread.
+    def start_worker(self, test_runner, worker_number):
+        """Start a worker with the given index number.
+
+        Returns the actual TestShellThread object."""
+        # FIXME: Remove dependencies on test_runner.
+        # FIXME: Replace with something that isn't a thread, and return
+        # the name of the worker, not the thread itself. We need to return
+        # the thread itself for now to allow TestRunner to access the object
+        # directly to read shared state.
+        thread = dump_render_tree_thread.TestShellThread(self._port,
+            self._options, worker_number, test_runner._current_filename_queue,
+            test_runner._result_queue)
+        self._threads[thread.name()] = thread
         # Note: Don't start() the thread! If we did, it would actually
         # create another thread and start executing it, and we'd no longer
         # be single-threaded.
-        return dump_render_tree_thread.TestShellThread(self._port,
-            self._options, worker_number,
-            self._test_runner._current_filename_queue,
-            self._test_runner._result_queue)
+        return thread
 
-    def run_message_loop(self):
-        """Loop processing messages until done."""
-        raise NotImplementedError
+    def cancel_worker(self, worker_name):
+        """Attempt to cancel a worker (best-effort). The worker may still be
+        running after this call returns."""
+        self._threads[worker_name].cancel()
 
-    def cancel_workers(self):
-        """Cancel/interrupt any workers that are still alive."""
-        pass
+    def log_wedged_worker(self, worker_name):
+        """Log information about the given worker's state."""
+        raise NotImplementedError
 
-    def cleanup(self):
-        """Perform any necessary cleanup on shutdown."""
-        pass
+    def run_message_loop(self, test_runner):
+        """Loop processing messages until done."""
+        # FIXME: eventually we'll need a message loop that the workers
+        # can also call.
+        raise NotImplementedError
 
 
 class _InlineBroker(_WorkerMessageBroker):
-    def run_message_loop(self):
+    def run_message_loop(self, test_runner):
         thread = self._threads.values()[0]
-        thread.run_in_main_thread(self._test_runner,
-                                  self._test_runner._current_result_summary)
-        self._test_runner.update()
+        thread.run_in_main_thread(test_runner,
+                                  test_runner._current_result_summary)
+
+    def log_wedged_worker(self, worker_name):
+        raise AssertionError('_InlineBroker.log_wedged_worker() called')
 
 
 class _MultiThreadedBroker(_WorkerMessageBroker):
-    def start_worker(self, worker_number):
-        thread = _WorkerMessageBroker.start_worker(self, worker_number)
+    def start_worker(self, test_runner, worker_number):
+        thread = _WorkerMessageBroker.start_worker(self, test_runner,
+                                                   worker_number)
+        # Unlike the base implementation, here we actually want to start
+        # the thread.
         thread.start()
         return thread
 
-    def run_message_loop(self):
-        # Loop through all the threads waiting for them to finish.
-        some_thread_is_alive = True
-        while some_thread_is_alive:
-            some_thread_is_alive = False
-            t = time.time()
-            for thread in self._threads.values():
-                exception_info = thread.exception_info()
-                if exception_info is not None:
-                    # Re-raise the thread's exception here to make it
-                    # clear that testing was aborted. Otherwise,
-                    # the tests that did not run would be assumed
-                    # to have passed.
-                    raise exception_info[0], exception_info[1], exception_info[2]
-
-                if thread.isAlive():
-                    some_thread_is_alive = True
-                    next_timeout = thread.next_timeout()
-                    if next_timeout and t > next_timeout:
-                        log_wedged_worker(thread.name(), thread.id())
-                        thread.clear_next_timeout()
-
-            self._test_runner.update()
-
-            if some_thread_is_alive:
-                time.sleep(0.01)
-
-    def cancel_workers(self):
-        for thread in self._threads.values():
-            thread.cancel()
-
-
-def log_wedged_worker(name, id):
-    """Log information about the given worker state."""
-    stack = _find_thread_stack(id)
-    assert(stack is not None)
-    _log.error("")
-    _log.error("%s (tid %d) is wedged" % (name, id))
-    _log_stack(stack)
-    _log.error("")
-
-
-def _find_thread_stack(id):
-    """Returns a stack object that can be used to dump a stack trace for
-    the given thread id (or None if the id is not found)."""
-    for thread_id, stack in sys._current_frames().items():
-        if thread_id == id:
-            return stack
-    return None
-
-
-def _log_stack(stack):
-    """Log a stack trace to log.error()."""
-    for filename, lineno, name, line in traceback.extract_stack(stack):
-        _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
-        if line:
-            _log.error('  %s' % line.strip())
+    def run_message_loop(self, test_runner):
+        # FIXME: Remove the dependencies on test_runner. Checking on workers
+        # should be done via a timer firing.
+        test_runner._check_on_workers()
+
+    def log_wedged_worker(self, worker_name):
+        thread = self._threads[worker_name]
+        stack = self._find_thread_stack(thread.id())
+        assert(stack is not None)
+        _log.error("")
+        _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
+        self._log_stack(stack)
+        _log.error("")
+
+    def _find_thread_stack(self, id):
+        """Returns a stack object that can be used to dump a stack trace for
+        the given thread id (or None if the id is not found)."""
+        for thread_id, stack in sys._current_frames().items():
+            if thread_id == id:
+                return stack
+        return None
+
+    def _log_stack(self, stack):
+        """Log a stack trace to log.error()."""
+        for filename, lineno, name, line in traceback.extract_stack(stack):
+            _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
+            if line:
+                _log.error('  %s' % line.strip())
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
index c006471..d46df4c 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
@@ -42,139 +42,60 @@ from webkitpy.layout_tests import run_webkit_tests
 
 import message_broker
 
+# FIXME: Boy do we need a lot more tests here ...
 
-class TestThread(threading.Thread):
-    def __init__(self, started_queue, stopping_queue):
-        threading.Thread.__init__(self)
-        self._id = None
-        self._started_queue = started_queue
-        self._stopping_queue = stopping_queue
-        self._timeout = False
-        self._timeout_queue = Queue.Queue()
-        self._exception_info = None
-
-    def id(self):
-        return self._id
-
-    def name(self):
-        return 'worker/0'
-
-    def run(self):
-        self._covered_run()
-
-    def _covered_run(self):
-        # FIXME: this is a separate routine to work around a bug
-        # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
-        self._id = thread.get_ident()
-        try:
-            self._started_queue.put('')
-            msg = self._stopping_queue.get()
-            if msg == 'KeyboardInterrupt':
-                raise KeyboardInterrupt
-            elif msg == 'Exception':
-                raise ValueError()
-            elif msg == 'Timeout':
-                self._timeout = True
-                self._timeout_queue.get()
-        except:
-            self._exception_info = sys.exc_info()
-
-    def exception_info(self):
-        return self._exception_info
-
-    def next_timeout(self):
-        if self._timeout:
-            self._timeout_queue.put('done')
-            return time.time() - 10
-        return time.time()
-
-    def clear_next_timeout(self):
-        self._next_timeout = None
-
-class TestHandler(logging.Handler):
-    def __init__(self, astream):
-        logging.Handler.__init__(self)
-        self._stream = astream
-
-    def emit(self, record):
-        self._stream.write(self.format(record))
-
-
-class MultiThreadedBrokerTest(unittest.TestCase):
-    class MockTestRunner(object):
-        def __init__(self):
-            pass
-
-        def __del__(self):
-            pass
-
-        def update(self):
-            pass
-
-    def run_one_thread(self, msg):
-        runner = self.MockTestRunner()
-        port = None
-        options = mocktool.MockOptions(child_processes='1')
-        starting_queue = Queue.Queue()
-        stopping_queue = Queue.Queue()
-        broker = message_broker._MultiThreadedBroker(port, options)
-        broker._test_runner = runner
-        child_thread = TestThread(starting_queue, stopping_queue)
-        name = child_thread.name()
-        broker._threads[name] = child_thread
-        child_thread.start()
-        started_msg = starting_queue.get()
-        stopping_queue.put(msg)
-        return broker.run_message_loop()
 
-    def test_basic(self):
-        interrupted = self.run_one_thread('')
-        self.assertFalse(interrupted)
+class TestThreadStacks(unittest.TestCase):
+    class Thread(threading.Thread):
+        def __init__(self, started_queue, stopping_queue):
+            threading.Thread.__init__(self)
+            self._id = None
+            self._started_queue = started_queue
+            self._stopping_queue = stopping_queue
 
-    def test_interrupt(self):
-        self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
+        def id(self):
+            return self._id
 
-    def test_timeout(self):
-        oc = outputcapture.OutputCapture()
-        oc.capture_output()
-        interrupted = self.run_one_thread('Timeout')
-        self.assertFalse(interrupted)
-        oc.restore_output()
+        def name(self):
+            return 'worker/0'
 
-    def test_exception(self):
-        self.assertRaises(ValueError, self.run_one_thread, 'Exception')
+        def run(self):
+            self._id = thread.get_ident()
+            self._started_queue.put('')
+            msg = self._stopping_queue.get()
 
+    def make_broker(self):
+        options = mocktool.MockOptions()
+        return message_broker._MultiThreadedBroker(port=None,
+                                                     options=options)
 
-class Test(unittest.TestCase):
     def test_find_thread_stack_found(self):
+        broker = self.make_broker()
         id, stack = sys._current_frames().items()[0]
-        found_stack = message_broker._find_thread_stack(id)
+        found_stack = broker._find_thread_stack(id)
         self.assertNotEqual(found_stack, None)
 
     def test_find_thread_stack_not_found(self):
-        found_stack = message_broker._find_thread_stack(0)
+        broker = self.make_broker()
+        found_stack = broker._find_thread_stack(0)
         self.assertEqual(found_stack, None)
 
     def test_log_wedged_worker(self):
+        broker = self.make_broker()
         oc = outputcapture.OutputCapture()
         oc.capture_output()
-        logger = message_broker._log
-        astream = array_stream.ArrayStream()
-        handler = TestHandler(astream)
-        logger.addHandler(handler)
 
         starting_queue = Queue.Queue()
         stopping_queue = Queue.Queue()
-        child_thread = TestThread(starting_queue, stopping_queue)
+        child_thread = TestThreadStacks.Thread(starting_queue, stopping_queue)
         child_thread.start()
+        broker._threads[child_thread.name()] = child_thread
         msg = starting_queue.get()
 
-        message_broker.log_wedged_worker(child_thread.name(),
-                                         child_thread.id())
+        broker.log_wedged_worker(child_thread.name())
         stopping_queue.put('')
         child_thread.join(timeout=1.0)
 
-        self.assertFalse(astream.empty())
         self.assertFalse(child_thread.isAlive())
         oc.restore_output()
 
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
index 0b11d9b..dd84788 100755
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
@@ -228,6 +228,15 @@ def summarize_unexpected_results(port_obj, expectations, result_summary,
     return results
 
 
+class WorkerState(object):
+    """A class for the TestRunner/manager to use to track the current state
+    of the workers."""
+    def __init__(self, name, number, thread):
+        self.name = name
+        self.number = number
+        self.thread = thread
+
+
 class TestRunner:
     """A class for managing running a series of tests on a series of layout
     test files."""
@@ -240,19 +249,20 @@ class TestRunner:
     # in DumpRenderTree.
     DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
 
-    def __init__(self, port, options, printer, message_broker):
+    def __init__(self, port, options, printer):
         """Initialize test runner data structures.
 
         Args:
           port: an object implementing port-specific
           options: a dictionary of command line options
           printer: a Printer object to record updates to.
-          message_broker: object used to communicate with workers.
         """
         self._port = port
         self._options = options
         self._printer = printer
-        self._message_broker = message_broker
+
+        # This maps worker names to the state we are tracking for each of them.
+        self._workers = {}
 
         # disable wss server. need to install pyOpenSSL on buildbots.
         # self._websocket_secure_server = websocket_server.PyWebSocket(
@@ -586,33 +596,39 @@ class TestRunner:
             result_summary: summary object to populate with the results
         """
 
+        self._workers = {}
+
         self._printer.print_update('Sharding tests ...')
         num_workers = self._num_workers()
         test_lists = self._shard_tests(file_list,
             num_workers > 1 and not self._options.experimental_fully_parallel)
+
+        broker = message_broker.get(self._port, self._options)
+        self._message_broker = broker
+
         filename_queue = Queue.Queue()
         for item in test_lists:
             filename_queue.put(item)
 
         self._printer.print_update('Starting %s ...' %
                                    grammar.pluralize('worker', num_workers))
-        message_broker = self._message_broker
         self._current_filename_queue = filename_queue
         self._current_result_summary = result_summary
 
-        if not self._options.dry_run:
-            threads = message_broker.start_workers(self)
-        else:
-            threads = []
+        for worker_number in xrange(num_workers):
+            thread = broker.start_worker(self, worker_number)
+            w = WorkerState(thread.name(), worker_number, thread)
+            self._workers[thread.name()] = w
 
         self._printer.print_update("Starting testing ...")
         keyboard_interrupted = False
         if not self._options.dry_run:
             try:
-                message_broker.run_message_loop()
+                broker.run_message_loop(self)
             except KeyboardInterrupt:
                 _log.info("Interrupted, exiting")
-                message_broker.cancel_workers()
+                for worker_name in self._workers.keys():
+                    broker.cancel_worker(worker_name)
                 keyboard_interrupted = True
             except:
                 # Unexpected exception; don't try to clean up workers.
@@ -620,21 +636,50 @@ class TestRunner:
                 raise
 
         thread_timings, test_timings, individual_test_timings = \
-            self._collect_timing_info(threads)
+            self._collect_timing_info(self._workers)
+        self._message_broker = None
 
         return (keyboard_interrupted, thread_timings, test_timings,
                 individual_test_timings)
 
-    def update(self):
-        self.update_summary(self._current_result_summary)
-
-    def _collect_timing_info(self, threads):
+    def _check_on_workers(self):
+        """Returns True iff all the workers have either completed or wedged."""
+
+        # Loop through all the threads waiting for them to finish.
+        some_thread_is_alive = True
+        while some_thread_is_alive:
+            some_thread_is_alive = False
+            t = time.time()
+            for worker in self._workers.values():
+                thread = worker.thread
+                exception_info = thread.exception_info()
+                if exception_info is not None:
+                    # Re-raise the thread's exception here to make it
+                    # clear that testing was aborted. Otherwise,
+                    # the tests that did not run would be assumed
+                    # to have passed.
+                    raise exception_info[0], exception_info[1], exception_info[2]
+
+                if thread.isAlive():
+                    some_thread_is_alive = True
+                    next_timeout = thread.next_timeout()
+                    if next_timeout and t > next_timeout:
+                        self._message_broker.log_wedged_worker(worker.name)
+                        thread.clear_next_timeout()
+
+            self.update_summary(self._current_result_summary)
+
+            if some_thread_is_alive:
+                time.sleep(0.01)
+
+    def _collect_timing_info(self, workers):
         test_timings = {}
         individual_test_timings = []
         thread_timings = []
 
-        for thread in threads:
-            thread_timings.append({'name': thread.getName(),
+        for w in workers.values():
+            thread = w.thread
+            thread_timings.append({'name': thread.name(),
                                    'num_tests': thread.get_num_tests(),
                                    'total_time': thread.get_total_time()})
             test_timings.update(thread.get_test_group_timing_stats())
@@ -1006,8 +1051,7 @@ class TestRunner:
                                   result_summary):
         """Prints the run times for slow, timeout and crash tests.
         Args:
-          individual_test_timings: List of dump_render_tree_thread.TestStats
-              for all tests.
+          individual_test_timings: List of TestStats for all tests.
           result_summary: summary object for test run
         """
         # Reverse-sort by the time spent in DumpRenderTree.
@@ -1295,13 +1339,11 @@ def run(port, options, args, regular_output=sys.stderr,
         printer.cleanup()
         return 0
 
-    broker = message_broker.get(port, options)
-
     # We wrap any parts of the run that are slow or likely to raise exceptions
     # in a try/finally to ensure that we clean up the logging configuration.
     num_unexpected_results = -1
     try:
-        test_runner = TestRunner(port, options, printer, broker)
+        test_runner = TestRunner(port, options, printer)
         test_runner._print_config()
 
         printer.print_update("Collecting tests ...")
@@ -1330,7 +1372,6 @@ def run(port, options, args, regular_output=sys.stderr,
             _log.debug("Testing completed, Exit status: %d" %
                        num_unexpected_results)
     finally:
-        broker.cleanup()
         printer.cleanup()
 
     return num_unexpected_results
diff --git a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
index 0ae9a09..d325a1c 100644
--- a/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
+++ b/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
@@ -489,7 +489,7 @@ class TestRunnerTest(unittest.TestCase):
         mock_port.filename_to_uri = lambda name: name
 
         runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
-            printer=Mock(), message_broker=Mock())
+                                             printer=Mock())
         expected_html = u"""<html>
   <head>
     <title>Layout Test Results (time)</title>
@@ -507,7 +507,7 @@ class TestRunnerTest(unittest.TestCase):
         # Test that _shard_tests in run_webkit_tests.TestRunner really
         # put the http tests first in the queue.
         runner = TestRunnerWrapper(port=Mock(), options=Mock(),
-            printer=Mock(), message_broker=Mock())
+                                   printer=Mock())
 
         test_list = [
           "LayoutTests/websocket/tests/unicode.htm",

-- 
WebKit Debian packaging



More information about the Pkg-webkit-commits mailing list