[Reproducible-commits] [diffoscope] 01/01: Work-in-progress: implement parallel processing

Jérémy Bobbio lunar at moszumanska.debian.org
Fri Aug 21 15:57:29 UTC 2015


This is an automated email from the git hooks/post-receive script.

lunar pushed a commit to branch pu/parallel
in repository diffoscope.

commit af3f5efc728db2408f9b18efb2ee41ef5383f91b
Author: Jérémy Bobbio <lunar at debian.org>
Date:   Thu Jul 30 18:04:40 2015 +0000

    Work-in-progress: implement parallel processing
---
 diffoscope.py                      |   7 ++-
 diffoscope/__init__.py             |  16 ++++-
 diffoscope/comparators/__init__.py | 121 +++++++++++++++++++++++++++++++++++++
 diffoscope/comparators/binary.py   |  15 ++++-
 diffoscope/comparators/deb.py      |   4 +-
 diffoscope/comparators/tar.py      |  10 ++-
 diffoscope/comparators/utils.py    |  28 ++++++---
 tests/comparators/conftest.py      |   5 ++
 tests/comparators/test_deb.py      |   3 +-
 9 files changed, 192 insertions(+), 17 deletions(-)

diff --git a/diffoscope.py b/diffoscope.py
index bcf935f..68ec81d 100755
--- a/diffoscope.py
+++ b/diffoscope.py
@@ -27,7 +27,7 @@ import codecs
 import os
 import sys
 import traceback
-from diffoscope import logger, VERSION, set_locale
+from diffoscope import logger, VERSION, set_locale, set_jobs
 import diffoscope.comparators
 from diffoscope.config import Config
 from diffoscope.presenters.html import output_html
@@ -46,6 +46,8 @@ def create_parser():
                         default=False, help='display debug messages')
     parser.add_argument('--debugger', action='store_true',
                         help='Open the python debugger in case of crashes.')
