[Pkg-bitcoin-commits] [python-quamash] 65/269: Fixes to tests, mostly doctests.
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:16 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit c86693c00e108f17beff73acdca486c5ef266733
Author: Mark Harviston <mark.harviston at gmail.com>
Date: Thu Jul 10 21:18:39 2014 -0700
Fixes to tests, mostly doctests.
Doctests now actually work; previously they did not.
test_qeventloop: removed a totally superfluous class and replaced it with top-level test functions (personal preference).
---
pytest.ini | 2 ++
quamash/__init__.py | 66 ++++++++++++++++++++++++++-------------
tests/test_qeventloop.py | 81 ++++++++++++++++++++++++------------------------
3 files changed, 87 insertions(+), 62 deletions(-)
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..e38c40a
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+addopts=--doctest-modules quamash tests
diff --git a/quamash/__init__.py b/quamash/__init__.py
index 98dc82d..3c255ca 100644
--- a/quamash/__init__.py
+++ b/quamash/__init__.py
@@ -101,11 +101,12 @@ class QThreadExecutor(QtCore.QObject):
"""
ThreadExecutor that produces QThreads
Same API as `concurrent.futures.Executor`
-
+
+ >>> from quamash import QThreadExecutor
>>> with QThreadExecutor(5) as executor:
- >>> f = executor.submit(lambda x: 2 + x, x)
- >>> r = f.result()
- >>> assert r == 4
+ ... f = executor.submit(lambda x: 2 + x, 2)
+ ... r = f.result()
+ ... assert r == 4
"""
def __init__(self, max_workers=10, parent=None):
super().__init__(parent)
@@ -147,21 +148,38 @@ def _easycallback(fn):
Remember: only objects that inherit from QObject can support signals/slots
+ >>> try: from PyQt5.QtCore import QThread, QObject
+ ... except ImportError: from PySide.QtCore import QThread, QObject
+ >>>
+ >>> try: from PyQt5.QtWidgets import QApplication
+ ... except ImportError: from PySide.QtGui import QApplication
+ >>>
+ >>> app = QApplication([])
+ >>>
+ >>> from quamash import QEventLoop, _easycallback
+ >>> import asyncio
+ >>>
+ >>> global_thread = QThread.currentThread()
>>> class MyObject(QObject):
- >>> @_easycallback
- >>> def mycallback(self):
- >>> dostuff()
+ ... @_easycallback
+ ... def mycallback(self):
+ ... global global_thread, mythread
+ ... cur_thread = QThread.currentThread()
+ ... assert cur_thread is not global_thread
+ ... assert cur_thread is mythread
>>>
+ >>> mythread = QThread()
+ >>> mythread.start()
>>> myobject = MyObject()
+ >>> myobject.moveToThread(mythread)
>>>
- >>> @task
- >>> def mytask():
- >>> myobject.mycallback()
+ >>> @asyncio.coroutine
+ ... def mycoroutine():
+ ... myobject.mycallback()
>>>
- >>> loop = QEventLoop()
+ >>> loop = QEventLoop(app)
>>> with loop:
- >>> loop.call_soon(mytask)
- >>> loop.run_forever()
+ ... loop.run_until_complete(mycoroutine())
"""
@wraps(fn)
def in_wrapper(self, *args, **kwargs):
@@ -189,15 +207,20 @@ else:
class QEventLoop(QtCore.QObject, _baseclass):
"""
Implementation of asyncio event loop that uses the Qt Event loop
- >>> @quamash.task
- >>> def my_task(x):
- >>> return x + 2
+ >>> import quamash, asyncio
+ >>> try: from PyQt5.QtWidgets import QApplication
+ ... except ImportError: from PySide.QtCore import QApplication
>>>
- >>> app = QApplication()
- >>> with QEventLoop(app) as loop:
- >>> y = loop.call_soon(my_task)
+ >>> app = QApplication([])
>>>
- >>> assert y == 4
+ >>> @asyncio.coroutine
+ ... def xplusy(x, y):
+ ... yield from asyncio.sleep(2)
+ ... assert x + y == 4
+ ... yield from asyncio.sleep(2)
+ >>>
+ >>> with QEventLoop(app) as loop:
+ ... loop.run_until_complete(xplusy(2,2))
"""
def __init__(self, app):
self.__timers = []
@@ -492,6 +515,8 @@ class QEventLoop(QtCore.QObject, _baseclass):
except:
sys.stderr.write('{}, {}\n'.format(args, kwds))
+ def remove(self, timer):
+ self.__timers.remove(timer)
class _Cancellable:
def __init__(self, timer, loop):
@@ -499,5 +524,4 @@ class _Cancellable:
self.__loop = loop
def cancel(self):
- self.__loop.remove(timer)
self.__timer.stop()
diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py
index 6d3a2ab..2f2a70b 100644
--- a/tests/test_qeventloop.py
+++ b/tests/test_qeventloop.py
@@ -52,45 +52,44 @@ def loop(request, application):
return lp
-class TestQEventLoop:
- def test_can_run_tasks_in_default_executor(self, loop):
- """Verify that tasks can be run in default (threaded) executor."""
- def blocking_func():
- nonlocal was_invoked
- was_invoked = True
-
- @asyncio.coroutine
- def blocking_task():
- yield from loop.run_in_executor(None, blocking_func)
-
- was_invoked = False
- loop.run_until_complete(blocking_task())
-
- assert was_invoked
-
- def test_can_execute_subprocess(self, loop):
- """Verify that a subprocess can be executed."""
- transport, protocol = loop.run_until_complete(loop.subprocess_exec(
- _SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))
+def test_can_run_tasks_in_default_executor(loop):
+ """Verify that tasks can be run in default (threaded) executor."""
+ def blocking_func():
+ nonlocal was_invoked
+ was_invoked = True
+
+ @asyncio.coroutine
+ def blocking_task():
+ yield from loop.run_in_executor(None, blocking_func)
+
+ was_invoked = False
+ loop.run_until_complete(blocking_task())
+
+ assert was_invoked
+
+def test_can_execute_subprocess(loop):
+ """Verify that a subprocess can be executed."""
+ transport, protocol = loop.run_until_complete(loop.subprocess_exec(
+ _SubprocessProtocol, sys.executable or 'python', '-c', 'print(\'Hello async world!\')'))
+ loop.run_forever()
+ assert transport.get_returncode() == 0
+ assert protocol.received_stdout == 'Hello async world!'
+
+def test_can_terminate_subprocess(loop):
+ """Verify that a subprocess can be terminated."""
+ # Start a never-ending process
+ transport = loop.run_until_complete(loop.subprocess_exec(
+ _SubprocessProtocol, sys.executable or 'python', '-c', 'import time\nwhile True: time.sleep(1)'))[0]
+ # Terminate!
+ transport.kill()
+ # Wait for process to die
+ loop.run_forever()
+
+ assert transport.get_returncode() != 0
+
+def test_can_function_as_context_manager(application):
+ """Verify that a QEventLoop can function as its own context manager."""
+ with quamash.QEventLoop(application) as loop:
+ assert isinstance(loop, quamash.QEventLoop)
+ loop.call_soon(loop.stop)
loop.run_forever()
- assert transport.get_returncode() == 0
- assert protocol.received_stdout == 'Hello async world!'
-
- def test_can_terminate_subprocess(self, loop):
- """Verify that a subprocess can be terminated."""
- # Start a never-ending process
- transport = loop.run_until_complete(loop.subprocess_exec(
- _SubprocessProtocol, 'python', '-c', 'import time\nwhile True: time.sleep(1)'))[0]
- # Terminate!
- transport.kill()
- # Wait for process to die
- loop.run_forever()
-
- assert transport.get_returncode() != 0
-
- def test_can_function_as_context_manager(self, application):
- """Verify that a QEventLoop can function as its own context manager."""
- with quamash.QEventLoop(application) as loop:
- assert isinstance(loop, quamash.QEventLoop)
- loop.call_soon(loop.stop)
- loop.run_forever()
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits
mailing list