[pytango] 116/122: Add unittests for tango events
Sandor Bodo-Merle
sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:18:24 UTC 2017
This is an automated email from the git hooks/post-receive script.
sbodomerle-guest pushed a commit to tag v9.2.1
in repository pytango.
commit 2f2ec27466f56efad5896d7093bc5ae6fff066bf
Author: Vincent Michel <vincent.michel at maxlab.lu.se>
Date: Wed Jan 18 18:53:29 2017 +0100
Add unittests for tango events
---
tests/test_event.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 84 insertions(+)
diff --git a/tests/test_event.py b/tests/test_event.py
new file mode 100644
index 0000000..ff2abff
--- /dev/null
+++ b/tests/test_event.py
@@ -0,0 +1,84 @@
+
+# Imports
+
+import socket
+from functools import partial
+
+import pytest
+
+from tango import EventType, GreenMode, DeviceProxy
+from tango.server import Device
+from tango.server import command, attribute
+from tango.test_utils import DeviceTestContext
+
+from tango.gevent import DeviceProxy as gevent_DeviceProxy
+from tango.futures import DeviceProxy as futures_DeviceProxy
+from tango.asyncio import DeviceProxy as asyncio_DeviceProxy
+
+# Helpers
+
+device_proxy_map = {
+    GreenMode.Synchronous: DeviceProxy,
+    GreenMode.Futures: futures_DeviceProxy,
+    GreenMode.Asyncio: partial(asyncio_DeviceProxy, wait=True),
+    GreenMode.Gevent: gevent_DeviceProxy}
+
+
+def get_open_port():
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(("", 0))
+    s.listen(1)
+    port = s.getsockname()[1]
+    s.close()
+    return port
+
+
+# Test device
+
+class EventDevice(Device):
+
+    def init_device(self):
+        self.set_change_event("attr", True, False)
+
+    @attribute
+    def attr(self):
+        return 0.
+
+    @command
+    def send_event(self):
+        self.push_change_event("attr", 1.)
+
+
+# Device fixture
+
+@pytest.fixture(params=[GreenMode.Synchronous,
+                        GreenMode.Futures,
+                        GreenMode.Asyncio,
+                        GreenMode.Gevent],
+                scope="module")
+def event_device(request):
+    green_mode = request.param
+    # Hack: a port have to be specified explicitely for events to work
+    port = get_open_port()
+    context = DeviceTestContext(EventDevice, port=port, process=True)
+    with context:
+        yield device_proxy_map[green_mode](context.get_device_access())
+
+
+# Tests
+
+def test_subscribe_event(event_device):
+    results = []
+
+    def callback(evt):
+        results.append(evt.attr_value.value)
+
+    event_device.subscribe_event(
+        "attr", EventType.CHANGE_EVENT, callback, wait=True)
+    event_device.command_inout("send_event", wait=True)
+    # Wait for tango event
+    event_device.read_attribute("state", wait=True)
+    event_device.read_attribute("state", wait=True)
+    event_device.read_attribute("state", wait=True)
+    # Test the event values
+    assert results == [0., 1.]
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git
More information about the debian-science-commits
mailing list