[apertium-apy] 01/05: Imported Upstream version 0.1~r61425
Kartik Mistry
kartik at moszumanska.debian.org
Fri Aug 14 05:58:32 UTC 2015
This is an automated email from the git hooks/post-receive script.
kartik pushed a commit to branch master
in repository apertium-apy.
commit 5a401edd0f9f0e09e05cfa8a47e85ca909657b3c
Author: Kartik Mistry <kartik.mistry at gmail.com>
Date: Fri Aug 14 11:16:28 2015 +0530
Imported Upstream version 0.1~r61425
---
servlet.py | 134 ++++---
tools/apertiumlangs.sql | 4 +-
toro.py | 983 ++++++++++++++++++++++++++++++++++++++++++++++++
translation.py | 158 ++++++--
4 files changed, 1193 insertions(+), 86 deletions(-)
diff --git a/servlet.py b/servlet.py
index 465b562..832dbdc 100755
--- a/servlet.py
+++ b/servlet.py
@@ -9,6 +9,7 @@ from multiprocessing import Pool, TimeoutError
from functools import wraps
from threading import Thread
from datetime import datetime
+import heapq
import tornado, tornado.web, tornado.httpserver, tornado.process, tornado.iostream
from tornado import escape, gen
@@ -18,8 +19,6 @@ try: #3.1
except ImportError: #2.1
from tornado.options import enable_pretty_logging
-import toro
-
from modeSearch import searchPath
from util import getLocalizedLanguages, apertium, bilingualTranslate, removeLast, stripTags, processPerWord, getCoverage, getCoverages, toAlpha3Code, toAlpha2Code, noteUnknownToken, scaleMtLog, TranslationInfo, closeDb, flushUnknownWords, inMemoryUnknownToken
import translation
@@ -57,7 +56,8 @@ class BaseHandler(tornado.web.RequestHandler):
analyzers = {}
generators = {}
taggers = {}
- pipelines = {} # (l1, l2): (inpipe, outpipe), only contains flushing pairs!
+ pipelines = {} # (l1, l2): [translation.Pipeline], only contains flushing pairs!
+ pipelines_holding = []
callback = None
timeout = None
scaleMtLogs = False
@@ -67,15 +67,15 @@ class BaseHandler(tornado.web.RequestHandler):
stats = {
'useCount': {},
- 'lastUsage': {},
'vmsize': 0,
}
- # The lock is needed so we don't let two coroutines write
- # simultaneously to a pipeline; then the first call to read might
- # read translations of text put there by the second call …
- pipeline_locks = {} # (l1, l2): lock for (l1, l2) in pairs
- pipeline_cmds = {} # (l1, l2): (do_flush, commands)
+ pipeline_cmds = {} # (l1, l2): translation.ParsedModes
+ max_pipes_per_pair = 1
+ min_pipes_per_pair = 0
+ max_users_per_pipe = 5
+ max_idle_secs = 0
+ restart_pipe_after = 1000
def initialize(self):
self.callback = self.get_argument('callback', default=None)
@@ -177,7 +177,14 @@ class StatsHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
self.sendResponse({
- 'responseData': { '%s-%s' % pair: useCount for pair, useCount in self.stats['useCount'].items() },
+ 'responseData': {
+ 'useCount': { '%s-%s' % pair: useCount
+ for pair, useCount in self.stats['useCount'].items() },
+ 'runningPipes': { '%s-%s' % pair: len(pipes)
+ for pair, pipes in self.pipelines.items()
+ if pipes != [] },
+ 'holdingPipes': len(self.pipelines_holding),
+ },
'responseDetails': None,
'responseStatus': 200
})
@@ -190,8 +197,6 @@ class RootHandler(BaseHandler):
class TranslateHandler(BaseHandler):
def notePairUsage(self, pair):
self.stats['useCount'][pair] = 1 + self.stats['useCount'].get(pair, 0)
- if self.max_idle_secs:
- self.stats['lastUsage'][pair] = time.time()
unknownMarkRE = re.compile(r'\*([^.,;:\t\* ]+)')
def maybeStripMarks(self, markUnknown, l1, l2, translated):
@@ -209,24 +214,36 @@ class TranslateHandler(BaseHandler):
else:
noteUnknownToken(token, pair, self.missingFreqs)
- def shutdownPair(self, pair):
- logging.info("shutting down")
- self.pipelines[pair][0].stdin.close()
- self.pipelines[pair][0].stdout.close()
- self.pipelines.pop(pair)
+ def cleanable(self, i, pair, pipe):
+ if pipe.useCount > self.restart_pipe_after:
+ # Not affected by min_pipes_per_pair
+ logging.info('A pipe for pair %s-%s has handled %d requests, scheduling restart',
+ pair[0], pair[1], self.restart_pipe_after)
+ return True
+ elif (i >= self.min_pipes_per_pair
+ and self.max_idle_secs != 0
+ and time.time() - pipe.lastUsage > self.max_idle_secs):
+ logging.info("A pipe for pair %s-%s hasn't been used in %d secs, scheduling shutdown",
+ pair[0], pair[1], self.max_idle_secs)
+ return True
+ else:
+ return False
def cleanPairs(self):
- if self.max_idle_secs:
- for pair, lastUsage in self.stats['lastUsage'].items():
- if pair in self.pipelines and time.time() - lastUsage > self.max_idle_secs:
- logging.info('Shutting down pair %s-%s since it has not been used in %d seconds' % (
- pair[0], pair[1], self.max_idle_secs))
- self.shutdownPair(pair)
-
- def getPipeLock(self, l1, l2):
- if (l1, l2) not in self.pipeline_locks:
- self.pipeline_locks[(l1, l2)] = toro.Lock()
- return self.pipeline_locks[(l1, l2)]
+ for pair in self.pipelines:
+ pipes = self.pipelines[pair]
+ to_clean = set(p for i, p in enumerate(pipes)
+ if self.cleanable(i, pair, p))
+ self.pipelines_holding += to_clean
+ pipes[:] = [p for p in pipes if not p in to_clean]
+ heapq.heapify(pipes)
+ # The holding area lets us restart pipes after n usages next
+ # time round, since with lots of traffic an active pipe may
+ # never reach 0 users
+ self.pipelines_holding[:] = [p for p in self.pipelines_holding
+ if p.users > 0]
+ if self.pipelines_holding:
+ logging.info("%d pipelines still scheduled for shutdown", len(self.pipelines_holding))
def getPipeCmds(self, l1, l2):
if (l1, l2) not in self.pipeline_cmds:
@@ -234,14 +251,30 @@ class TranslateHandler(BaseHandler):
self.pipeline_cmds[(l1, l2)] = translation.parseModeFile(mode_path)
return self.pipeline_cmds[(l1, l2)]
+ def shouldStartPipe(self, l1, l2):
+ pipes = self.pipelines.get((l1, l2), [])
+ if pipes == []:
+ logging.info("%s-%s not in pipelines of this process",
+ l1, l2)
+ return True
+ else:
+ min_p = pipes[0]
+ if len(pipes) < self.max_pipes_per_pair and min_p.users > self.max_users_per_pipe:
+ logging.info("%s-%s has ≥%d users per pipe but only %d pipes",
+ l1, l2, min_p.users, len(pipes))
+ return True
+ else:
+ return False
+
def getPipeline(self, l1, l2):
- do_flush, commands = self.getPipeCmds(l1, l2)
- if not do_flush:
- return None
- if (l1, l2) not in self.pipelines:
- logging.info('%s-%s not in pipelines of this process, starting …' % (l1, l2))
- self.pipelines[(l1, l2)] = translation.startPipeline(commands)
- return self.pipelines[(l1, l2)]
+ pair = (l1, l2)
+ if self.shouldStartPipe(l1, l2):
+ logging.info("Starting up a new pipeline for %s-%s …", l1, l2)
+ if not pair in self.pipelines:
+ self.pipelines[pair] = []
+ p = translation.makePipeline(self.getPipeCmds(l1, l2))
+ heapq.heappush(self.pipelines[pair], p)
+ return self.pipelines[pair][0]
def logBeforeTranslation(self):
if self.scaleMtLogs:
@@ -274,10 +307,9 @@ class TranslateHandler(BaseHandler):
if '%s-%s' % (l1, l2) in self.pairs:
before = self.logBeforeTranslation()
- lock = self.getPipeLock(l1, l2)
- _, commands = self.getPipeCmds(l1, l2)
pipeline = self.getPipeline(l1, l2)
- translated = yield translation.translate(toTranslate, lock, pipeline, commands)
+ self.notePairUsage((l1, l2))
+ translated = yield pipeline.translate(toTranslate)
self.logAfterTranslation(before, toTranslate)
self.sendResponse({
'responseData': {
@@ -286,7 +318,6 @@ class TranslateHandler(BaseHandler):
'responseDetails': None,
'responseStatus': 200
})
- self.notePairUsage((l1, l2))
self.cleanPairs()
else:
self.send_error(400, explanation='That pair is not installed')
@@ -641,7 +672,7 @@ class PipeDebugHandler(BaseHandler):
missingFreqsDb = ''
-def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_idle_secs, verbosity=0, scaleMtLogs=False, memory=0):
+def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeout, max_pipes_per_pair, min_pipes_per_pair, max_users_per_pipe, max_idle_secs, restart_pipe_after, verbosity=0, scaleMtLogs=False, memory=0):
global missingFreqsDb
missingFreqsDb= missingFreqs
@@ -650,7 +681,11 @@ def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeo
Handler.langNames = langNames
Handler.missingFreqs = missingFreqs
Handler.timeout = timeout
+ Handler.max_pipes_per_pair = max_pipes_per_pair
+ Handler.min_pipes_per_pair = min_pipes_per_pair
+ Handler.max_users_per_pipe = max_users_per_pipe
Handler.max_idle_secs = max_idle_secs
+ Handler.restart_pipe_after = restart_pipe_after
Handler.scaleMtLogs = scaleMtLogs
Handler.inMemoryUnknown = True if memory > 0 else False
Handler.inMemoryLimit = memory
@@ -673,8 +708,17 @@ def setupHandler(port, pairs_path, nonpairs_path, langNames, missingFreqs, timeo
for dirpath, modename, lang_pair in modes['tagger']:
Handler.taggers[lang_pair] = (dirpath, modename)
+def sanity_check():
+ locale_vars = ["LANG", "LC_ALL"]
+ u8 = re.compile("UTF-?8", re.IGNORECASE)
+ if not any(re.search(u8, os.environ.get(key, ""))
+ for key in locale_vars):
+ print("servlet.py: error: APY needs a UTF-8 locale, please set LANG or LC_ALL",
+ file=sys.stderr)
+ sys.exit(1)
if __name__ == '__main__':
+ sanity_check()
parser = argparse.ArgumentParser(description='Start Apertium APY')
parser.add_argument('pairs_path', help='path to Apertium installed pairs (all modes files in this path are included)')
parser.add_argument('-s', '--nonpairs-path', help='path to Apertium SVN (only non-translator debug modes are included from this path)')
@@ -684,10 +728,14 @@ if __name__ == '__main__':
parser.add_argument('-c', '--ssl-cert', help='path to SSL Certificate', default=None)
parser.add_argument('-k', '--ssl-key', help='path to SSL Key File', default=None)
parser.add_argument('-t', '--timeout', help='timeout for requests (default = 10)', type=int, default=10)
- parser.add_argument('-j', '--num-processes', help='number of processes to run (default = number of cores)', type=int, default=0)
+ parser.add_argument('-j', '--num-processes', help='number of processes to run (default = 1; use 0 to run one http server per core, where each http server runs all available language pairs)', nargs='?', type=int, default=1)
parser.add_argument('-d', '--daemon', help='daemon mode: redirects stdout and stderr to files apertium-apy.log and apertium-apy.err ; use with --log-path', action='store_true')
parser.add_argument('-P', '--log-path', help='path to log output files to in daemon mode; defaults to local directory', default='./')
- parser.add_argument('-m', '--max-idle-secs', help='shut down pipelines it have not been used in this many seconds', type=int, default=0)
+ parser.add_argument('-i', '--max-pipes-per-pair', help='how many pipelines we can spin up per language pair (default = 1)', type=int, default=1)
+ parser.add_argument('-n', '--min-pipes-per-pair', help='when shutting down pipelines, keep at least this many open per language pair (default = 0)', type=int, default=0)
+ parser.add_argument('-u', '--max-users-per-pipe', help='how many concurrent requests per pipeline before we consider spinning up a new one (default = 5)', type=int, default=5)
+ parser.add_argument('-m', '--max-idle-secs', help='if specified, shut down pipelines that have not been used in this many seconds', type=int, default=0)
+ parser.add_argument('-r', '--restart-pipe-after', help='restart a pipeline if it has had this many requests (default = 1000)', type=int, default=1000)
parser.add_argument('-v', '--verbosity', help='logging verbosity', type=int, default=0)
parser.add_argument('-S', '--scalemt-logs', help='generates ScaleMT-like logs; use with --log-path; disables', action='store_true')
parser.add_argument('-M', '--unknown-memory-limit', help="keeps unknown words in memory until a limit is reached", type=int, default=0)
@@ -718,7 +766,7 @@ if __name__ == '__main__':
if not cld2:
logging.warning('Unable to import CLD2, continuing using naive method of language detection')
- setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_idle_secs, args.verbosity, args.scalemt_logs, args.unknown_memory_limit)
+ setupHandler(args.port, args.pairs_path, args.nonpairs_path, args.lang_names, args.missing_freqs, args.timeout, args.max_pipes_per_pair, args.min_pipes_per_pair, args.max_users_per_pipe, args.max_idle_secs, args.restart_pipe_after, args.verbosity, args.scalemt_logs, args.unknown_memory_limit)
application = tornado.web.Application([
(r'/', RootHandler),
diff --git a/tools/apertiumlangs.sql b/tools/apertiumlangs.sql
index 7bbf4ba..c62fdb4 100644
--- a/tools/apertiumlangs.sql
+++ b/tools/apertiumlangs.sql
@@ -2721,7 +2721,7 @@ INSERT INTO "languageNames" VALUES(2734,'en','ne','Nepali');
INSERT INTO "languageNames" VALUES(2735,'en','nl','Dutch');
INSERT INTO "languageNames" VALUES(2736,'en','nn','Norwegian Nynorsk');
INSERT INTO "languageNames" VALUES(2737,'en','no','Norwegian');
-INSERT INTO "languageNames" VALUES(2738,'en','nog','Nogai');
+INSERT INTO "languageNames" VALUES(2738,'en','nog','Nogay');
INSERT INTO "languageNames" VALUES(2739,'en','oc','Occitan');
INSERT INTO "languageNames" VALUES(2740,'en','os','Ossetic');
INSERT INTO "languageNames" VALUES(2741,'en','pa','Punjabi');
@@ -2756,7 +2756,7 @@ INSERT INTO "languageNames" VALUES(2769,'en','tk','Turkmen');
INSERT INTO "languageNames" VALUES(2770,'en','tl','Tagalog');
INSERT INTO "languageNames" VALUES(2771,'en','tr','Turkish');
INSERT INTO "languageNames" VALUES(2772,'en','tt','Tatar');
-INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvinian');
+INSERT INTO "languageNames" VALUES(2773,'en','tyv','Tuvan');
INSERT INTO "languageNames" VALUES(2774,'en','udm','Udmurt');
INSERT INTO "languageNames" VALUES(2775,'en','uk','Ukrainian');
INSERT INTO "languageNames" VALUES(2776,'en','ur','Urdu');
diff --git a/toro.py b/toro.py
new file mode 100644
index 0000000..d978299
--- /dev/null
+++ b/toro.py
@@ -0,0 +1,983 @@
+# From https://github.com/ajdavis/toro/
+
+# Toro Copyright (c) 2012 A. Jesse Jiryu Davis
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import contextlib
+import heapq
+import collections
+from functools import partial
+from queue import Full, Empty
+
+from tornado import ioloop
+from tornado import gen
+from tornado.concurrent import Future
+
+
+version_tuple = (0, 8, '+')
+
+version = '.'.join(map(str, version_tuple))
+"""Current version of Toro."""
+
+
+__all__ = [
+ # Exceptions
+ 'NotReady', 'AlreadySet', 'Full', 'Empty', 'Timeout',
+
+ # Primitives
+ 'AsyncResult', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore',
+ 'Lock',
+
+ # Queues
+ 'Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue'
+]
+
+
+class NotReady(Exception):
+ """Raised when accessing an :class:`AsyncResult` that has no value yet."""
+ pass
+
+
+class AlreadySet(Exception):
+ """Raised when setting a value on an :class:`AsyncResult` that already
+ has one."""
+ pass
+
+
+class Timeout(Exception):
+ """Raised when a deadline passes before a Future is ready."""
+
+ def __str__(self):
+ return "Timeout"
+
+
+class _TimeoutFuture(Future):
+
+ def __init__(self, deadline, io_loop):
+ """Create a Future with optional deadline.
+
+ If deadline is not None, it may be a number denoting a unix timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` object
+ for a deadline relative to the current time.
+
+ set_exception(toro.Timeout()) is executed after a timeout.
+ """
+
+ super(_TimeoutFuture, self).__init__()
+ self.io_loop = io_loop
+ if deadline is not None:
+ callback = partial(self.set_exception, Timeout())
+ self._timeout_handle = io_loop.add_timeout(deadline, callback)
+ else:
+ self._timeout_handle = None
+
+ def set_result(self, result):
+ self._cancel_timeout()
+ super(_TimeoutFuture, self).set_result(result)
+
+ def set_exception(self, exception):
+ self._cancel_timeout()
+ super(_TimeoutFuture, self).set_exception(exception)
+
+ def _cancel_timeout(self):
+ if self._timeout_handle:
+ self.io_loop.remove_timeout(self._timeout_handle)
+ self._timeout_handle = None
+
+
+class _ContextManagerList(list):
+ def __enter__(self, *args, **kwargs):
+ for obj in self:
+ obj.__enter__(*args, **kwargs)
+
+ def __exit__(self, *args, **kwargs):
+ for obj in self:
+ obj.__exit__(*args, **kwargs)
+
+
+class _ContextManagerFuture(Future):
+ """A Future that can be used with the "with" statement.
+
+ When a coroutine yields this Future, the return value is a context manager
+ that can be used like:
+
+ with (yield future):
+ pass
+
+ At the end of the block, the Future's exit callback is run. Used for
+ Lock.acquire() and Semaphore.acquire().
+ """
+ def __init__(self, wrapped, exit_callback):
+ super(_ContextManagerFuture, self).__init__()
+ wrapped.add_done_callback(self._done_callback)
+ self.exit_callback = exit_callback
+
+ def _done_callback(self, wrapped):
+ if wrapped.exception():
+ self.set_exception(wrapped.exception())
+ else:
+ self.set_result(wrapped.result())
+
+ def result(self):
+ if self.exception():
+ raise self.exception()
+
+ # Otherwise return a context manager that cleans up after the block.
+ @contextlib.contextmanager
+ def f():
+ try:
+ yield
+ finally:
+ self.exit_callback()
+ return f()
+
+
+def _consume_expired_waiters(waiters):
+ # Delete waiters at the head of the queue who've timed out
+ while waiters and waiters[0].done():
+ waiters.popleft()
+
+
+_null_result = object()
+
+
+class AsyncResult(object):
+ """A one-time event that stores a value or an exception.
+
+ The only distinction between AsyncResult and a simple Future is that
+ AsyncResult lets coroutines wait with a deadline. The deadline can be
+ configured separately for each waiter.
+
+ An :class:`AsyncResult` instance cannot be reset.
+
+ :Parameters:
+ - `io_loop`: Optional custom IOLoop.
+ """
+
+ def __init__(self, io_loop=None):
+ self.io_loop = io_loop or ioloop.IOLoop.current()
+ self.value = _null_result
+ self.waiters = []
+
+ def __str__(self):
+ result = '<%s ' % (self.__class__.__name__, )
+ if self.ready():
+ result += 'value=%r' % self.value
+ else:
+ result += 'unset'
+ if self.waiters:
+ result += ' waiters[%s]' % len(self.waiters)
+
+ return result + '>'
+
+ def set(self, value):
+ """Set a value and wake up all the waiters."""
+ if self.ready():
+ raise AlreadySet
+
+ self.value = value
+ waiters, self.waiters = self.waiters, []
+ for waiter in waiters:
+ if not waiter.done(): # Might have timed out
+ waiter.set_result(value)
+
+ def ready(self):
+ return self.value is not _null_result
+
+ def get(self, deadline=None):
+ """Get a value once :meth:`set` is called. Returns a Future.
+
+ The Future's result will be the value. The Future raises
+ :exc:`toro.Timeout` if no value is set before the deadline.
+
+ :Parameters:
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
+ a deadline relative to the current time.
+ """
+ future = _TimeoutFuture(deadline, self.io_loop)
+ if self.ready():
+ future.set_result(self.value)
+ else:
+ self.waiters.append(future)
+
+ return future
+
+ def get_nowait(self):
+ """Get the value if ready, or raise :class:`NotReady`."""
+ if self.ready():
+ return self.value
+ else:
+ raise NotReady
+
+
+class Condition(object):
+ """A condition allows one or more coroutines to wait until notified.
+
+ Like a standard Condition_, but does not need an underlying lock that
+ is acquired and released.
+
+ .. _Condition: http://docs.python.org/library/threading.html#threading.Condition
+
+ :Parameters:
+ - `io_loop`: Optional custom IOLoop.
+ """
+
+ def __init__(self, io_loop=None):
+ self.io_loop = io_loop or ioloop.IOLoop.current()
+ self.waiters = collections.deque() # Queue of _Waiter objects
+
+ def __str__(self):
+ result = '<%s' % (self.__class__.__name__, )
+ if self.waiters:
+ result += ' waiters[%s]' % len(self.waiters)
+ return result + '>'
+
+ def wait(self, deadline=None):
+ """Wait for :meth:`notify`. Returns a Future.
+
+ :exc:`~toro.Timeout` is executed after a timeout.
+
+ :Parameters:
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+ deadline relative to the current time.
+ """
+ future = _TimeoutFuture(deadline, self.io_loop)
+ self.waiters.append(future)
+ return future
+
+ def notify(self, n=1):
+ """Wake up `n` waiters.
+
+ :Parameters:
+ - `n`: The number of waiters to awaken (default: 1)
+ """
+ waiters = [] # Waiters we plan to run right now
+ while n and self.waiters:
+ waiter = self.waiters.popleft()
+ if not waiter.done(): # Might have timed out
+ n -= 1
+ waiters.append(waiter)
+
+ for waiter in waiters:
+ waiter.set_result(None)
+
+ def notify_all(self):
+ """Wake up all waiters."""
+ self.notify(len(self.waiters))
+
+
+# TODO: show correct examples that avoid thread / process issues w/ concurrent.futures.Future
+class Event(object):
+ """An event blocks coroutines until its internal flag is set to True.
+
+ Similar to threading.Event_.
+
+ .. _threading.Event: http://docs.python.org/library/threading.html#threading.Event
+
+ .. seealso:: :doc:`examples/event_example`
+
+ :Parameters:
+ - `io_loop`: Optional custom IOLoop.
+ """
+
+ def __init__(self, io_loop=None):
+ self.io_loop = io_loop or ioloop.IOLoop.current()
+ self.condition = Condition(io_loop=io_loop)
+ self._flag = False
+
+ def __str__(self):
+ return '<%s %s>' % (
+ self.__class__.__name__, 'set' if self._flag else 'clear')
+
+ def is_set(self):
+ """Return ``True`` if and only if the internal flag is true."""
+ return self._flag
+
+ def set(self):
+ """Set the internal flag to ``True``. All waiters are awakened.
+ Calling :meth:`wait` once the flag is true will not block.
+ """
+ self._flag = True
+ self.condition.notify_all()
+
+ def clear(self):
+ """Reset the internal flag to ``False``. Calls to :meth:`wait`
+ will block until :meth:`set` is called.
+ """
+ self._flag = False
+
+ def wait(self, deadline=None):
+ """Block until the internal flag is true. Returns a Future.
+
+ The Future raises :exc:`~toro.Timeout` after a timeout.
+
+ :Parameters:
+ - `callback`: Function taking no arguments.
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+ deadline relative to the current time.
+ """
+ if self._flag:
+ future = _TimeoutFuture(None, self.io_loop)
+ future.set_result(None)
+ return future
+ else:
+ return self.condition.wait(deadline)
+
+
+class Queue(object):
+ """Create a queue object with a given maximum size.
+
+ If `maxsize` is 0 (the default) the queue size is unbounded.
+
+ Unlike the `standard Queue`_, you can reliably know this Queue's size
+ with :meth:`qsize`, since your single-threaded Tornado application won't
+ be interrupted between calling :meth:`qsize` and doing an operation on the
+ Queue.
+
+ **Examples:**
+
+ :doc:`examples/producer_consumer_example`
+
+ :doc:`examples/web_spider_example`
+
+ :Parameters:
+ - `maxsize`: Optional size limit (no limit by default).
+ - `io_loop`: Optional custom IOLoop.
+
+ .. _`Gevent's Queue`: http://www.gevent.org/gevent.queue.html
+
+ .. _`standard Queue`: http://docs.python.org/library/queue.html#Queue.Queue
+ """
+ def __init__(self, maxsize=0, io_loop=None):
+ self.io_loop = io_loop or ioloop.IOLoop.current()
+ if maxsize is None:
+ raise TypeError("maxsize can't be None")
+
+ if maxsize < 0:
+ raise ValueError("maxsize can't be negative")
+
+ self._maxsize = maxsize
+
+ # _TimeoutFutures
+ self.getters = collections.deque([])
+ # Pairs of (item, _TimeoutFuture)
+ self.putters = collections.deque([])
+ self._init(maxsize)
+
+ def _init(self, maxsize):
+ self.queue = collections.deque()
+
+ def _get(self):
+ return self.queue.popleft()
+
+ def _put(self, item):
+ self.queue.append(item)
+
+ def __repr__(self):
+ return '<%s at %s %s>' % (
+ type(self).__name__, hex(id(self)), self._format())
+
+ def __str__(self):
+ return '<%s %s>' % (type(self).__name__, self._format())
+
+ def _format(self):
+ result = 'maxsize=%r' % (self.maxsize, )
+ if getattr(self, 'queue', None):
+ result += ' queue=%r' % self.queue
+ if self.getters:
+ result += ' getters[%s]' % len(self.getters)
+ if self.putters:
+ result += ' putters[%s]' % len(self.putters)
+ return result
+
+ def _consume_expired_putters(self):
+ # Delete waiters at the head of the queue who've timed out
+ while self.putters and self.putters[0][1].done():
+ self.putters.popleft()
+
+ def qsize(self):
+ """Number of items in the queue"""
+ return len(self.queue)
+
+ @property
+ def maxsize(self):
+ """Number of items allowed in the queue."""
+ return self._maxsize
+
+ def empty(self):
+ """Return ``True`` if the queue is empty, ``False`` otherwise."""
+ return not self.queue
+
+ def full(self):
+ """Return ``True`` if there are `maxsize` items in the queue.
+
+ .. note:: if the Queue was initialized with `maxsize=0`
+ (the default), then :meth:`full` is never ``True``.
+ """
+ if self.maxsize == 0:
+ return False
+ else:
+ return self.maxsize <= self.qsize()
+
+ def put(self, item, deadline=None):
+ """Put an item into the queue. Returns a Future.
+
+ The Future blocks until a free slot is available for `item`, or raises
+ :exc:`toro.Timeout`.
+
+ :Parameters:
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+ deadline relative to the current time.
+ """
+ _consume_expired_waiters(self.getters)
+ future = _TimeoutFuture(deadline, self.io_loop)
+ if self.getters:
+ assert not self.queue, "queue non-empty, why are getters waiting?"
+ getter = self.getters.popleft()
+
+ # Use _put and _get instead of passing item straight to getter, in
+ # case a subclass has logic that must run (e.g. JoinableQueue).
+ self._put(item)
+ getter.set_result(self._get())
+ future.set_result(None)
+ else:
+ if self.maxsize and self.maxsize <= self.qsize():
+ self.putters.append((item, future))
+ else:
+ self._put(item)
+ future.set_result(None)
+
+ return future
+
+ def put_nowait(self, item):
+ """Put an item into the queue without blocking.
+
+ If no free slot is immediately available, raise queue.Full.
+ """
+ _consume_expired_waiters(self.getters)
+ if self.getters:
+ assert not self.queue, "queue non-empty, why are getters waiting?"
+ getter = self.getters.popleft()
+
+ self._put(item)
+ getter.set_result(self._get())
+ elif self.maxsize and self.maxsize <= self.qsize():
+ raise Full
+ else:
+ self._put(item)
+
+ def get(self, deadline=None):
+ """Remove and return an item from the queue. Returns a Future.
+
+ The Future blocks until an item is available, or raises
+ :exc:`toro.Timeout`.
+
+ :Parameters:
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+ deadline relative to the current time.
+ """
+ self._consume_expired_putters()
+ future = _TimeoutFuture(deadline, self.io_loop)
+ if self.putters:
+ assert self.full(), "queue not full, why are putters waiting?"
+ item, putter = self.putters.popleft()
+ self._put(item)
+ putter.set_result(None)
+ future.set_result(self._get())
+ elif self.qsize():
+ future.set_result(self._get())
+ else:
+ self.getters.append(future)
+
+ return future
+
+ def get_nowait(self):
+ """Remove and return an item from the queue without blocking.
+
+ Return an item if one is immediately available, else raise
+ :exc:`queue.Empty`.
+ """
+ self._consume_expired_putters()
+ if self.putters:
+ assert self.full(), "queue not full, why are putters waiting?"
+ item, putter = self.putters.popleft()
+ self._put(item)
+ putter.set_result(None)
+ return self._get()
+ elif self.qsize():
+ return self._get()
+ else:
+ raise Empty
+
+
+class PriorityQueue(Queue):
+ """A subclass of :class:`Queue` that retrieves entries in priority order
+ (lowest first).
+
+ Entries are typically tuples of the form: ``(priority number, data)``.
+
+ :Parameters:
+ - `maxsize`: Optional size limit (no limit by default).
+ - `initial`: Optional sequence of initial items.
+ - `io_loop`: Optional custom IOLoop.
+ """
+ def _init(self, maxsize):
+ self.queue = []
+
+ def _put(self, item, heappush=heapq.heappush):
+ heappush(self.queue, item)
+
+ def _get(self, heappop=heapq.heappop):
+ return heappop(self.queue)
+
+
+class LifoQueue(Queue):
+ """A subclass of :class:`Queue` that retrieves most recently added entries
+ first.
+
+ :Parameters:
+ - `maxsize`: Optional size limit (no limit by default).
+ - `initial`: Optional sequence of initial items.
+ - `io_loop`: Optional custom IOLoop.
+ """
+ def _init(self, maxsize):
+ self.queue = []
+
+ def _put(self, item):
+ self.queue.append(item)
+
+ def _get(self):
+ return self.queue.pop()
+
+
+class JoinableQueue(Queue):
+ """A subclass of :class:`Queue` that additionally has :meth:`task_done`
+ and :meth:`join` methods.
+
+ .. seealso:: :doc:`examples/web_spider_example`
+
+ :Parameters:
+ - `maxsize`: Optional size limit (no limit by default).
+ - `initial`: Optional sequence of initial items.
+ - `io_loop`: Optional custom IOLoop.
+ """
+ def __init__(self, maxsize=0, io_loop=None):
+ Queue.__init__(self, maxsize=maxsize, io_loop=io_loop)
+ self.unfinished_tasks = 0
+ self._finished = Event(io_loop)
+ self._finished.set()
+
+ def _format(self):
+ result = Queue._format(self)
+ if self.unfinished_tasks:
+ result += ' tasks=%s' % self.unfinished_tasks
+ return result
+
+ def _put(self, item):
+ self.unfinished_tasks += 1
+ self._finished.clear()
+ Queue._put(self, item)
+
+ def task_done(self):
+ """Indicate that a formerly enqueued task is complete.
+
+ Used by queue consumers. For each :meth:`get <Queue.get>` used to
+ fetch a task, a subsequent call to :meth:`task_done` tells the queue
+ that the processing on the task is complete.
+
+ If a :meth:`join` is currently blocking, it will resume when all
+ items have been processed (meaning that a :meth:`task_done` call was
+ received for every item that had been :meth:`put <Queue.put>` into the
+ queue).
+
+ Raises ``ValueError`` if called more times than there were items
+ placed in the queue.
+ """
+ if self.unfinished_tasks <= 0:
+ raise ValueError('task_done() called too many times')
+ self.unfinished_tasks -= 1
+ if self.unfinished_tasks == 0:
+ self._finished.set()
+
+ def join(self, deadline=None):
+ """Block until all items in the queue are processed. Returns a Future.
+
+ The count of unfinished tasks goes up whenever an item is added to
+ the queue. The count goes down whenever a consumer calls
+ :meth:`task_done` to indicate that all work on the item is complete.
+ When the count of unfinished tasks drops to zero, :meth:`join`
+ unblocks.
+
+ The Future raises :exc:`toro.Timeout` if the count is not zero before
+ the deadline.
+
+ :Parameters:
+ - `deadline`: Optional timeout, either an absolute timestamp
+ (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for a
+ deadline relative to the current time.
+ """
+ return self._finished.wait(deadline)
+
+
class Semaphore(object):
    """A counting lock that may be acquired a fixed number of times before
    further acquisitions block.

    The semaphore maintains a counter: the initial value, plus the number
    of release() calls, minus the number of acquire() calls.  acquire()
    waits whenever completing immediately would make the counter negative.

    If not given, value defaults to 1.

    :meth:`acquire` supports the context manager protocol:

    >>> from tornado import gen
    >>> import toro
    >>> semaphore = toro.Semaphore()
    >>>
    >>> @gen.coroutine
    ... def f():
    ...    with (yield semaphore.acquire()):
    ...        assert semaphore.locked()
    ...
    ...    assert not semaphore.locked()

    .. note:: Unlike the standard threading.Semaphore_, a :class:`Semaphore`
      can report the current value of its :attr:`counter`: code in a
      single-threaded Tornado app may inspect these values and act on them
      without fear of interruption from another thread.

    .. _threading.Semaphore: http://docs.python.org/library/threading.html#threading.Semaphore

    .. seealso:: :doc:`examples/web_spider_example`

    :Parameters:
      - `value`: An int, the initial value (default 1).
      - `io_loop`: Optional custom IOLoop.
    """
    def __init__(self, value=1, io_loop=None):
        if value < 0:
            raise ValueError('semaphore initial value must be >= 0')

        # Implementation detail: the counter is modelled as a Queue
        # pre-loaded with 'value' dummy tokens; acquire() pops a token,
        # release() pushes one back.
        self.q = Queue(io_loop=io_loop)
        for _ in range(value):
            self.q.put_nowait(None)

        self._unlocked = Event(io_loop=io_loop)
        if value:
            self._unlocked.set()

    def __repr__(self):
        return '<%s at %s%s>' % (
            type(self).__name__, hex(id(self)), self._format())

    def __str__(self):
        return '<%s%s>' % (self.__class__.__name__, self._format())

    def _format(self):
        # Shared suffix for __repr__ and __str__.
        return ' counter=%s' % self.counter

    @property
    def counter(self):
        """An integer, the current semaphore value"""
        return self.q.qsize()

    def locked(self):
        """True if :attr:`counter` is zero"""
        return self.q.empty()

    def release(self):
        """Increment :attr:`counter` and wake one waiter.
        """
        self.q.put(None)
        if not self.locked():
            # Nobody was blocked in acquire(), so qsize() is positive now.
            self._unlocked.set()

    def wait(self, deadline=None):
        """Wait for :attr:`locked` to be False. Returns a Future.

        The Future raises :exc:`toro.Timeout` after the deadline.

        :Parameters:
          - `deadline`: Optional timeout, either an absolute timestamp
            (as returned by ``io_loop.time()``) or a ``datetime.timedelta``
            for a deadline relative to the current time.
        """
        return self._unlocked.wait(deadline)

    def acquire(self, deadline=None):
        """Decrement :attr:`counter`. Returns a Future.

        Block if the counter is zero and wait for a :meth:`release`. The
        Future raises :exc:`toro.Timeout` after the deadline.

        :Parameters:
          - `deadline`: Optional timeout, either an absolute timestamp
            (as returned by ``io_loop.time()``) or a ``datetime.timedelta``
            for a deadline relative to the current time.
        """
        token_future = self.q.get(deadline)
        if self.q.empty():
            self._unlocked.clear()
        # Wrap so 'with (yield semaphore.acquire())' releases on exit.
        return _ContextManagerFuture(token_future, self.release)

    def __enter__(self):
        raise RuntimeError(
            "Use Semaphore like 'with (yield semaphore)', not like"
            " 'with semaphore'")

    __exit__ = __enter__
+
+
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to be released more often than acquired.

    Every release() verifies that the counter will not exceed the initial
    value; if it would, ``ValueError`` is raised.  Semaphores usually guard
    resources of limited capacity, so releasing one too many times is a
    sign of a bug.

    If not given, *value* defaults to 1.

    .. seealso:: :doc:`examples/web_spider_example`
    """
    def __init__(self, value=1, io_loop=None):
        super(BoundedSemaphore, self).__init__(value=value, io_loop=io_loop)
        # Remember the cap so release() can detect over-release.
        self._initial_value = value

    def release(self):
        # Guard against over-release before delegating to Semaphore.
        if self.counter >= self._initial_value:
            raise ValueError("Semaphore released too many times")
        return super(BoundedSemaphore, self).release()
+
+
class Lock(object):
    """A mutual-exclusion lock for coroutines.

    Created unlocked.  While unlocked, :meth:`acquire` switches the state
    to locked; while locked, yielding :meth:`acquire` waits for a matching
    :meth:`release`.

    Call :meth:`release` only in the locked state; releasing an unlocked
    lock raises RuntimeError.

    When several coroutines are waiting for the lock, :meth:`release`
    wakes the first one that registered.

    :meth:`acquire` supports the context manager protocol:

    >>> from tornado import gen
    >>> import toro
    >>> lock = toro.Lock()
    >>>
    >>> @gen.coroutine
    ... def f():
    ...    with (yield lock.acquire()):
    ...        assert lock.locked()
    ...
    ...    assert not lock.locked()

    .. note:: Unlike with the standard threading.Lock_, code in a
      single-threaded Tornado application can check whether a :class:`Lock`
      is :meth:`locked` and act on that information without fear that
      another thread has grabbed the lock, provided you do not yield to the
      IOLoop between checking :meth:`locked` and using a protected resource.

    .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects

    .. seealso:: :doc:`examples/lock_example`

    :Parameters:
      - `io_loop`: Optional custom IOLoop.
    """
    def __init__(self, io_loop=None):
        # A lock is simply a bounded semaphore with a single slot.
        self._block = BoundedSemaphore(value=1, io_loop=io_loop)

    def __str__(self):
        return "<%s _block=%s>" % (
            self.__class__.__name__,
            self._block)

    def acquire(self, deadline=None):
        """Attempt to lock. Returns a Future.

        The Future raises :exc:`toro.Timeout` if the deadline passes.

        :Parameters:
          - `deadline`: Optional timeout, either an absolute timestamp
            (as returned by ``io_loop.time()``) or a ``datetime.timedelta``
            for a deadline relative to the current time.
        """
        return self._block.acquire(deadline)

    def release(self):
        """Unlock.

        Wakes the first coroutine waiting in :meth:`acquire`, if any.

        If not locked, raise a RuntimeError.
        """
        if not self.locked():
            raise RuntimeError('release unlocked lock')
        self._block.release()

    def locked(self):
        """``True`` if the lock has been acquired"""
        return self._block.locked()

    def __enter__(self):
        raise RuntimeError(
            "Use Lock like 'with (yield lock)', not like"
            " 'with lock'")

    __exit__ = __enter__
+
+
class RWLock(object):
    """A reader-writer lock for coroutines.

    It is created unlocked. When unlocked, :meth:`acquire_write` always changes
    the state to locked. When unlocked, :meth:`acquire_read` can change the
    state to locked, if :meth:`acquire_read` was called max_readers times. When
    the state is locked, yielding :meth:`acquire_read`/:meth:`acquire_write`
    waits until a call to :meth:`release_write` in case of locking on write, or
    :meth:`release_read` in case of locking on read.

    The :meth:`release_read` method should only be called in the locked-on-read
    state; an attempt to release an unlocked lock raises RuntimeError.

    The :meth:`release_write` method should only be called in the locked on
    write state; an attempt to release an unlocked lock raises RuntimeError.

    When more than one coroutine is waiting for the lock, the first one
    registered is awakened by :meth:`release_read`/:meth:`release_write`.

    :meth:`acquire_read`/:meth:`acquire_write` support the context manager
    protocol:

    >>> from tornado import gen
    >>> import toro
    >>> lock = toro.RWLock(max_readers=10)
    >>>
    >>> @gen.coroutine
    ... def f():
    ...    with (yield lock.acquire_read()):
    ...        assert not lock.locked()
    ...
    ...    with (yield lock.acquire_write()):
    ...        assert lock.locked()
    ...
    ...    assert not lock.locked()

    .. note:: Unlike with the standard threading.Lock_, code in a
      single-threaded Tornado application can check if a :class:`RWLock`
      is :meth:`locked`, and act on that information without fear that another
      thread has grabbed the lock, provided you do not yield to the IOLoop
      between checking :meth:`locked` and using a protected resource.

    .. _threading.Lock: http://docs.python.org/2/library/threading.html#lock-objects

    :Parameters:
      - `max_readers`: Optional max readers value, default 1.
      - `io_loop`: Optional custom IOLoop.
    """
    def __init__(self, max_readers=1, io_loop=None):
        self._max_readers = max_readers
        # Readers each take one slot; a writer must take all of them.
        self._block = BoundedSemaphore(value=max_readers, io_loop=io_loop)

    def __str__(self):
        return "<%s _block=%s>" % (
            self.__class__.__name__,
            self._block)

    def acquire_read(self, deadline=None):
        """Attempt to lock for read. Returns a Future.

        The Future raises :exc:`toro.Timeout` if the deadline passes.

        :Parameters:
          - `deadline`: Optional timeout, either an absolute timestamp
            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
            a deadline relative to the current time.
        """
        return self._block.acquire(deadline)

    @gen.coroutine
    def acquire_write(self, deadline=None):
        """Attempt to lock for write. Returns a Future.

        The Future raises :exc:`toro.Timeout` if the deadline passes.

        :Parameters:
          - `deadline`: Optional timeout, either an absolute timestamp
            (as returned by ``io_loop.time()``) or a ``datetime.timedelta`` for
            a deadline relative to the current time.
        """
        # A writer acquires every reader slot, excluding both readers and
        # other writers.
        # Fixed: 'xrange' does not exist on Python 3 (this project targets
        # Python 3); use 'range' instead.
        futures = [self._block.acquire(deadline) for _ in
                   range(self._max_readers)]
        try:
            managers = yield futures
        except Timeout:
            for f in futures:
                # Avoid traceback logging.
                f.exception()
            raise

        raise gen.Return(_ContextManagerList(managers))

    def release_read(self):
        """Releases one reader.

        If any coroutines are waiting for :meth:`acquire_read` (in case of full
        readers queue), the first in line is awakened.

        If not locked, raise a RuntimeError.
        """
        if not self.locked():
            raise RuntimeError('release unlocked lock')
        self._block.release()

    def release_write(self):
        """Releases after write.

        The first in queue will be awakened after release.

        If not locked, raise a RuntimeError.
        """
        if not self.locked():
            raise RuntimeError('release unlocked lock')
        # Return every reader slot the writer took in acquire_write().
        # Fixed: 'xrange' -> 'range' for Python 3.
        for i in range(self._max_readers):
            self._block.release()

    def locked(self):
        """``True`` if the lock has been acquired"""
        return self._block.locked()

    def __enter__(self):
        raise RuntimeError(
            "Use RWLock like 'with (yield lock)', not like"
            " 'with lock'")

    __exit__ = __enter__
diff --git a/translation.py b/translation.py
index 257d63c..479effd 100644
--- a/translation.py
+++ b/translation.py
@@ -2,8 +2,88 @@ import re, os, tempfile
from subprocess import Popen, PIPE
from tornado import gen
import tornado.process, tornado.iostream
+try: # >=4.2
+ import tornado.locks as locks
+except ImportError:
+ import toro as locks
import logging
from select import PIPE_BUF
+from contextlib import contextmanager
+from collections import namedtuple
+from time import time
+
class Pipeline(object):
    """Base class for a translation pipeline.

    Tracks usage statistics (current users, last-use timestamp, total use
    count) and holds the lock that serializes access to the underlying
    translation processes.  Subclasses implement :meth:`translate`.
    """
    def __init__(self):
        # The lock is needed so we don't let two coroutines write
        # simultaneously to a pipeline; then the first call to read might
        # read translations of text put there by the second call …
        self.lock = locks.Lock()
        # The users count is how many requests have picked this
        # pipeline for translation. If this is 0, we can safely shut
        # down the pipeline.
        self.users = 0
        # time() of the most recent use; 0 means never used.
        self.lastUsage = 0
        # Total number of uses over this pipeline's lifetime.
        self.useCount = 0

    @contextmanager
    def use(self):
        """Context manager that records one request using this pipeline,
        keeping the users/lastUsage/useCount statistics up to date even if
        translation raises."""
        self.lastUsage = time()
        self.users += 1
        try:
            yield
        finally:
            self.users -= 1
            self.lastUsage = time()
            self.useCount += 1

    def __lt__(self, other):
        # Order pipelines by current load, so a heap picks the least busy.
        return self.users < other.users

    @gen.coroutine
    def translate(self, _):
        # Fixed: raise the idiomatic NotImplementedError (a subclass of
        # Exception, so existing 'except Exception' handlers still match)
        # instead of a bare Exception.
        raise NotImplementedError("Not implemented, subclass me!")
+
class FlushingPipeline(Pipeline):
    """Pipeline backed by one long-lived, NUL-flushing process chain."""
    def __init__(self, commands, *args, **kwargs):
        self.inpipe, self.outpipe = startPipeline(commands)
        super().__init__(*args, **kwargs)

    def __del__(self):
        logging.debug("shutting down FlushingPipeline that was used %d times", self.useCount)
        # Closing stdin/stdout lets the subprocess chain exit.
        self.inpipe.stdin.close()
        self.inpipe.stdout.close()
        # TODO: It seems the process immediately becomes <defunct>,
        # but only completely removed after a second request to the
        # server – why?

    @gen.coroutine
    def translate(self, toTranslate):
        with self.use():
            # Split into chunks so the FIFO buffers never fill up.
            chunks = splitForTranslation(toTranslate, n_users=self.users)
            translated = yield [translateNULFlush(chunk, self) for chunk in chunks]
            return "".join(translated)
+
class SimplePipeline(Pipeline):
    """Pipeline that spawns a fresh process chain per translation request."""
    def __init__(self, commands, *args, **kwargs):
        self.commands = commands
        super().__init__(*args, **kwargs)

    @gen.coroutine
    def translate(self, toTranslate):
        with self.use():
            # Serialize requests; each one starts its own processes.
            with (yield self.lock.acquire()):
                translated = yield translateSimple(toTranslate, self.commands)
                return translated
+
+
# Result of parsing a mode file: whether the pipeline supports NUL-flushing,
# and the list of commands making up the pipeline.
ParsedModes = namedtuple('ParsedModes', 'do_flush commands')

def makePipeline(modes_parsed):
    """Build the Pipeline subclass matching a parsed mode file."""
    pipeline_cls = FlushingPipeline if modes_parsed.do_flush else SimplePipeline
    return pipeline_cls(modes_parsed.commands)
+
def startPipeline(commands):
procs = []
@@ -19,9 +99,9 @@ def startPipeline(commands):
procs.append(tornado.process.Subprocess(cmd,
stdin=in_from,
stdout=out_from))
-
return procs[0], procs[-1]
+
def parseModeFile(mode_path):
mode_str = open(mode_path, 'r').read().strip()
if mode_str:
@@ -41,12 +121,12 @@ def parseModeFile(mode_path):
commands = []
for cmd in mode_str.strip().split('|'):
cmd = cmd.replace('$2', '').replace('$1', '-g')
- cmd = re.sub('^(\S*)', '\g<1> -z', cmd)
+ cmd = re.sub(r'^(\S*)', r'\g<1> -z', cmd)
commands.append(cmd.split())
- return do_flush, commands
+ return ParsedModes(do_flush, commands)
else:
- logging.error('Could not parse mode file %s' % mode_path)
- raise Exception('Could not parse mode file %s' % mode_path)
+ logging.error('Could not parse mode file %s', mode_path)
+ raise Exception('Could not parse mode file %s', mode_path)
def upToBytes(string, max_bytes):
@@ -66,7 +146,7 @@ def upToBytes(string, max_bytes):
l -= 1
return 0
-def hardbreakFn(string, rush_hour):
+def hardbreakFn(string, n_users):
"""If others are queueing up to translate at the same time, we send
short requests, otherwise we try to minimise the number of
requests, but without letting buffers fill up.
@@ -74,30 +154,35 @@ def hardbreakFn(string, rush_hour):
These numbers could probably be tweaked a lot.
"""
- if rush_hour:
+ if n_users > 2:
return 1000
else:
return upToBytes(string, PIPE_BUF)
def preferPunctBreak(string, last, hardbreak):
"""We would prefer to split on a period or space seen before the
- hardbreak, if we can.
+ hardbreak, if we can. If the remaining string is smaller or equal
+ than the hardbreak, return end of the string
"""
+
+ if(len(string[last:])<= hardbreak):
+ return last+hardbreak+1
+
softbreak = int(hardbreak/2)+1
softnext = last + softbreak
hardnext = last + hardbreak
dot = string.rfind(".", softnext, hardnext)
if dot>-1:
- return dot
+ return dot+1
else:
space = string.rfind(" ", softnext, hardnext)
if space>-1:
- return space
+ return space+1
else:
return hardnext
-def splitForTranslation(toTranslate, rush_hour):
+def splitForTranslation(toTranslate, n_users):
"""Splitting it up a bit ensures we don't fill up FIFO buffers (leads
to processes hanging on read/write)."""
allSplit = [] # [].append and join faster than str +=
@@ -105,16 +190,18 @@ def splitForTranslation(toTranslate, rush_hour):
rounds=0
while last < len(toTranslate) and rounds<10:
rounds+=1
- hardbreak = hardbreakFn(toTranslate[last:], rush_hour)
+ hardbreak = hardbreakFn(toTranslate[last:], n_users)
next = preferPunctBreak(toTranslate, last, hardbreak)
allSplit.append(toTranslate[last:next])
+ #logging.getLogger().setLevel(logging.DEBUG)
+ logging.debug("splitForTranslation: last:%s hardbreak:%s next:%s appending:%s"%(last,hardbreak,next,toTranslate[last:next]))
last = next
return allSplit
@gen.coroutine
-def translateNULFlush(toTranslate, lock, pipeline):
- with (yield lock.acquire()):
- proc_in, proc_out = pipeline
+def translateNULFlush(toTranslate, pipeline):
+ with (yield pipeline.lock.acquire()):
+ proc_in, proc_out = pipeline.inpipe, pipeline.outpipe
proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE)
proc_deformat.stdin.write(bytes(toTranslate, 'utf-8'))
@@ -125,14 +212,14 @@ def translateNULFlush(toTranslate, lock, pipeline):
# TODO: PipeIOStream has no flush, but seems to work anyway?
#proc_in.stdin.flush()
- output = yield proc_out.stdout.read_until(bytes('\0', 'utf-8'))
+ output = yield gen.Task(proc_out.stdout.read_until, bytes('\0', 'utf-8'))
proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE)
proc_reformat.stdin.write(output)
return proc_reformat.communicate()[0].decode('utf-8')
-def translateWithoutFlush(toTranslate, lock, pipeline):
+def translateWithoutFlush(toTranslate, proc_in, proc_out):
proc_deformat = Popen("apertium-deshtml", stdin=PIPE, stdout=PIPE)
proc_deformat.stdin.write(bytes(toTranslate, 'utf-8'))
deformatted = proc_deformat.communicate()[0]
@@ -164,8 +251,8 @@ def translatePipeline(toTranslate, commands):
output.append(toTranslate)
output.append(towrite.decode('utf-8'))
- pipeline = []
- pipeline.append("apertium-deshtml")
+ all_cmds = []
+ all_cmds.append("apertium-deshtml")
for cmd in commands:
proc = Popen(cmd, stdin=PIPE, stdout=PIPE)
@@ -173,40 +260,29 @@ def translatePipeline(toTranslate, commands):
towrite = proc.communicate()[0]
output.append(towrite.decode('utf-8'))
- pipeline.append(cmd)
+ all_cmds.append(cmd)
proc_reformat = Popen("apertium-rehtml-noent", stdin=PIPE, stdout=PIPE)
proc_reformat.stdin.write(towrite)
towrite = proc_reformat.communicate()[0].decode('utf-8')
output.append(towrite)
- pipeline.append("apertium-rehtml-noent")
+ all_cmds.append("apertium-rehtml-noent")
- return output, pipeline
+ return output, all_cmds
@gen.coroutine
def translateSimple(toTranslate, commands):
proc_in, proc_out = startPipeline(commands)
- assert(proc_in==proc_out)
- yield proc_in.stdin.write(bytes(toTranslate, 'utf-8'))
+ assert proc_in == proc_out
+ yield gen.Task(proc_in.stdin.write, bytes(toTranslate, 'utf-8'))
proc_in.stdin.close()
- translated = yield proc_out.stdout.read_until_close()
+ translated = yield gen.Task(proc_out.stdout.read_until_close)
proc_in.stdout.close()
return translated.decode('utf-8')
-def translateDoc(fileToTranslate, format, modeFile):
- modesdir=os.path.dirname(os.path.dirname(modeFile))
- mode=os.path.splitext(os.path.basename(modeFile))[0]
- return Popen(['apertium', '-f', format, '-d', modesdir, mode],
- stdin=fileToTranslate, stdout=PIPE).communicate()[0]
-
- at gen.coroutine
-def translate(toTranslate, lock, pipeline, commands):
- if pipeline:
- allSplit = splitForTranslation(toTranslate, rush_hour = lock.locked())
- parts = yield [translateNULFlush(part, lock, pipeline) for part in allSplit]
- return "".join(parts)
- else:
- with (yield lock.acquire()):
- res = yield translateSimple(toTranslate, commands)
- return res
+def translateDoc(fileToTranslate, fmt, modeFile):
+ modesdir = os.path.dirname(os.path.dirname(modeFile))
+ mode = os.path.splitext(os.path.basename(modeFile))[0]
+ return Popen(['apertium', '-f', fmt, '-d', modesdir, mode],
+ stdin=fileToTranslate, stdout=PIPE).communicate()[0]
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/apertium-apy.git
More information about the debian-science-commits
mailing list