+    parser.add_argument('--jobs', metavar='JOBS', dest='jobs', type=int,
+                        help='comparisons to run simultaneously (default to CPU count)')
     parser.add_argument('--html', metavar='output', dest='html_output',
                         help='write HTML report to given file (use - for stdout)')
     parser.add_argument('--text', metavar='output', dest='text_output',
@@ -98,6 +100,7 @@ def main():
     if parsed_args.debug:
         logger.setLevel(logging.DEBUG)
     set_locale()
+    set_jobs(parsed_args.jobs)
     difference = diffoscope.comparators.compare_root_paths(
         parsed_args.file1, parsed_args.file2)
     if difference:
diff --git a/diffoscope/__init__.py b/diffoscope/__init__.py
index 88f8672..a651ce2 100644
--- a/diffoscope/__init__.py
+++ b/diffoscope/__init__.py
@@ -19,6 +19,7 @@
 
 from functools import wraps
 import logging
+import multiprocessing
 from distutils.spawn import find_executable
 import os
 
@@ -110,4 +111,25 @@ def set_locale():
     os.environ['TZ'] = 'UTC'
 
 
-
+def set_jobs(count):
+    """Set how many comparisons may run simultaneously.
+
+    A false *count* (None or 0) selects the CPU count, or 1 when it cannot
+    be determined.  With a single job, compare_many_files() is replaced by
+    a sequential implementation.
+    """
+    global jobs
+    if count is not None and count < 0:
+        raise ValueError('number of jobs must not be negative: %d' % count)
+    if not count:
+        try:
+            count = multiprocessing.cpu_count()
+            logger.debug('Detected CPU count: %d', count)
+        except NotImplementedError:
+            logger.warning('Unable to determine CPU count. Please specify --jobs.')
+            count = 1
+    jobs = count
+    if jobs == 1:
+        import diffoscope.comparators
+        diffoscope.comparators.compare_many_files = lambda comparisons: [
+            diffoscope.comparators.compare_files(*args) for args in comparisons]
diff --git a/diffoscope/comparators/__init__.py b/diffoscope/comparators/__init__.py
index 119b385..6f6ea0e 100644
--- a/diffoscope/comparators/__init__.py
+++ b/diffoscope/comparators/__init__.py
@@ -18,12 +18,15 @@
 # You should have received a copy of the GNU General Public License
 # along with diffoscope.  If not, see <http://www.gnu.org/licenses/>.
 
 import magic
+#import multiprocessing
+import multiprocessing.dummy as multiprocessing
 import operator
 import os.path
 import re
 import sys
 import tlsh
+import diffoscope
 from diffoscope import logger, tool_required
 from diffoscope.difference import Difference
 from diffoscope.comparators.binary import \
@@ -79,6 +82,139 @@ def compare_files(file1, file2, source=None):
         return file1.compare(file2, source)
 
 
+class ComparisonPool(object):
+    """Limit how many comparisons run at the same time.
+
+    The pool starts with room for ``diffoscope.jobs - 1`` comparisons;
+    compare_many_files() grows it by one for the duration of each batch,
+    as the caller of a batch sits waiting for its results.
+    """
+
+    def __init__(self):
+        self._manager = multiprocessing.Manager()
+        self._condition = self._manager.Condition()
+        self._running = self._manager.Value(int, 0)
+        if not hasattr(diffoscope, 'jobs'):
+            diffoscope.set_jobs(None)
+        self._pool_size = self._manager.Value(int, diffoscope.jobs - 1)
+
+    @property
+    def manager(self):
+        return self._manager
+
+    def grow(self):
+        with self._condition:
+            self._pool_size.value = self._pool_size.value + 1
+            logger.debug('grow pool, new size %s', self._pool_size.value)
+            self._condition.notify()
+
+    def shrink(self):
+        with self._condition:
+            self._pool_size.value = self._pool_size.value - 1
+            logger.debug('shrink pool, new size %s', self._pool_size.value)
+            assert self._pool_size.value >= 0
+
+    def task_done(self):
+        with self._condition:
+            self._running.value = self._running.value - 1
+            logger.debug('task_done. running %d/%d', self._running.value, self._pool_size.value)
+            assert self._running.value >= 0
+            self._condition.notify()
+
+    def run_comparison(self, *args):
+        """Wait for a free slot, then start compare_for_batch(*args)."""
+        with self._condition:
+            logger.debug('run comparison. running %d/%d', self._running.value, self._pool_size.value)
+            while self._running.value >= self._pool_size.value:
+                logger.debug('comparison waiting')
+                self._condition.wait()
+            self._running.value = self._running.value + 1
+            logger.debug('done waiting. running %d/%d', self._running.value, self._pool_size.value)
+        p = multiprocessing.Process(target=compare_for_batch, args=args)
+        p.start()
+
+
+class ComparisonBatch(object):
+    """Comparisons submitted together; join() collects their results."""
+
+    def __init__(self, pool):
+        self._pool = pool
+        self._condition = multiprocessing.Condition()
+        self._remaining = multiprocessing.Value('i', 0, lock=False)
+        self._outputs = []
+
+    @property
+    def pool(self):
+        return self._pool
+
+    def process(self, comparisons):
+        with self._condition:
+            assert self._remaining.value == 0
+            self._remaining.value = len(comparisons)
+        self._outputs = self._pool.manager.list([None] * len(comparisons))
+        for position, comparison in enumerate(comparisons):
+            self._pool.run_comparison(self, position, comparison)
+
+    def set_result(self, position, result):
+        self._outputs[position] = result
+        with self._condition:
+            # isinstance() rather than an exact type() check, which would
+            # miss subclasses such as RequiredToolNotFound.  A failure ends
+            # the wait in join() right away.
+            if isinstance(result, BaseException):
+                self._remaining.value = 0
+            else:
+                self._remaining.value = self._remaining.value - 1
+            logger.debug('%d now remaining %s', id(self), self._remaining.value)
+            self._condition.notify()
+
+    def join(self):
+        """Wait for the batch; return its results or raise its failure."""
+        with self._condition:
+            while self._remaining.value > 0:
+                logger.debug('%d waiting for remaining %s', id(self), self._remaining.value)
+                self._condition.wait()
+        logger.debug('%d batch is over', id(self))
+        for output in self._outputs:
+            if isinstance(output, BaseException):
+                raise output
+        return self._outputs
+
+
+def compare_for_batch(batch, position, comparison):
+    """Run one comparison and hand its result, or exception, to *batch*."""
+    try:
+        result = compare_files(*comparison)
+    except BaseException:
+        logger.error('Exception in compare_files!', exc_info=True)
+        result = sys.exc_info()[1]
+    logger.debug('got result %s', result)
+    batch.set_result(position, result)
+    batch.pool.task_done()
+
+
+def compare_many_files(comparisons):
+    """Run compare_files(*args) for every tuple in *comparisons*.
+
+    Results come back in input order.  If a comparison failed, its
+    exception (the first one in input order) is raised instead.
+    """
+    if not hasattr(compare_many_files, 'pool'):
+        compare_many_files.pool = ComparisonPool()
+    batch = ComparisonBatch(compare_many_files.pool)
+    # We are going to keep one process waiting, so let's give more space
+    batch.pool.grow()
+    try:
+        # Inside the try: shrink() must balance grow() even if dispatching
+        # the comparisons fails.
+        batch.process(comparisons)
+        return batch.join()
+    finally:
+        # We're done. Take the space back
+        logger.debug('compare_many_files done')
+        batch.pool.shrink()
+
+
 # The order matters! They will be tried in turns.
 FILE_CLASSES = (
     Directory,
diff --git a/diffoscope/comparators/binary.py b/diffoscope/comparators/binary.py
index 8060a17..25827fe 100644
--- a/diffoscope/comparators/binary.py
+++ b/diffoscope/comparators/binary.py
@@ -27,6 +27,7 @@ import re
 from stat import S_ISCHR, S_ISBLK
 import subprocess
 import tlsh
+import threading
 import magic
 from diffoscope.difference import Difference
 from diffoscope import tool_required, RequiredToolNotFound, logger
@@ -83,17 +84,29 @@ class File(object):
         if not hasattr(self, '_mimedb'):
-            self._mimedb = magic.open(magic.NONE)
-            self._mimedb.load()
-        return self._mimedb.file(path)
+            mimedb = magic.open(magic.NONE)
+            mimedb.load()
+            # Set the lock before publishing the loaded handle, so that a
+            # thread which finds _mimedb always finds _mimedb_lock too.
+            # NOTE(review): concurrent first calls can still each create
+            # their own handle; a class-level lock would close that gap.
+            self._mimedb_lock = threading.Lock()
+            self._mimedb = mimedb
+        # "with" releases the lock even if file() raises.
+        with self._mimedb_lock:
+            return self._mimedb.file(path)
 
     @classmethod
     def guess_encoding(self, path):
         if not hasattr(self, '_mimedb_encoding'):
-            self._mimedb_encoding = magic.open(magic.MAGIC_MIME_ENCODING)
-            self._mimedb_encoding.load()
-        return self._mimedb_encoding.file(path)
+            mimedb_encoding = magic.open(magic.MAGIC_MIME_ENCODING)
+            mimedb_encoding.load()
+            # Same lock-before-handle publication order as above.
+            self._mimedb_encoding_lock = threading.Lock()
+            self._mimedb_encoding = mimedb_encoding
+        with self._mimedb_encoding_lock:
+            return self._mimedb_encoding.file(path)
 
     def __repr__(self):
-        return '<%s %s %s>' % (self.__class__, self.name, self.path)
+        return '<%s %s %s %s>' % (self.__class__.__name__, id(self), self.name, self.path)
 
     # Path should only be used when accessing the file content (through get_content())
     @property
diff --git a/diffoscope/comparators/deb.py b/diffoscope/comparators/deb.py
index 3283ad3..fde3cea 100644
--- a/diffoscope/comparators/deb.py
+++ b/diffoscope/comparators/deb.py
@@ -26,7 +26,7 @@ from diffoscope import logger
 from diffoscope.difference import Difference
 from diffoscope.comparators.binary import File, needs_content
 from diffoscope.comparators.utils import \
-    Archive, ArchiveMember, get_ar_content
+    Archive, ArchiveMember, get_ar_content, synchronized
 from diffoscope.comparators.tar import TarContainer, get_tar_listing
 
 AR_EXTRACTION_BUFFER_SIZE = 32768
@@ -39,9 +39,11 @@ class ArContainer(Archive):
         # ArFile don't have to be closed
         pass
 
+    @synchronized  # calls on the same container are serialized
     def get_member_names(self):
         return self.archive.getnames()
 
+    @synchronized  # calls on the same container are serialized
     def extract(self, member_name, dest_dir):
         logger.debug('ar extracting %s to %s', member_name, dest_dir)
         member = self.archive.getmember(member_name)
diff --git a/diffoscope/comparators/tar.py b/diffoscope/comparators/tar.py
index 07a7e75..0af94f2 100644
--- a/diffoscope/comparators/tar.py
+++ b/diffoscope/comparators/tar.py
@@ -17,6 +17,7 @@
 # You should have received a copy of the GNU General Public License
 # along with diffoscope.  If not, see <http://www.gnu.org/licenses/>.
 
+from contextlib import contextmanager
 import os.path
 import re
 import stat
@@ -29,7 +30,7 @@ from diffoscope.comparators.binary import File, needs_content
 from diffoscope.comparators.device import Device
 from diffoscope.comparators.directory import Directory
 from diffoscope.comparators.symlink import Symlink
-from diffoscope.comparators.utils import Archive, ArchiveMember
+from diffoscope.comparators.utils import Archive, ArchiveMember, synchronized
 
 class TarMember(ArchiveMember):
     def is_directory(self):
@@ -46,6 +47,10 @@ class TarDirectory(Directory, TarMember):
     def __init__(self, archive, member_name):
         ArchiveMember.__init__(self, archive, member_name)
 
+    @contextmanager
+    def get_content(self):
+        yield  # no-op: yields without extracting anything
+
     def compare(self, other, source=None):
         return None
 
@@ -95,14 +100,17 @@ class TarContainer(Archive):
     def close_archive(self):
         self.archive.close()
 
+    @synchronized  # calls on the same container are serialized
     def get_member_names(self):
         return self.archive.getnames()
 
+    @synchronized  # calls on the same container are serialized
     def extract(self, member_name, dest_dir):
         logger.debug('tar extracting %s to %s', member_name, dest_dir)
         self.archive.extract(member_name, dest_dir)
         return os.path.join(dest_dir, member_name).decode('utf-8')
 
+    @synchronized  # calls on the same container are serialized
     def get_member(self, member_name):
         tarinfo = self.archive.getmember(member_name)
         if tarinfo.isdir():
diff --git a/diffoscope/comparators/utils.py b/diffoscope/comparators/utils.py
index a87ea58..bab88bd 100644
--- a/diffoscope/comparators/utils.py
+++ b/diffoscope/comparators/utils.py
@@ -19,6 +19,7 @@
 
 from abc import ABCMeta, abstractmethod
 from contextlib import contextmanager
+from functools import wraps
 # The following would be shutil.which in Python 3.3
 import os
 import shutil
@@ -26,7 +27,7 @@ from stat import S_ISCHR, S_ISBLK
 from StringIO import StringIO
 import subprocess
 import tempfile
-from threading import Thread
+from threading import Thread, Lock, RLock
 import diffoscope.comparators
 from diffoscope.comparators.binary import File
 from diffoscope.difference import Difference
@@ -168,13 +169,10 @@ class Container(object):
         differences = []
         my_names = set(self.get_member_names())
         other_names = set(other.get_member_names())
-        for name in sorted(my_names.intersection(other_names)):
-            logger.debug('compare member %s', name)
-            my_file = self.get_member(name)
-            other_file = other.get_member(name)
-            differences.append(
-                diffoscope.comparators.compare_files(
-                    my_file, other_file, source=name))
+        differences.extend(diffoscope.comparators.compare_many_files(
+            [(self.get_member(name), other.get_member(name), name)
+             for name in sorted(my_names.intersection(other_names))]))
+        # XXX: change following to use compare_many_files
         my_extra_files = map(self.get_member, my_names.difference(other_names))
         other_extra_files = map(other.get_member, other_names.difference(my_names))
         for my_file, other_file, score in diffoscope.comparators.perform_fuzzy_matching(my_extra_files, other_extra_files):
@@ -265,3 +263,30 @@ class Archive(Container):
 
     def get_member(self, member_name):
         return ArchiveMember(self, member_name)
+
+
+# Guards the lazy creation of the per-instance locks of @synchronized.
+_synchronized_lock_creation = Lock()
+
+
+# decorator
+def synchronized(original_method):
+    """Serialize calls to the decorated method on a given instance.
+
+    All @synchronized methods of one object share a single re-entrant
+    lock, so a synchronized method may call another one on that object.
+    """
+    @wraps(original_method)
+    def wrapper(self, *args, **kwargs):
+        lock = getattr(self, '__rlock', None)
+        if lock is None:
+            # Check again under the global lock: two threads entering at
+            # the same time must not each create their own lock.
+            with _synchronized_lock_creation:
+                lock = getattr(self, '__rlock', None)
+                if lock is None:
+                    lock = RLock()
+                    setattr(self, '__rlock', lock)
+        with lock:
+            return original_method(self, *args, **kwargs)
+    return wrapper
diff --git a/tests/comparators/conftest.py b/tests/comparators/conftest.py
index 671be3c..d0b0d76 100644
--- a/tests/comparators/conftest.py
+++ b/tests/comparators/conftest.py
@@ -27,6 +27,19 @@ def set_locale():
     diffoscope.set_locale()    
 
 
+@pytest.fixture
+def no_parallel(monkeypatch):
+    """Run comparisons sequentially for the duration of one test."""
+    import diffoscope.comparators
+    # set_jobs(1) replaces compare_many_files module-wide: register the
+    # current values first so that monkeypatch restores them afterwards.
+    monkeypatch.setattr(diffoscope.comparators, 'compare_many_files',
+                        diffoscope.comparators.compare_many_files)
+    monkeypatch.setattr(diffoscope, 'jobs', getattr(diffoscope, 'jobs', None),
+                        raising=False)
+    diffoscope.set_jobs(1)
+
+
 def tool_missing(cmd):
     return find_executable(cmd) is None
 
diff --git a/tests/comparators/test_deb.py b/tests/comparators/test_deb.py
index 7000521..8e9f570 100644
--- a/tests/comparators/test_deb.py
+++ b/tests/comparators/test_deb.py
@@ -61,7 +61,7 @@ def test_identification_of_md5sums_outside_deb(tmpdir):
     f = specialize(FilesystemFile(path))
     assert type(f) is FilesystemFile
 
-def test_identification_of_md5sums_in_deb(deb1, deb2, monkeypatch):
+def test_identification_of_md5sums_in_deb(deb1, deb2, no_parallel, monkeypatch):
     orig_func = Md5sumsFile.recognizes
     @staticmethod
     def probe(file):

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git



More information about the Reproducible-commits mailing list