[Reproducible-commits] [diffoscope] 04/04: Work-in-progress: implement parallel processing
Jérémy Bobbio
lunar at moszumanska.debian.org
Sat Sep 5 16:01:33 UTC 2015
This is an automated email from the git hooks/post-receive script.
lunar pushed a commit to branch pu/parallel
in repository diffoscope.
commit c4b2619b00e0644511df70bc5371880cef69ed5e
Author: Jérémy Bobbio <lunar at debian.org>
Date: Thu Jul 30 18:04:40 2015 +0000
Work-in-progress: implement parallel processing
---
diffoscope/__init__.py | 4 --
diffoscope/__main__.py | 4 ++
diffoscope/comparators/__init__.py | 135 +++++++++++++++++++++++++++++++++++++
diffoscope/comparators/binary.py | 15 ++++-
diffoscope/comparators/deb.py | 11 ++-
diffoscope/comparators/tar.py | 9 ++-
diffoscope/comparators/utils.py | 18 ++++-
diffoscope/config.py | 21 ++++++
diffoscope/difference.py | 3 +-
9 files changed, 207 insertions(+), 13 deletions(-)
diff --git a/diffoscope/__init__.py b/diffoscope/__init__.py
index 5439a46..2ec532e 100644
--- a/diffoscope/__init__.py
+++ b/diffoscope/__init__.py
@@ -32,7 +32,6 @@ logger.addHandler(ch)
formatter = logging.Formatter('%(levelname)8s %(message)s')
ch.setFormatter(formatter)
-
class RequiredToolNotFound(Exception):
PROVIDERS = { 'ar': { 'debian': 'binutils-multiarch' }
, 'bzip2': { 'debian': 'bzip2' }
@@ -108,6 +107,3 @@ def set_locale():
os.environ[var] = 'C'
os.environ['LC_CTYPE'] = 'C.UTF-8'
os.environ['TZ'] = 'UTC'
-
-
-
diff --git a/diffoscope/__main__.py b/diffoscope/__main__.py
index 6971c2f..71f480f 100644
--- a/diffoscope/__main__.py
+++ b/diffoscope/__main__.py
@@ -47,6 +47,9 @@ def create_parser():
default=False, help='display debug messages')
parser.add_argument('--debugger', action='store_true',
help='Open the python debugger in case of crashes.')
+ parser.add_argument('--jobs', metavar='JOBS', dest='jobs', type=int,
+ help='comparisons to run simultaneously (default to CPU count)',
+ default=Config.general.jobs)
parser.add_argument('--html', metavar='output', dest='html_output',
help='write HTML report to given file (use - for stdout)')
parser.add_argument('--text', metavar='output', dest='text_output',
@@ -111,6 +114,7 @@ def run_diffoscope(parsed_args):
Config.general.max_report_size = parsed_args.max_report_size
Config.general.fuzzy_threshold = parsed_args.fuzzy_threshold
Config.general.new_file = parsed_args.new_file
+ Config.general.jobs = parsed_args.jobs
if parsed_args.debug:
logger.setLevel(logging.DEBUG)
set_locale()
diff --git a/diffoscope/comparators/__init__.py b/diffoscope/comparators/__init__.py
index 8fe3da5..f815ccd 100644
--- a/diffoscope/comparators/__init__.py
+++ b/diffoscope/comparators/__init__.py
@@ -18,12 +18,18 @@
# You should have received a copy of the GNU General Public License
# along with diffoscope. If not, see <http://www.gnu.org/licenses/>.
+from itertools import dropwhile, starmap
import magic
+#import multiprocessing
+import multiprocessing.dummy as multiprocessing
import operator
import os.path
import re
+import signal
+import subprocess
import sys
import tlsh
+import diffoscope
from diffoscope import logger, tool_required
from diffoscope.config import Config
from diffoscope.difference import Difference
@@ -104,6 +110,135 @@ def compare_commented_files(file1, file2, comment=None, source=None):
return difference
+class ComparisonPool(object):
+ def __init__(self):
+ self._manager = multiprocessing.Manager()
+ self._condition = self._manager.Condition()
+ self._running = self._manager.Value(int, 0)
+ self._pool_size = self._manager.Value(int, Config.general.jobs - 1)
+
+ @property
+ def manager(self):
+ return self._manager
+
+ def grow(self):
+ self._condition.acquire()
+ self._pool_size.value = self._pool_size.value + 1
+ logger.debug('grow pool, new size %s', self._pool_size.value)
+ self._condition.notify()
+ self._condition.release()
+
+ def shrink(self):
+ self._condition.acquire()
+ self._pool_size.value = self._pool_size.value - 1
+ logger.debug('shrink pool, new size %s', self._pool_size.value)
+ assert self._pool_size.value >= 0
+ self._condition.release()
+
+ def task_done(self):
+ self._condition.acquire()
+ self._running.value = self._running.value - 1
+ logger.debug('task_done. running %d/%d', self._running.value, self._pool_size.value)
+ assert self._running.value >= 0
+ self._condition.notify()
+ self._condition.release()
+
+ def run_comparison(self, *args):
+ self._condition.acquire()
+ logger.debug('run comparison. running %d/%d', self._running.value, self._pool_size.value)
+ while self._running.value >= self._pool_size.value:
+ logger.debug('comparison waiting')
+ self._condition.wait()
+ self._running.value = self._running.value + 1
+ logger.debug('done waiting. running %d/%d', self._running.value, self._pool_size.value)
+ self._condition.release()
+ p = multiprocessing.Process(target=compare_for_batch, args=args)
+ p.daemon = True
+ p.start()
+
+
+class ComparisonBatch(object):
+ def __init__(self, pool):
+ self._pool = pool
+ self._condition = multiprocessing.Condition()
+ self._remaining = multiprocessing.Value('i', 0, lock=False)
+
+ @property
+ def pool(self):
+ return self._pool
+
+ def process(self, comparisons):
+ self._condition.acquire()
+ assert self._remaining.value == 0
+ self._remaining.value = len(comparisons)
+ self._condition.release()
+ self._outputs = self._pool.manager.list(map(lambda _: None, comparisons))
+ for position, comparison in enumerate(comparisons):
+ self._pool.run_comparison(self, position, comparison)
+ # Stop processing stuff if we are done
+ self._condition.acquire()
+ try:
+ if self._remaining.value == 0:
+ break
+ finally:
+ self._condition.release()
+
+ def set_result(self, position, result):
+ self._outputs[position] = result
+ self._condition.acquire()
+ if type(result) is Exception:
+ self._remaining.value = 0
+ else:
+ self._remaining.value = self._remaining.value - 1
+ logger.debug('%d now remaining %s', id(self), self._remaining.value)
+ self._condition.notify()
+ self._condition.release()
+
+ def join(self):
+ self._condition.acquire()
+ while self._remaining.value > 0:
+ logger.debug('%d waiting for remaining %s', id(self), self._remaining.value)
+ self._condition.wait()
+ self._condition.release()
+ logger.debug('%d batch is over', id(self))
+ for output in self._outputs:
+ if isinstance(output, Exception):
+ raise output
+ return self._outputs
+
+
+def compare_for_batch(batch, position, comparison):
+ try:
+ result = compare_commented_files(*comparison)
+ except Exception as ex:
+ if isinstance(ex, subprocess.CalledProcessError) and \
+ ex.returncode == -signal.SIGINT:
+ logger.debug('Caught SIGINT')
+ else:
+ logger.error('Exception in compare_files!', exc_info=True)
+ _, result, _ = sys.exc_info()
+ logger.debug('got result %s', result)
+ batch.set_result(position, result)
+ batch.pool.task_done()
+
+
+def compare_many_files(comparisons):
+ if Config.general.jobs == 1:
+ return list(starmap(compare_commented_files, comparisons))
+ if not hasattr(compare_many_files, 'pool'):
+ compare_many_files.pool = ComparisonPool()
+ batch = ComparisonBatch(compare_many_files.pool)
+ # We are going to keep one process waiting, so let's give more space
+ batch.pool.grow()
+ batch.process(comparisons)
+ try:
+ return batch.join()
+ finally:
+ # We're done. Take the space back
+ logger.debug('compare_many_files done')
+ batch.pool.shrink()
+
+
# The order matters! They will be tried in turns.
FILE_CLASSES = (
Directory,
diff --git a/diffoscope/comparators/binary.py b/diffoscope/comparators/binary.py
index 3c8844a..26d71ec 100644
--- a/diffoscope/comparators/binary.py
+++ b/diffoscope/comparators/binary.py
@@ -28,6 +28,7 @@ from stat import S_ISCHR, S_ISBLK
import subprocess
import tempfile
import tlsh
+import threading
import magic
from diffoscope.config import Config
from diffoscope.difference import Difference
@@ -85,17 +86,25 @@ class File(object):
if not hasattr(self, '_mimedb'):
self._mimedb = magic.open(magic.NONE)
self._mimedb.load()
- return self._mimedb.file(path)
+ self._mimedb_lock = threading.Lock()
+ self._mimedb_lock.acquire()
+ ret = self._mimedb.file(path)
+ self._mimedb_lock.release()
+ return ret
@classmethod
def guess_encoding(self, path):
if not hasattr(self, '_mimedb_encoding'):
self._mimedb_encoding = magic.open(magic.MAGIC_MIME_ENCODING)
self._mimedb_encoding.load()
- return self._mimedb_encoding.file(path)
+ self._mimedb_encoding_lock = threading.Lock()
+ self._mimedb_encoding_lock.acquire()
+ ret = self._mimedb_encoding.file(path)
+ self._mimedb_encoding_lock.release()
+ return ret
def __repr__(self):
- return '<%s %s %s>' % (self.__class__, self.name, self.path)
+ return '<%s %s %s %s>' % (self.__class__.__name__, id(self), self.name, self.path)
# Path should only be used when accessing the file content (through get_content())
@property
diff --git a/diffoscope/comparators/deb.py b/diffoscope/comparators/deb.py
index 0d5cffb..9c1902c 100644
--- a/diffoscope/comparators/deb.py
+++ b/diffoscope/comparators/deb.py
@@ -19,14 +19,16 @@
from __future__ import absolute_import
+from itertools import starmap
import re
import os.path
from debian.arfile import ArFile
from diffoscope import logger
from diffoscope.difference import Difference
+import diffoscope.comparators
from diffoscope.comparators.binary import File, needs_content
from diffoscope.comparators.utils import \
- Archive, ArchiveMember, get_ar_content
+ Archive, ArchiveMember, get_ar_content, synchronized
from diffoscope.comparators.tar import TarContainer, get_tar_listing
AR_EXTRACTION_BUFFER_SIZE = 32768
@@ -39,9 +41,11 @@ class ArContainer(Archive):
# ArFile don't have to be closed
pass
+ @synchronized
def get_member_names(self):
return self.archive.getnames()
+ @synchronized
def extract(self, member_name, dest_dir):
logger.debug('ar extracting %s to %s', member_name, dest_dir)
member = self.archive.getmember(member_name)
@@ -54,7 +58,10 @@ class ArContainer(Archive):
class DebContainer(ArContainer):
- pass
+ def compare(self, other):
+ # We don't want to process things in parallel as we want to get control.tar first
+ # to see which files we can ignore by looking at md5sums
+ return starmap(diffoscope.comparators.compare_commented_files, self.comparisons(other))
class DebFile(File):
diff --git a/diffoscope/comparators/tar.py b/diffoscope/comparators/tar.py
index f08adf9..9e88abd 100644
--- a/diffoscope/comparators/tar.py
+++ b/diffoscope/comparators/tar.py
@@ -30,7 +30,7 @@ from diffoscope.comparators.binary import File, needs_content
from diffoscope.comparators.device import Device
from diffoscope.comparators.directory import Directory
from diffoscope.comparators.symlink import Symlink
-from diffoscope.comparators.utils import Archive, ArchiveMember
+from diffoscope.comparators.utils import Archive, ArchiveMember, synchronized
class TarMember(ArchiveMember):
def is_directory(self):
@@ -47,6 +47,10 @@ class TarDirectory(Directory, TarMember):
def __init__(self, archive, member_name):
ArchiveMember.__init__(self, archive, member_name)
+ @contextmanager
+ def get_content(self):
+ yield
+
def compare(self, other, source=None):
return None
@@ -100,14 +104,17 @@ class TarContainer(Archive):
def close_archive(self):
self.archive.close()
+ @synchronized
def get_member_names(self):
return self.archive.getnames()
+ @synchronized
def extract(self, member_name, dest_dir):
logger.debug('tar extracting %s to %s', member_name, dest_dir)
self.archive.extract(member_name, dest_dir)
return os.path.join(dest_dir, member_name).decode('utf-8')
+ @synchronized
def get_member(self, member_name):
tarinfo = self.archive.getmember(member_name)
if tarinfo.isdir():
diff --git a/diffoscope/comparators/utils.py b/diffoscope/comparators/utils.py
index 3b912cd..43d8a2e 100644
--- a/diffoscope/comparators/utils.py
+++ b/diffoscope/comparators/utils.py
@@ -20,6 +20,7 @@
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from itertools import starmap
+from functools import wraps
# The following would be shutil.which in Python 3.3
import os
import shutil
@@ -27,7 +28,7 @@ from stat import S_ISCHR, S_ISBLK
from StringIO import StringIO
import subprocess
import tempfile
-from threading import Thread
+from threading import Thread, RLock
import diffoscope.comparators
from diffoscope.comparators.binary import File, NonExistingFile
from diffoscope.config import Config
@@ -193,7 +194,7 @@ class Container(object):
yield NonExistingFile('/dev/null', other_file), other_file, NO_COMMENT
def compare(self, other, source=None):
- return list(starmap(diffoscope.comparators.compare_commented_files, self.comparisons(other)))
+ return diffoscope.comparators.compare_many_files(list(self.comparisons(other)))
class ArchiveMember(File):
@@ -316,3 +317,16 @@ class NonExistingArchive(Archive):
@property
def path(self):
return '/dev/null'
+
+# decorator
+def synchronized(original_method):
+ @wraps(original_method)
+ def wrapper(self, *args, **kwargs):
+ if not hasattr(self, '__rlock'):
+ self.__rlock = RLock()
+ try:
+ self.__rlock.acquire()
+ return original_method(self, *args, **kwargs)
+ finally:
+ self.__rlock.release()
+ return wrapper
diff --git a/diffoscope/config.py b/diffoscope/config.py
index 5087306..998b28c 100644
--- a/diffoscope/config.py
+++ b/diffoscope/config.py
@@ -17,6 +17,18 @@
# You should have received a copy of the GNU General Public License
# along with diffoscope. If not, see <http://www.gnu.org/licenses/>.
+import multiprocessing
+from diffoscope import logger
+
+
+def get_cpu_count():
+ try:
+ count = multiprocessing.cpu_count()
+ logger.debug('Detected CPU count: %d', count)
+ except NotImplementedError:
+ logger.warn('Unable to determine CPU count. Please specify --jobs.')
+ count = 1
+ return count
# From http://stackoverflow.com/a/7864317
# Credits to kylealanhale
@@ -32,6 +44,7 @@ class Config(object):
self._max_report_size = 2000 * 2 ** 10 # 2000 kB
self._fuzzy_threshold = 60
self._new_file = False
+ self._jobs = get_cpu_count()
@classproperty
def general(cls):
@@ -78,3 +91,11 @@ class Config(object):
@new_file.setter
def new_file(self, value):
self._new_file = value
+
+ @property
+ def jobs(self):
+ return self._jobs
+
+ @jobs.setter
+ def jobs(self, value):
+ self._jobs = value
diff --git a/diffoscope/difference.py b/diffoscope/difference.py
index e1b2a9a..0ece24b 100644
--- a/diffoscope/difference.py
+++ b/diffoscope/difference.py
@@ -21,12 +21,13 @@ from contextlib import contextmanager
import os
import os.path
import re
+import signal
import subprocess
import sys
import traceback
from StringIO import StringIO
from threading import Thread
-from multiprocessing import Queue
+from multiprocessing.dummy import Queue
from diffoscope.config import Config
from diffoscope import logger, tool_required, RequiredToolNotFound
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/reproducible/diffoscope.git
More information about the Reproducible-commits
mailing list