[apertium-apy] 01/05: Imported Upstream version 0.1~r61425

Kartik Mistry kartik at moszumanska.debian.org
Fri Aug 14 05:58:32 UTC 2015


This is an automated email from the git hooks/post-receive script.

kartik pushed a commit to branch master
in repository apertium-apy.

commit 5a401edd0f9f0e09e05cfa8a47e85ca909657b3c
Author: Kartik Mistry <kartik.mistry at gmail.com>
Date:   Fri Aug 14 11:16:28 2015 +0530

    Imported Upstream version 0.1~r61425
---
 servlet.py              | 134 ++++---
 tools/apertiumlangs.sql |   4 +-
 toro.py                 | 983 ++++++++++++++++++++++++++++++++++++++++++++++++
 translation.py          | 158 ++++++--
 4 files changed, 1193 insertions(+), 86 deletions(-)

diff --git a/servlet.py b/servlet.py
index 465b562..832dbdc 100755
--- a/servlet.py
+++ b/servlet.py
@@ -9,6 +9,7 @@ from multiprocessing import Pool, TimeoutError
 from functools import wraps
 from threading import Thread
 from datetime import datetime
+import heapq
 
 import tornado, tornado.web, tornado.httpserver, tornado.process, tornado.iostream
 from tornado import escape, gen
@@ -18,8 +19,6 @@ try: #3.1
 except ImportError: #2.1
     from tornado.options import enable_pretty_logging
 
-import toro
-
 from modeSearch import searchPath
 from util import getLocalizedLanguages, apertium, bilingualTranslate, removeLast, stripTags, processPerWord, getCoverage, getCoverages, toAlpha3Code, toAlpha2Code, noteUnknownToken, scaleMtLog, TranslationInfo, closeDb, flushUnknownWords, inMemoryUnknownToken
 import translation
@@ -57,7 +56,8 @@ class BaseHandler(tornado.web.RequestHandler):
     analyzers = {}
     generators = {}
     taggers = {}
-    pipelines = {} # (l1, l2): (inpipe, outpipe), only contains flushing pairs!
+    pipelines = {} # (l1, l2): min-heap of translation.Pipeline objects, least-used first
+    pipelines_holding = []
     callback = None
     timeout = None
     scaleMtLogs = False
@@ -67,15 +67,15 @@ class BaseHandler(tornado.web.RequestHandler):
 
     stats = {
         'useCount': {},
-        'lastUsage': {},
         'vmsize': 0,
     }
 
-    # The lock is needed so we don't let two coroutines write
-    # simultaneously to a pipeline; then the first call to read might
-    # read translations of text put there by the second call …
-    pipeline_locks = {} # (l1, l2): lock for (l1, l2) in pairs
-    pipeline_cmds = {} # (l1, l2): (do_flush, commands)
+    pipeline_cmds = {} # (l1, l2): translation.ParsedModes
+    max_pipes_per_pair = 1
+    min_pipes_per_pair = 0
+    max_users_per_pipe = 5
+    max_idle_secs = 0
+    restart_pipe_after = 1000
 
     def initialize(self):
         self.callback = self.get_argument('callback', default=None)
@@ -177,7 +177,14 @@ class StatsHandler(BaseHandler):
     @tornado.web.asynchronous
     def get(self):
         self.sendResponse({
-            'responseData': { '%s-%s' % pair: useCount for pair, useCount in self.stats['useCount'].items() },
+            'responseData': {
+                'useCount': { '%s-%s' % pair: useCount
+                              for pair, useCount in self.stats['useCount'].items() },
+                'runningPipes': { '%s-%s' % pair: len(pipes)
+                                  for pair, pipes in self.pipelines.items()
+                                  if pipes != [] },
+                'holdingPipes': len(self.pipelines_holding),
+            },
             'responseDetails': None,
             'responseStatus': 200
         })
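
The reshaped /stats payload nests the old useCount map and adds two pipe
gauges. An illustrative value (hypothetical pair name, made-up numbers)
for the dict passed to sendResponse:

    {
        'responseData': {
            'useCount': {'nob-nno': 42},      # translations served per pair
            'runningPipes': {'nob-nno': 2},   # open pipelines per pair
            'holdingPipes': 0,                # draining, scheduled for shutdown
        },
        'responseDetails': None,
        'responseStatus': 200,
    }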
@@ -190,8 +197,6 @@ class RootHandler(BaseHandler):
 class TranslateHandler(BaseHandler):
     def notePairUsage(self, pair):
         self.stats['useCount'][pair] = 1 + self.stats['useCount'].get(pair, 0)
-        if self.max_idle_secs:
-            self.stats['lastUsage'][pair] = time.time()
 
     unknownMarkRE = re.compile(r'\*([^.,;:\t\* ]+)')
     def maybeStripMarks(self, markUnknown, l1, l2, translated):
@@ -209,24 +214,36 @@ class TranslateHandler(BaseHandler):
                 else:
                     noteUnknownToken(token, pair, self.missingFreqs)
 
-    def shutdownPair(self, pair):
-        logging.info("shutting down")
-        self.pipelines[pair][0].stdin.close()
-        self.pipelines[pair][0].stdout.close()
-        self.pipelines.pop(pair)
+    def cleanable(self, i, pair, pipe):
+        if pipe.useCount > self.restart_pipe_after:
+            # Not affected by min_pipes_per_pair
+            logging.info('A pipe for pair %s-%s has handled %d requests, scheduling restart',
+                         pair[0], pair[1], self.restart_pipe_after)
+            return True
+        elif (i >= self.min_pipes_per_pair
+              and self.max_idle_secs != 0
+              and time.time() - pipe.lastUsage > self.max_idle_secs):
+            logging.info("A pipe for pair %s-%s hasn't been used in %d secs, scheduling shutdown",
+                         pair[0], pair[1], self.max_idle_secs)
+            return True
+        else:
+            return False
 
     def cleanPairs(self):
-        if self.max_idle_secs:
-            for pair, lastUsage in self.stats['lastUsage'].items():
-                if pair in self.pipelines and time.time() - lastUsage > self.max_idle_secs:
-                    logging.info('Shutting down pair %s-%s since it has not been used in %d seconds' % (
-                        pair[0], pair[1], self.max_idle_secs))
-                    self.shutdownPair(pair)
-
-    def getPipeLock(self, l1, l2):
-        if (l1, l2) not in self.pipeline_locks:
-            self.pipeline_locks[(l1, l2)] = toro.Lock()
-        return self.pipeline_locks[(l1, l2)]
+        for pair in self.pipelines:
+            pipes = self.pipelines[pair]
+            to_clean = set(p for i, p in enumerate(pipes)
+                           if self.cleanable(i, pair, p))
+            self.pipelines_holding += to_clean
+            pipes[:] = [p for p in pipes if p not in to_clean]
+            heapq.heapify(pipes)
+        # The holding area lets pipes scheduled for restart finish
+        # their in-flight requests; with lots of traffic an active
+        # pipe may never reach 0 users, so we keep a reference here
+        # and release it (triggering shutdown) once it has drained
+        self.pipelines_holding[:] = [p for p in self.pipelines_holding
+                                     if p.users > 0]
+        if self.pipelines_holding:
+            logging.info("%d pipelines still scheduled for shutdown", len(self.pipelines_holding))
 
     def getPipeCmds(self, l1, l2):
         if (l1, l2) not in self.pipeline_cmds:
@@ -234,14 +251,30 @@ class TranslateHandler(BaseHandler):
             self.pipeline_cmds[(l1, l2)] = translation.parseModeFile(mode_path)
         return self.pipeline_cmds[(l1, l2)]
 
+    def shouldStartPipe(self, l1, l2):
+        pipes = self.pipelines.get((l1, l2), [])
+        if pipes == []:
+            logging.info("%s-%s not in pipelines of this process",
+                         l1, l2)
+            return True
+        else:
+            min_p = pipes[0]
+            if len(pipes) < self.max_pipes_per_pair and min_p.users > self.max_users_per_pipe:
+                logging.info("%s-%s has ≥%d users per pipe but only %d pipes",
+                            l1, l2, min_p.users, len(pipes))
+                return True
+            else:
+                return False
+
     def getPipeline(self, l1, l2):
-        do_flush, commands = self.getPipeCmds(l1, l2)
-        if not do_flush:
-            return None
-        if (l1, l2) not in self.pipelines:
-            logging.info('%s-%s not in pipelines of this process, starting …' % (l1, l2))
-            self.pipelines[(l1, l2)] = translation.startPipeline(commands)
-        return self.pipelines[(l1, l2)]
+        pair = (l1, l2)
+        if self.shouldStartPipe(l1, l2):
+            logging.info("Starting up a new pipeline for %s-%s …", l1, l2)
+            if pair not in self.pipelines:
+                self.pipelines[pair] = []
+            p = translation.makePipeline(self.getPipeCmds(l1, l2))
+            heapq.heappush(self.pipelines[pair], p)
+        return self.pipelines[pair][0]
 
     def logBeforeTranslation(self):
         if self.scaleMtLogs:
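
The pool logic above keeps each pair's pipelines in a min-heap ordered by
concurrent users, so pipelines[pair][0] is always the least-loaded pipe,
and cleanPairs re-heapifies after removing pipes. A self-contained sketch
of that selection policy (toy Pipe class, not the real
translation.Pipeline):

    import heapq

    class Pipe:
        def __init__(self, users):
            self.users = users
        def __lt__(self, other):       # heap order: least loaded first
            return self.users < other.users

    pool = [Pipe(3), Pipe(0), Pipe(5)]
    heapq.heapify(pool)
    assert pool[0].users == 0          # getPipeline hands out this one

    # cleanPairs-style removal: filter the list in place, then restore
    # the heap invariant before the next lookup.
    pool[:] = [p for p in pool if p.users < 5]
    heapq.heapify(pool)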
@@ -274,10 +307,9 @@ class TranslateHandler(BaseHandler):
 
         if '%s-%s' % (l1, l2) in self.pairs:
             before = self.logBeforeTranslation()
-            lock = self.getPipeLock(l1, l2)
-            _, commands = self.getPipeCmds(l1, l2)
             pipeline = self.getPipeline(l1, l2)
-            translated = yield translation.translate(toTranslate, lock, pipeline, commands)
+            self.notePairUsage((l1, l2))
+            translated = yield pipeline.translate(toTranslate)
             self.logAfterTranslation(before, toTranslate)
             self.sendResponse({
                 'responseData': {
@@ -286,7 +318,6 @@ class TranslateHandler(BaseHandler):
                 'responseDetails': None,
                 'responseStatus': 200
             })
-            self.notePairUsage((l1, l2))
             self.cleanPairs()
         else:
             self.send_error(400, explanation='That pair is not installed')
@@ -641,7 +672,7 @@ class PipeDebugHandler(BaseHandler):
 
 missingFreqsDb = ''
 
-def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_idle_secs, verbosity=0, scaleMtLogs=False, memory=0):
+def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_pipes_per_pair, min_pipes_per_pair, max_users_per_pipe, max_idle_secs, restart_pipe_after, verbosity=0, scaleMtLogs=False, memory=0):
 
     global missingFreqsDb
     missingFreqsDb= missingFreqs
@@ -650,7 +681,11 @@ def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeo
     Handler.langNames = langNames
     Handler.missingFreqs = missingFreqs
     Handler.timeout = timeout
+    Handler.max_pipes_per_pair = max_pipes_per_pair
+    Handler.min_pipes_per_pair = min_pipes_per_pair
+    Handler.max_users_per_pipe = max_users_per_pipe
     Handler.max_idle_secs = max_idle_secs
+    Handler.restart_pipe_after = restart_pipe_after
     Handler.scaleMtLogs = scaleMtLogs
     Handler.inMemoryUnknown = True if memory > 0 else False
     Handler.inMemoryLimit = memory
@@ -673,8 +708,17 @@ def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeo
     for dirpath, modename, lang_pair in modes['tagger']:
         Handler.taggers[lang_pair] = (dirpath, modename)
 
+def sanity_check():
+    locale_vars = ["LANG", "LC_ALL"]
+    u8 = re.compile("UTF-?8", re.IGNORECASE)
+    if not any(re.search(u8, os.environ.get(key, ""))
+               for key in locale_vars):
+        print("servlet.py: error: APY needs a UTF-8 locale, please set LANG or LC_ALL",
+              file=sys.stderr)
+        sys.exit(1)
 
 if __name__ == '__main__':
+    sanity_check()
     parser = argparse.ArgumentParser(description='Start Apertium APY')
     parser.add_argument('pairs_path', help='path to Apertium installed pairs (all modes files in this path are included)')
     parser.add_argument('-s', '--nonpairs-path', help='path to Apertium SVN (only non-translator debug modes are included from this path)')
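
The locale check accepts any UTF-8 spelling in LANG or LC_ALL; a
doctest-style illustration of the regex it uses (example values):

    >>> import re
    >>> u8 = re.compile("UTF-?8", re.IGNORECASE)
    >>> [bool(u8.search(v)) for v in ("en_US.UTF-8", "nb_NO.utf8", "C")]
    [True, True, False]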
@@ -684,10 +728,14 @@ if __name__ == '__main__':
     parser.add_argument('-c', '--ssl-cert', help='path to SSL Certificate', default=None)
     parser.add_argument('-k', '--ssl-key', help='path to SSL Key File', default=None)
     parser.add_argument('-t', '--timeout', help='timeout for requests (default = 10)', type=int, default=10)
-    parser.add_argument('-j', '--num-processes', help='number of processes to run (default = number of cores)', type=int, default=0)
+    parser.add_argument('-j', '--num-processes', help='number of processes to run (default = 1; use 0 to run one http server per core, where each http server runs all available language pairs)', nargs='?', type=int, default=1)
     parser.add_argument('-d', '--daemon', help='daemon mode: redirects stdout and stderr to files apertium-apy.log and apertium-apy.err ; use with --log-path', action='store_true')
     parser.add_argument('-P', '--log-path', help='path to log output files to in daemon mode; defaults to local directory', default='./')
-    parser.add_argument('-m', '--max-idle-secs', help='shut down pipelines it have not been used in this many seconds', type=int, default=0)
+    parser.add_argument('-i', '--max-pipes-per-pair', help='how many pipelines we can spin up per language pair (default = 1)', type=int, default=1)
+    parser.add_argument('-n', '--min-pipes-per-pair', help='when shutting down pipelines, keep at least this many open per language pair (default = 0)', type=int, default=0)
+    parser.add_argument('-u', '--max-users-per-pipe', help='how many concurrent requests per pipeline before we consider spinning up a new one (default = 5)', type=int, default=5)
+    parser.add_argument('-m', '--max-idle-secs', help='if specified, shut down pipelines that have not been used in this many seconds', type=int, default=0)
+    parser.add_argument('-r', '--restart-pipe-after', help='restart a pipeline if it has had this many requests (default = 1000)', type=int, default=1000)
     parser.add_argument('-v', '--verbosity', help='logging verbosity', type=int, default=0)
     parser.add_argument('-S', '--scalemt-logs', help='generates ScaleMT-like logs; use with --log-path; disables', action='store_true')
     parser.add_argument('-M', '--unknown-memory-limit', help="keeps unknown words in memory until a limit is reached", type=int, default=0)
@@ -718,7 +766,7 @@ if __name__ == '__main__':
     if not cld2:
         logging.warning('Unable to import CLD2, continuing using naive method of language detection')
 
-    setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_idle_secs, args.verbosity, args.scalemt_logs, args.unknown_memory_limit)
+    setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_pipes_per_pair, args.min_pipes_per_pair, args.max_users_per_pipe, args.max_idle_secs, args.restart_pipe_after, args.verbosity, args.scalemt_logs, args.unknown_memory_limit)
 
     application = tornado.web.Application([
         (r'/', RootHandler),
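
Taken together, the new flags describe a pipeline pool per language pair.
A hypothetical invocation (the modes path is a placeholder):

    python3 servlet.py /usr/share/apertium/modes -i 4 -n 1 -u 5 -m 300 -r 1000

That is: at most 4 pipes per pair, keep at least 1 alive when cleaning,
start another pipe once the least-loaded one has more than 5 concurrent
users, shut down pipes idle for over 300 seconds, and schedule a restart
after 1000 requests through a pipe.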
diff --git a/tools/apertiumlangs.sql b/tools/apertiumlangs.sql
index 7bbf4ba..c62fdb4 100644
--- a/tools/apertiumlangs.sql
+++ b/tools/apertiumlangs.sql
@@ -2721,7 +2721,7 @@ INSERT INTO "languageNames" VALUES(2734,'en','ne','Nepali');
 INSERT INTO "languageNames" VALUES(2735,'en','nl','Dutch');
 INSERT INTO "languageNames" VALUES(2736,'en','nn','Norwegian Nynorsk');
 INSERT INTO "languageNames" VALUES(2737,'en','no','Norwegian');
-INSERT INTO "languageNames" VALUES(2738,'en','nog','Nogai');
+INSERT INTO "languageNames" VALUES(2738,'en','nog','Nogay');
 INSERT INTO "languageNames" VALUES(2739,'en','oc','Occitan');
 INSERT INTO "languageNames" VALUES(2740,'en','os','Ossetic');
 INSERT INTO "languageNames" VALUES(2741,'en','pa','Punjabi');
@@ -2756,7 +2756,7 @@ INSERT INTO "languageNames" VALUES(2769,'en','tk','Turkmen');
 INSERT INTO "languageNames" VALUES(2770,'en','tl','Tagalog');
 INSERT INTO "languageNames" VALUES(2771,'en','tr','Turkish');
 INSERT INTO "languageNames" VALUES(2772,'en','tt','Tatar');
-INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvinian');
+INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvan');
 INSERT INTO "languageNames" VALUES(2774,'en','udm','Udmurt');
 INSERT INTO "languageNames" VALUES(2775,'en','uk','Ukrainian');
 INSERT INTO "languageNames" VALUES(2776,'en','ur','Urdu');
diff --git a/toro.py b/toro.py
new file mode 100644
index 0000000..d978299
--- /dev/null
+++ b/toro.py
@@ -0,0 +1,983 @@
+# From https://github.com/ajdavis/toro/
+
+# Toro Copyright (c) 2012 A. Jesse Jiryu Davis
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import contextlib
+import heapq
+import collections
+from functools import partial
+from queue import Full, Empty
+
+from tornado import ioloop
+from tornado import gen
+from tornado.concurrent import Future
+
+
+version_tuple = (0, 8, '+')
+
+version = '.'.join(map(str, version_tuple))
+"""Current version of Toro."""
+
+
+__all__ = [
+    # Exceptions
+    'NotReady', 'AlreadySet', 'Full', 'Empty', 'Timeout',
+
+    # Primitives
+    'AsyncResult', 'Event', 'Condition',  'Semaphore', 'BoundedSemaphore',
+    'Lock',
+
+    # Queues
+    'Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue'
+]
+
+
+class NotReady(Exception):
+    """Raised when accessing an :class:`AsyncResult` that has no value yet."""
+    pass
+
+
+class AlreadySet(Exception):
+    """Raised when setting a value on an :class:`AsyncResult` that already
+    has one."""
+    pass
+
+
+class Timeout(Exception):
+    """Raised when a deadline passes before a Future is ready."""
+
+    def __str__(self):
+        return "Timeout"
+
+
+class _TimeoutFuture(Future):
+
+    def __init__(self, deadline, io_loop):
+        """Create a Future with optional deadline.
+
+        If deadline is not None, it may be a number denoting a unix timestamp
+        (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` object
+        for a deadline relative to the current time.
+
+        set_exception(toro.Timeout()) is executed after a timeout.
+        """
+
+        super(_TimeoutFuture, self).__init__()
+        self.io_loop = io_loop
+        if deadline is not None:
+            callback = partial(self.set_exception, Timeout())
+            self._timeout_handle = io_loop.add_timeout(deadline, callback)
+        else:
+            self._timeout_handle = None
+
+    def set_result(self, result):
+        self._cancel_timeout()
+        super(_TimeoutFuture, self).set_result(result)
+
+    def set_exception(self, exception):
+        self._cancel_timeout()
+        super(_TimeoutFuture, self).set_exception(exception)
+
+    def _cancel_timeout(self):
+        if self._timeout_handle:
+            self.io_loop.remove_timeout(self._timeout_handle)
+            self._timeout_handle = None
+
+
+class _ContextManagerList(list):
+    def __enter__(self, *args, **kwargs):
+        for obj in self:
+            obj.__enter__(*args, **kwargs)
+
+    def __exit__(self, *args, **kwargs):
+        for obj in self:
+            obj.__exit__(*args, **kwargs)
+
+
+class _ContextManagerFuture(Future):
+    """A Future that can be used with the "with" statement.
+
+    When a coroutine yields this Future, the return value is a context manager
+    that can be used like:
+
+        with (yield future):
+            pass
+
+    At the end of the block, the Future's exit callback is run. Used for
+    Lock.acquire() and Semaphore.acquire().
+    """
+    def __init__(self, wrapped, exit_callback):
+        super(_ContextManagerFuture, self).__init__()
+        wrapped.add_done_callback(self._done_callback)
+        self.exit_callback = exit_callback
+
+    def _done_callback(self, wrapped):
+        if wrapped.exception():
+            self.set_exception(wrapped.exception())
+        else:
+            self.set_result(wrapped.result())
+
+    def result(self):
+        if self.exception():
+            raise self.exception()
+
+        # Otherwise return a context manager that cleans up after the block.
+        @contextlib.contextmanager
+        def f():
+            try:
+                yield
+            finally:
+                self.exit_callback()
+        return f()
+
+
+def _consume_expired_waiters(waiters):
+    # Delete waiters at the head of the queue who've timed out
+    while waiters and waiters[0].done():
+        waiters.popleft()
+
+
+_null_result = object()
+
+
+class AsyncResult(object):
+    """A one-time event that stores a value or an exception.
+
+    The only distinction between AsyncResult and a simple Future is that
+    AsyncResult lets coroutines wait with a deadline. The deadline can be
+    configured separately for each waiter.
+
+    An :class:`AsyncResult` instance cannot be reset.
+
+    :Parameters:
+      - `io_loop`: Optional custom IOLoop.
+    """
+
+    def __init__(self, io_loop=None):
+        self.io_loop = io_loop or ioloop.IOLoop.current()
+        self.value = _null_result
+        self.waiters = []
+
+    def __str__(self):
+        result = '<%s ' % (self.__class__.__name__, )
+        if self.ready():
+            result += 'value=%r' % self.value
+        else:
+            result += 'unset'
+            if self.waiters:
+                result += ' waiters[%s]' % len(self.waiters)
+
+        return result + '>'
+
+    def set(self, value):
+        """Set a value and wake up all the waiters."""
+        if self.ready():
+            raise AlreadySet
+
+        self.value = value
+        waiters, self.waiters = self.waiters, []
+        for waiter in waiters:
+            if not waiter.done():  # Might have timed out
+                waiter.set_result(value)
+
+    def ready(self):
+        return self.value is not _null_result
+
+    def get(self, deadline=None):
+        """Get a value once :meth:`set` is called. Returns a Future.
+
+        The Future's result will be the value. The Future raises
+        :exc:`toro.Timeout` if no value is set before the deadline.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
+            a deadline relative to the current time.
+        """
+        future = _TimeoutFuture(deadline, self.io_loop)
+        if self.ready():
+            future.set_result(self.value)
+        else:
+            self.waiters.append(future)
+
+        return future
+
+    def get_nowait(self):
+        """Get the value if ready, or raise :class:`NotReady`."""
+        if self.ready():
+            return self.value
+        else:
+            raise NotReady
+
+
+class Condition(object):
+    """A condition allows one or more coroutines to wait until notified.
+
+    Like a standard Condition_, but does not need an underlying lock that
+    is acquired and released.
+
+    .. _Condition: http://docs.python.org/library/threading.html#threading.Condition
+
+    :Parameters:
+      - `io_loop`: Optional custom IOLoop.
+    """
+
+    def __init__(self, io_loop=None):
+        self.io_loop = io_loop or ioloop.IOLoop.current()
+        self.waiters = collections.deque()  # Queue of _Waiter objects
+
+    def __str__(self):
+        result = '<%s' % (self.__class__.__name__, )
+        if self.waiters:
+            result += ' waiters[%s]' % len(self.waiters)
+        return result + '>'
+
+    def wait(self, deadline=None):
+        """Wait for :meth:`notify`. Returns a Future.
+
+        The Future raises :exc:`~toro.Timeout` after a timeout.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        future = _TimeoutFuture(deadline, self.io_loop)
+        self.waiters.append(future)
+        return future
+
+    def notify(self, n=1):
+        """Wake up `n` waiters.
+
+        :Parameters:
+          - `n`: The number of waiters to awaken (default: 1)
+        """
+        waiters = []  # Waiters we plan to run right now
+        while n and self.waiters:
+            waiter = self.waiters.popleft()
+            if not waiter.done():  # Might have timed out
+                n -= 1
+                waiters.append(waiter)
+
+        for waiter in waiters:
+            waiter.set_result(None)
+
+    def notify_all(self):
+        """Wake up all waiters."""
+        self.notify(len(self.waiters))
+
+
+# TODO: show correct examples that avoid thread / process issues w/ concurrent.futures.Future
+class Event(object):
+    """An event blocks coroutines until its internal flag is set to True.
+
+    Similar to threading.Event_.
+
+    .. _threading.Event: http://docs.python.org/library/threading.html#threading.Event
+
+    .. seealso:: :doc:`examples/event_example`
+
+    :Parameters:
+      - `io_loop`: Optional custom IOLoop.
+    """
+
+    def __init__(self, io_loop=None):
+        self.io_loop = io_loop or ioloop.IOLoop.current()
+        self.condition = Condition(io_loop=io_loop)
+        self._flag = False
+
+    def __str__(self):
+        return '<%s %s>' % (
+            self.__class__.__name__, 'set' if self._flag else 'clear')
+
+    def is_set(self):
+        """Return ``True`` if and only if the internal flag is true."""
+        return self._flag
+
+    def set(self):
+        """Set the internal flag to ``True``. All waiters are awakened.
+        Calling :meth:`wait` once the flag is true will not block.
+        """
+        self._flag = True
+        self.condition.notify_all()
+
+    def clear(self):
+        """Reset the internal flag to ``False``. Calls to :meth:`wait`
+        will block until :meth:`set` is called.
+        """
+        self._flag = False
+
+    def wait(self, deadline=None):
+        """Block until the internal flag is true. Returns a Future.
+
+        The Future raises :exc:`~toro.Timeout` after a timeout.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        if self._flag:
+            future = _TimeoutFuture(None, self.io_loop)
+            future.set_result(None)
+            return future
+        else:
+            return self.condition.wait(deadline)
+
+
+class Queue(object):
+    """Create a queue object with a given maximum size.
+
+    If `maxsize` is 0 (the default) the queue size is unbounded.
+
+    Unlike the `standard Queue`_, you can reliably know this Queue's size
+    with :meth:`qsize`, since your single-threaded Tornado application won't
+    be interrupted between calling :meth:`qsize` and doing an operation on the
+    Queue.
+
+    **Examples:**
+
+    :doc:`examples/producer_consumer_example`
+
+    :doc:`examples/web_spider_example`
+
+    :Parameters:
+      - `maxsize`: Optional size limit (no limit by default).
+      - `io_loop`: Optional custom IOLoop.
+
+    .. _`Gevent's Queue`: http://www.gevent.org/gevent.queue.html
+
+    .. _`standard Queue`: http://docs.python.org/library/queue.html#Queue.Queue
+    """
+    def __init__(self, maxsize=0, io_loop=None):
+        self.io_loop = io_loop or ioloop.IOLoop.current()
+        if maxsize is None:
+            raise TypeError("maxsize can't be None")
+
+        if maxsize < 0:
+            raise ValueError("maxsize can't be negative")
+
+        self._maxsize = maxsize
+
+        # _TimeoutFutures
+        self.getters = collections.deque([])
+        # Pairs of (item, _TimeoutFuture)
+        self.putters = collections.deque([])
+        self._init(maxsize)
+
+    def _init(self, maxsize):
+        self.queue = collections.deque()
+
+    def _get(self):
+        return self.queue.popleft()
+
+    def _put(self, item):
+        self.queue.append(item)
+
+    def __repr__(self):
+        return '<%s at %s %s>' % (
+            type(self).__name__, hex(id(self)), self._format())
+
+    def __str__(self):
+        return '<%s %s>' % (type(self).__name__, self._format())
+
+    def _format(self):
+        result = 'maxsize=%r' % (self.maxsize, )
+        if getattr(self, 'queue', None):
+            result += ' queue=%r' % self.queue
+        if self.getters:
+            result += ' getters[%s]' % len(self.getters)
+        if self.putters:
+            result += ' putters[%s]' % len(self.putters)
+        return result
+
+    def _consume_expired_putters(self):
+        # Delete waiters at the head of the queue who've timed out
+        while self.putters and self.putters[0][1].done():
+            self.putters.popleft()
+
+    def qsize(self):
+        """Number of items in the queue"""
+        return len(self.queue)
+
+    @property
+    def maxsize(self):
+        """Number of items allowed in the queue."""
+        return self._maxsize
+
+    def empty(self):
+        """Return ``True`` if the queue is empty, ``False`` otherwise."""
+        return not self.queue
+
+    def full(self):
+        """Return ``True`` if there are `maxsize` items in the queue.
+
+        .. note:: if the Queue was initialized with `maxsize=0`
+          (the default), then :meth:`full` is never ``True``.
+        """
+        if self.maxsize == 0:
+            return False
+        else:
+            return self.maxsize <= self.qsize()
+
+    def put(self, item, deadline=None):
+        """Put an item into the queue. Returns a Future.
+
+        The Future blocks until a free slot is available for `item`, or raises
+        :exc:`toro.Timeout`.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        _consume_expired_waiters(self.getters)
+        future = _TimeoutFuture(deadline, self.io_loop)
+        if self.getters:
+            assert not self.queue, "queue non-empty, why are getters waiting?"
+            getter = self.getters.popleft()
+
+            # Use _put and _get instead of passing item straight to getter, in
+            # case a subclass has logic that must run (e.g. JoinableQueue).
+            self._put(item)
+            getter.set_result(self._get())
+            future.set_result(None)
+        else:
+            if self.maxsize and self.maxsize <= self.qsize():
+                self.putters.append((item, future))
+            else:
+                self._put(item)
+                future.set_result(None)
+
+        return future
+
+    def put_nowait(self, item):
+        """Put an item into the queue without blocking.
+
+        If no free slot is immediately available, raise queue.Full.
+        """
+        _consume_expired_waiters(self.getters)
+        if self.getters:
+            assert not self.queue, "queue non-empty, why are getters waiting?"
+            getter = self.getters.popleft()
+
+            self._put(item)
+            getter.set_result(self._get())
+        elif self.maxsize and self.maxsize <= self.qsize():
+            raise Full
+        else:
+            self._put(item)
+
+    def get(self, deadline=None):
+        """Remove and return an item from the queue. Returns a Future.
+
+        The Future blocks until an item is available, or raises
+        :exc:`toro.Timeout`.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        self._consume_expired_putters()
+        future = _TimeoutFuture(deadline, self.io_loop)
+        if self.putters:
+            assert self.full(), "queue not full, why are putters waiting?"
+            item, putter = self.putters.popleft()
+            self._put(item)
+            putter.set_result(None)
+            future.set_result(self._get())
+        elif self.qsize():
+            future.set_result(self._get())
+        else:
+            self.getters.append(future)
+
+        return future
+
+    def get_nowait(self):
+        """Remove and return an item from the queue without blocking.
+
+        Return an item if one is immediately available, else raise
+        :exc:`queue.Empty`.
+        """
+        self._consume_expired_putters()
+        if self.putters:
+            assert self.full(), "queue not full, why are putters waiting?"
+            item, putter = self.putters.popleft()
+            self._put(item)
+            putter.set_result(None)
+            return self._get()
+        elif self.qsize():
+            return self._get()
+        else:
+            raise Empty
+
+
+class PriorityQueue(Queue):
+    """A subclass of :class:`Queue` that retrieves entries in priority order
+    (lowest first).
+
+    Entries are typically tuples of the form: ``(priority number, data)``.
+
+    :Parameters:
+      - `maxsize`: Optional size limit (no limit by default).
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def _init(self, maxsize):
+        self.queue = []
+
+    def _put(self, item, heappush=heapq.heappush):
+        heappush(self.queue, item)
+
+    def _get(self, heappop=heapq.heappop):
+        return heappop(self.queue)
+
+
+class LifoQueue(Queue):
+    """A subclass of :class:`Queue` that retrieves most recently added entries
+    first.
+
+    :Parameters:
+      - `maxsize`: Optional size limit (no limit by default).
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def _init(self, maxsize):
+        self.queue = []
+
+    def _put(self, item):
+        self.queue.append(item)
+
+    def _get(self):
+        return self.queue.pop()
+
+
+class JoinableQueue(Queue):
+    """A subclass of :class:`Queue` that additionally has :meth:`task_done`
+    and :meth:`join` methods.
+
+    .. seealso:: :doc:`examples/web_spider_example`
+
+    :Parameters:
+      - `maxsize`: Optional size limit (no limit by default).
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def __init__(self, maxsize=0, io_loop=None):
+        Queue.__init__(self, maxsize=maxsize, io_loop=io_loop)
+        self.unfinished_tasks = 0
+        self._finished = Event(io_loop)
+        self._finished.set()
+
+    def _format(self):
+        result = Queue._format(self)
+        if self.unfinished_tasks:
+            result += ' tasks=%s' % self.unfinished_tasks
+        return result
+
+    def _put(self, item):
+        self.unfinished_tasks += 1
+        self._finished.clear()
+        Queue._put(self, item)
+
+    def task_done(self):
+        """Indicate that a formerly enqueued task is complete.
+
+        Used by queue consumers. For each :meth:`get <Queue.get>` used to
+        fetch a task, a subsequent call to :meth:`task_done` tells the queue
+        that the processing on the task is complete.
+
+        If a :meth:`join` is currently blocking, it will resume when all
+        items have been processed (meaning that a :meth:`task_done` call was
+        received for every item that had been :meth:`put <Queue.put>` into the
+        queue).
+
+        Raises ``ValueError`` if called more times than there were items
+        placed in the queue.
+        """
+        if self.unfinished_tasks <= 0:
+            raise ValueError('task_done() called too many times')
+        self.unfinished_tasks -= 1
+        if self.unfinished_tasks == 0:
+            self._finished.set()
+
+    def join(self, deadline=None):
+        """Block until all items in the queue are processed. Returns a Future.
+
+        The count of unfinished tasks goes up whenever an item is added to
+        the queue. The count goes down whenever a consumer calls
+        :meth:`task_done` to indicate that all work on the item is complete.
+        When the count of unfinished tasks drops to zero, :meth:`join`
+        unblocks.
+
+        The Future raises :exc:`toro.Timeout` if the count is not zero before
+        the deadline.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        return self._finished.wait(deadline)
+
+
+class Semaphore(object):
+    """A lock that can be acquired a fixed number of times before blocking.
+
+    A Semaphore manages a counter representing the number of release() calls
+    minus the number of acquire() calls, plus an initial value. The acquire()
+    method blocks if necessary until it can return without making the counter
+    negative.
+
+    If not given, value defaults to 1.
+
+    :meth:`acquire` supports the context manager protocol:
+
+    >>> from tornado import gen
+    >>> import toro
+    >>> semaphore = toro.Semaphore()
+    >>>
+    >>> @gen.coroutine
+    ... def f():
+    ...    with (yield semaphore.acquire()):
+    ...        assert semaphore.locked()
+    ...
+    ...    assert not semaphore.locked()
+
+    .. note:: Unlike the standard threading.Semaphore_, a :class:`Semaphore`
+      can tell you the current value of its :attr:`counter`, because code in a
+      single-threaded Tornado app can check these values and act upon them
+      without fear of interruption from another thread.
+
+    .. _threading.Semaphore: http://docs.python.org/library/threading.html#threading.Semaphore
+
+    .. seealso:: :doc:`examples/web_spider_example`
+
+    :Parameters:
+      - `value`: An int, the initial value (default 1).
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def __init__(self, value=1, io_loop=None):
+        if value < 0:
+            raise ValueError('semaphore initial value must be >= 0')
+
+        # The semaphore is implemented as a Queue with 'value' objects
+        self.q = Queue(io_loop=io_loop)
+        for _ in range(value):
+            self.q.put_nowait(None)
+
+        self._unlocked = Event(io_loop=io_loop)
+        if value:
+            self._unlocked.set()
+
+    def __repr__(self):
+        return '<%s at %s%s>' % (
+            type(self).__name__, hex(id(self)), self._format())
+
+    def __str__(self):
+        return '<%s%s>' % (
+            self.__class__.__name__, self._format())
+
+    def _format(self):
+        return ' counter=%s' % self.counter
+
+    @property
+    def counter(self):
+        """An integer, the current semaphore value"""
+        return self.q.qsize()
+
+    def locked(self):
+        """True if :attr:`counter` is zero"""
+        return self.q.empty()
+
+    def release(self):
+        """Increment :attr:`counter` and wake one waiter.
+        """
+        self.q.put(None)
+        if not self.locked():
+            # No one was waiting on acquire(), so self.q.qsize() is positive
+            self._unlocked.set()
+
+    def wait(self, deadline=None):
+        """Wait for :attr:`locked` to be False. Returns a Future.
+
+        The Future raises :exc:`toro.Timeout` after the deadline.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        return self._unlocked.wait(deadline)
+
+    def acquire(self, deadline=None):
+        """Decrement :attr:`counter`. Returns a Future.
+
+        Block if the counter is zero and wait for a :meth:`release`. The
+        Future raises :exc:`toro.Timeout` after the deadline.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        queue_future = self.q.get(deadline)
+        if self.q.empty():
+            self._unlocked.clear()
+        future = _ContextManagerFuture(queue_future, self.release)
+        return future
+
+    def __enter__(self):
+        raise RuntimeError(
+            "Use Semaphore like 'with (yield semaphore)', not like"
+            " 'with semaphore'")
+
+    __exit__ = __enter__
+
+
+class BoundedSemaphore(Semaphore):
+    """A semaphore that prevents release() being called too often.
+
+    A bounded semaphore checks to make sure its current value doesn't exceed
+    its initial value. If it does, ``ValueError`` is raised. In most
+    situations semaphores are used to guard resources with limited capacity.
+    If the semaphore is released too many times it's a sign of a bug.
+
+    If not given, *value* defaults to 1.
+
+    .. seealso:: :doc:`examples/web_spider_example`
+    """
+    def __init__(self, value=1, io_loop=None):
+        super(BoundedSemaphore, self).__init__(value=value, io_loop=io_loop)
+        self._initial_value = value
+
+    def release(self):
+        if self.counter >= self._initial_value:
+            raise ValueError("Semaphore released too many times")
+        return super(BoundedSemaphore, self).release()
+
+
+class Lock(object):
+    """A lock for coroutines.
+
+    It is created unlocked. When unlocked, :meth:`acquire` changes the state
+    to locked. When the state is locked, yielding :meth:`acquire` waits until
+    a call to :meth:`release`.
+
+    The :meth:`release` method should only be called in the locked state;
+    an attempt to release an unlocked lock raises RuntimeError.
+
+    When more than one coroutine is waiting for the lock, the first one
+    registered is awakened by :meth:`release`.
+
+    :meth:`acquire` supports the context manager protocol:
+
+    >>> from tornado import gen
+    >>> import toro
+    >>> lock = toro.Lock()
+    >>>
+    >>> @gen.coroutine
+    ... def f():
+    ...    with (yield lock.acquire()):
+    ...        assert lock.locked()
+    ...
+    ...    assert not lock.locked()
+
+    .. note:: Unlike with the standard threading.Lock_, code in a
+      single-threaded Tornado application can check if a :class:`Lock`
+      is :meth:`locked`, and act on that information without fear that another
+      thread has grabbed the lock, provided you do not yield to the IOLoop
+      between checking :meth:`locked` and using a protected resource.
+
+    .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects
+
+    .. seealso:: :doc:`examples/lock_example`
+
+    :Parameters:
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def __init__(self, io_loop=None):
+        self._block = BoundedSemaphore(value=1, io_loop=io_loop)
+
+    def __str__(self):
+        return "<%s _block=%s>" % (
+            self.__class__.__name__,
+            self._block)
+
+    def acquire(self, deadline=None):
+        """Attempt to lock. Returns a Future.
+
+        The Future raises :exc:`toro.Timeout` if the deadline passes.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+            deadline relative to the current time.
+        """
+        return self._block.acquire(deadline)
+
+    def release(self):
+        """Unlock.
+
+        If any coroutines are waiting for :meth:`acquire`,
+        the first in line is awakened.
+
+        If not locked, raise a RuntimeError.
+        """
+        if not self.locked():
+            raise RuntimeError('release unlocked lock')
+        self._block.release()
+
+    def locked(self):
+        """``True`` if the lock has been acquired"""
+        return self._block.locked()
+
+    def __enter__(self):
+        raise RuntimeError(
+            "Use Lock like 'with (yield lock)', not like"
+            " 'with lock'")
+
+    __exit__ = __enter__
+
+
+class RWLock(object):
+    """A reader-writer lock for coroutines.
+
+    It is created unlocked. When unlocked, :meth:`acquire_write` always changes
+    the state to locked. When unlocked, :meth:`acquire_read` changes the
+    state to locked once :meth:`acquire_read` has been called max_readers
+    times. When the state is locked, yielding
+    :meth:`acquire_read`/:meth:`acquire_write` waits until a call to
+    :meth:`release_write` if locked on write, or :meth:`release_read` if
+    locked on read.
+
+    The :meth:`release_read` method should only be called in the locked-on-read
+    state; an attempt to release an unlocked lock raises RuntimeError.
+
+    The :meth:`release_write` method should only be called in the locked on
+    write state; an attempt to release an unlocked lock raises RuntimeError.
+
+    When more than one coroutine is waiting for the lock, the first one
+    registered is awakened by :meth:`release_read`/:meth:`release_write`.
+
+    :meth:`acquire_read`/:meth:`acquire_write` support the context manager
+    protocol:
+
+    >>> from tornado import gen
+    >>> import toro
+    >>> lock = toro.RWLock(max_readers=10)
+    >>>
+    >>> @gen.coroutine
+    ... def f():
+    ...    with (yield lock.acquire_read()):
+    ...        assert not lock.locked()
+    ...
+    ...    with (yield lock.acquire_write()):
+    ...        assert lock.locked()
+    ...
+    ...    assert not lock.locked()
+
+    .. note:: Unlike with the standard threading.Lock_, code in a
+      single-threaded Tornado application can check if a :class:`RWLock`
+      is :meth:`locked`, and act on that information without fear that another
+      thread has grabbed the lock, provided you do not yield to the IOLoop
+      between checking :meth:`locked` and using a protected resource.
+
+    .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects
+
+    :Parameters:
+      - `max_readers`: Optional max readers value, default 1.
+      - `io_loop`: Optional custom IOLoop.
+    """
+    def __init__(self, max_readers=1, io_loop=None):
+        self._max_readers = max_readers
+        self._block = BoundedSemaphore(value=max_readers, io_loop=io_loop)
+
+    def __str__(self):
+        return "<%s _block=%s>" % (
+            self.__class__.__name__,
+            self._block)
+
+    def acquire_read(self, deadline=None):
+        """Attempt to lock for read. Returns a Future.
+
+        The Future raises :exc:`toro.Timeout` if the deadline passes.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
+            a deadline relative to the current time.
+        """
+        return self._block.acquire(deadline)
+
+    @gen.coroutine
+    def acquire_write(self, deadline=None):
+        """Attempt to lock for write. Returns a Future.
+
+        The Future raises :exc:`toro.Timeout` if the deadline passes.
+
+        :Parameters:
+          - `deadline`: Optional timeout, either an absolute timestamp
+            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
+            a deadline relative to the current time.
+        """
+        futures = [self._block.acquire(deadline) for _ in
+                   range(self._max_readers)]
+        try:
+            managers = yield futures
+        except Timeout:
+            for f in futures:
+                # Avoid traceback logging.
+                f.exception()
+            raise
+
+        raise gen.Return(_ContextManagerList(managers))
+
+    def release_read(self):
+        """Releases one reader.
+
+        If any coroutines are waiting for :meth:`acquire_read` (in case of full
+        readers queue), the first in line is awakened.
+
+        If not locked, raise a RuntimeError.
+        """
+        if not self.locked():
+            raise RuntimeError('release unlocked lock')
+        self._block.release()
+
+    def release_write(self):
+        """Releases after write.
+
+        The first in queue will be awakened after release.
+
+        If not locked, raise a RuntimeError.
+        """
+        if not self.locked():
+            raise RuntimeError('release unlocked lock')
+        for _ in range(self._max_readers):
+            self._block.release()
+
+    def locked(self):
+        """``True`` if the lock has been acquired"""
+        return self._block.locked()
+
+    def __enter__(self):
+        raise RuntimeError(
+            "Use RWLock like 'with (yield lock)', not like"
+            " 'with lock'")
+
+    __exit__ = __enter__
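
All the primitives above share one deadline convention: methods return a
Future that raises toro.Timeout if the deadline passes first. A minimal
runnable sketch (assuming Tornado is installed and this file is
importable as toro):

    from datetime import timedelta
    from tornado import gen, ioloop
    import toro

    @gen.coroutine
    def demo():
        q = toro.Queue()
        try:
            # Nothing is ever put, so this get() times out.
            yield q.get(deadline=timedelta(seconds=0.1))
        except toro.Timeout:
            print("get() timed out as expected")

    ioloop.IOLoop.current().run_sync(demo)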
diff --git a/translation.py b/translation.py
index 257d63c..479effd 100644
--- a/translation.py
+++ b/translation.py
@@ -2,8 +2,88 @@ import re, os, tempfile
 from subprocess import Popen, PIPE
 from tornado import gen
 import tornado.process, tornado.iostream
+try: # >=4.2
+    import tornado.locks as locks
+except ImportError:
+    import toro as locks
 import logging
 from select import PIPE_BUF
+from contextlib import contextmanager
+from collections import namedtuple
+from time import time
+
+class Pipeline(object):
+    def __init__(self):
+        # The lock is needed so we don't let two coroutines write
+        # simultaneously to a pipeline; then the first call to read might
+        # read translations of text put there by the second call …
+        self.lock = locks.Lock()
+        # The users count is how many requests have picked this
+        # pipeline for translation. If this is 0, we can safely shut
+        # down the pipeline.
+        self.users = 0
+        self.lastUsage = 0
+        self.useCount = 0
+
+    @contextmanager
+    def use(self):
+        self.lastUsage = time()
+        self.users += 1
+        try:
+            yield
+        finally:
+            self.users -= 1
+            self.lastUsage = time()
+            self.useCount += 1
+
+    def __lt__(self, other):
+        return self.users < other.users
+
+    @gen.coroutine
+    def translate(self, _):
+        raise Exception("Not implemented, subclass me!")
+
+class FlushingPipeline(Pipeline):
+    def __init__(self, commands, *args, **kwargs):
+        self.inpipe, self.outpipe = startPipeline(commands)
+        super().__init__(*args, **kwargs)
+
+    def __del__(self):
+        logging.debug("shutting down FlushingPipeline that was used %d times", self.useCount)
+        self.inpipe.stdin.close()
+        self.inpipe.stdout.close()
+        # TODO: It seems the process immediately becomes <defunct>,
+        # but only completely removed after a second request to the
+        # server – why?
+
+    @gen.coroutine
+    def translate(self, toTranslate):
+        with self.use():
+            all_split = splitForTranslation(toTranslate, n_users=self.users)
+            parts = yield [translateNULFlush(part, self) for part in all_split]
+            return "".join(parts)
+
+class SimplePipeline(Pipeline):
+    def __init__(self, commands, *args, **kwargs):
+        self.commands = commands
+        super().__init__(*args, **kwargs)
+
+    @gen.coroutine
+    def translate(self, toTranslate):
+        with self.use():
+            with (yield self.lock.acquire()):
+                res = yield translateSimple(toTranslate, self.commands)
+                return res
+
+
+ParsedModes = namedtuple('ParsedModes', 'do_flush commands')
+
+def makePipeline(modes_parsed):
+    if modes_parsed.do_flush:
+        return FlushingPipeline(modes_parsed.commands)
+    else:
+        return SimplePipeline(modes_parsed.commands)
+
 
 def startPipeline(commands):
     procs = []
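
Pipeline.use() does the user bookkeeping that servlet.py's heap relies
on, and __lt__ orders pipes by that count. A small sketch (assumes this
module is importable as translation with Tornado installed; [['cat']] is
a stand-in for a real Apertium mode command):

    import heapq
    from translation import SimplePipeline

    a = SimplePipeline(commands=[['cat']])  # hypothetical one-step mode
    b = SimplePipeline(commands=[['cat']])

    with a.use():                  # one request is inside pipe a
        pool = [a, b]
        heapq.heapify(pool)
        assert pool[0] is b        # the idle pipe sorts first

    assert a.users == 0 and a.useCount == 1   # use() restores the counts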
@@ -19,9 +99,9 @@ def startPipeline(commands):
         procs.append(tornado.process.Subprocess(cmd,
                                                 stdin=in_from,
                                                 stdout=out_from))
-
     return procs[0], procs[-1]
 
+
 def parseModeFile(mode_path):
     mode_str = open(mode_path, 'r').read().strip()
     if mode_str:
@@ -41,12 +121,12 @@ def parseModeFile(mode_path):
             commands = []
             for cmd in mode_str.strip().split('|'):
                 cmd = cmd.replace('$2', '').replace('$1', '-g')
-                cmd = re.sub('^(\S*)', '\g<1> -z', cmd)
+                cmd = re.sub(r'^(\S*)', r'\g<1> -z', cmd)
                 commands.append(cmd.split())
-        return do_flush, commands
+        return ParsedModes(do_flush, commands)
     else:
-        logging.error('Could not parse mode file %s' % mode_path)
-        raise Exception('Could not parse mode file %s' % mode_path)
+        logging.error('Could not parse mode file %s', mode_path)
+        raise Exception('Could not parse mode file %s' % mode_path)
 
 
 def upToBytes(string, max_bytes):
@@ -66,7 +146,7 @@ def upToBytes(string, max_bytes):
             l -= 1
     return 0
 
-def hardbreakFn(string, rush_hour):
+def hardbreakFn(string, n_users):
     """If others are queueing up to translate at the same time, we send
     short requests, otherwise we try to minimise the number of
     requests, but without letting buffers fill up.
@@ -74,30 +154,35 @@ def hardbreakFn(string, rush_hour):
     These numbers could probably be tweaked a lot.
 
     """
-    if rush_hour:
+    if n_users > 2:
         return 1000
     else:
         return upToBytes(string, PIPE_BUF)
 
 def preferPunctBreak(string, last, hardbreak):
     """We would prefer to split on a period or space seen before the
-    hardbreak, if we can.
+    hardbreak, if we can. If the remaining string is no longer than the
+    hardbreak, return the end of the string.
 
     """
+
+    if len(string[last:]) <= hardbreak:
+        return last + hardbreak + 1
+
     softbreak = int(hardbreak/2)+1
     softnext = last + softbreak
     hardnext = last + hardbreak
     dot = string.rfind(".", softnext, hardnext)
     if dot>-1:
-        return dot
+        return dot+1
     else:
         space = string.rfind(" ", softnext, hardnext)
         if space>-1:
-            return space
+            return space+1
         else:
             return hardnext
 
-def splitForTranslation(toTranslate, rush_hour):
+def splitForTranslation(toTranslate, n_users):
     """Splitting it up a bit ensures we don't fill up FIFO buffers (leads
     to processes hanging on read/write)."""
     allSplit = []	# [].append and join faster than str +=
@@ -105,16 +190,18 @@ def splitForTranslation(toTranslate, rush_hour):
     rounds=0
     while last < len(toTranslate) and rounds<10:
         rounds+=1
-        hardbreak = hardbreakFn(toTranslate[last:], rush_hour)
+        hardbreak = hardbreakFn(toTranslate[last:], n_users)
         next = preferPunctBreak(toTranslate, last, hardbreak)
         allSplit.append(toTranslate[last:next])
+        logging.debug("splitForTranslation: last:%s hardbreak:%s next:%s appending:%s",
+                      last, hardbreak, next, toTranslate[last:next])
         last = next
     return allSplit
 
 @gen.coroutine
-def translateNULFlush(toTranslate, lock, pipeline):
-    with (yield lock.acquire()):
-        proc_in, proc_out = pipeline
+def translateNULFlush(toTranslate, pipeline):
+    with (yield pipeline.lock.acquire()):
+        proc_in, proc_out = pipeline.inpipe, pipeline.outpipe
 
         proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE)
         proc_deformat.stdin.write(bytes(toTranslate, 'utf-8'))
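
splitForTranslation and its helpers are pure string functions, so the
chunking policy can be checked in isolation. A sketch (n_users > 2
forces the fixed 1000-character hardbreak, so the result does not depend
on PIPE_BUF):

    from translation import splitForTranslation

    text = "A short sentence. " * 200       # 3600 characters
    parts = splitForTranslation(text, n_users=5)

    assert "".join(parts) == text           # chunks are contiguous
    assert all(len(p) <= 1000 for p in parts)
    assert parts[0].endswith(".")           # split lands after a period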
@@ -125,14 +212,14 @@ def translateNULFlush(toTranslate, lock, pipeline):
         # TODO: PipeIOStream has no flush, but seems to work anyway?
         #proc_in.stdin.flush()
 
-        output = yield proc_out.stdout.read_until(bytes('\0', 'utf-8'))
+        output = yield gen.Task(proc_out.stdout.read_until, bytes('\0', 'utf-8'))
 
         proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE)
         proc_reformat.stdin.write(output)
         return proc_reformat.communicate()[0].decode('utf-8')
 
 
-def translateWithoutFlush(toTranslate, lock, pipeline):
+def translateWithoutFlush(toTranslate, proc_in, proc_out):
     proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE)
     proc_deformat.stdin.write(bytes(toTranslate, 'utf-8'))
     deformatted = proc_deformat.communicate()[0]
@@ -164,8 +251,8 @@ def translatePipeline(toTranslate, commands):
     output.append(toTranslate)
     output.append(towrite.decode('utf-8'))
 
-    pipeline = []
-    pipeline.append("apertium-deshtml")
+    all_cmds = []
+    all_cmds.append("apertium-deshtml")
 
     for cmd in commands:
         proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
@@ -173,40 +260,29 @@ def translatePipeline(toTranslate, commands):
         towrite = proc.communicate()[0]
 
         output.append(towrite.decode('utf-8'))
-        pipeline.append(cmd)
+        all_cmds.append(cmd)
 
     proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE)
     proc_reformat.stdin.write(towrite)
     towrite = proc_reformat.communicate()[0].decode('utf-8')
 
     output.append(towrite)
-    pipeline.append("apertium-rehtml-noent")
+    all_cmds.append("apertium-rehtml-noent")
 
-    return output, pipeline
+    return output, all_cmds
 
 @gen.coroutine
 def translateSimple(toTranslate, commands):
     proc_in, proc_out = startPipeline(commands)
-    assert(proc_in==proc_out)
-    yield proc_in.stdin.write(bytes(toTranslate, 'utf-8'))
+    assert proc_in == proc_out
+    yield gen.Task(proc_in.stdin.write, bytes(toTranslate, 'utf-8'))
     proc_in.stdin.close()
-    translated = yield proc_out.stdout.read_until_close()
+    translated = yield gen.Task(proc_out.stdout.read_until_close)
     proc_in.stdout.close()
     return translated.decode('utf-8')
 
-def translateDoc(fileToTranslate, format, modeFile):
-    modesdir=os.path.dirname(os.path.dirname(modeFile))
-    mode=os.path.splitext(os.path.basename(modeFile))[0]
-    return Popen(['apertium', '-f', format, '-d', modesdir, mode],
-                  stdin=fileToTranslate, stdout=PIPE).communicate()[0]
-
-@gen.coroutine
-def translate(toTranslate, lock, pipeline, commands):
-    if pipeline:
-        allSplit = splitForTranslation(toTranslate, rush_hour = lock.locked())
-        parts = yield [translateNULFlush(part, lock, pipeline) for part in allSplit]
-        return "".join(parts)
-    else:
-        with (yield lock.acquire()):
-            res = yield translateSimple(toTranslate, commands)
-            return res
+def translateDoc(fileToTranslate, fmt, modeFile):
+    modesdir = os.path.dirname(os.path.dirname(modeFile))
+    mode = os.path.splitext(os.path.basename(modeFile))[0]
+    return Popen(['apertium', '-f', fmt, '-d', modesdir, mode],
+                 stdin=fileToTranslate, stdout=PIPE).communicate()[0]

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/apertium-apy.git


