[pytango] 445/483: First try at gevent friendly server
Sandor Bodo-Merle
sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:15:10 UTC 2017
This is an automated email from the git hooks/post-receive script.
sbodomerle-guest pushed a commit to annotated tag bliss_8.10
in repository pytango.
commit 0a98664babc773872c8441f35d42eedabdc7332b
Author: coutinho <coutinho at esrf.fr>
Date: Mon Mar 2 15:22:31 2015 +0100
First try at gevent friendly server
---
src/boost/python/server.py | 222 ++++++++++++++++++++++++++++-----------------
1 file changed, 139 insertions(+), 83 deletions(-)
diff --git a/src/boost/python/server.py b/src/boost/python/server.py
index 4d424d4..920ce29 100644
--- a/src/boost/python/server.py
+++ b/src/boost/python/server.py
@@ -17,7 +17,7 @@ from __future__ import absolute_import
__all__ = ["DeviceMeta", "Device", "LatestDeviceImpl", "attribute",
"command", "device_property", "class_property",
- "run", "server_run", "Server"]
+ "run", "server_run", "Server", "green_mode", "get_gevent_worker"]
import os
import sys
@@ -190,26 +190,44 @@ def check_dev_klass_attr_read_method(tango_device_klass, attribute):
read_args = inspect.getargspec(read_method)
+ try:
+ green_mode = read_method._pytango_green_mode
+ except AttributeError:
+ green_mode = True
+
if len(read_args.args) < 2:
- @functools.wraps(read_method)
- def read_attr(self, attr):
- runner = _get_runner()
- if runner:
- ret = runner.execute(read_method, self)
- else:
+ if green_mode:
+ @functools.wraps(read_method)
+ def read_attr(self, attr):
+ worker = get_gevent_worker()
+ if worker:
+ ret = worker.execute(read_method, self)
+ else:
+ ret = read_method(self)
+ if not attr.get_value_flag() and ret is not None:
+ set_complex_value(attr, ret)
+ return ret
+ else:
+ @functools.wraps(read_method)
+ def read_attr(self, attr):
ret = read_method(self)
- if not attr.get_value_flag() and ret is not None:
- set_complex_value(attr, ret)
- return ret
+ if not attr.get_value_flag() and ret is not None:
+ set_complex_value(attr, ret)
+ return ret
else:
- @functools.wraps(read_method)
- def read_attr(self, attr):
- runner = _get_runner()
- if runner:
- ret = runner.execute(read_method, self, attr)
- else:
- ret = read_method(self, attr)
- return ret
+ if green_mode:
+ @functools.wraps(read_method)
+ def read_attr(self, attr):
+ worker = get_gevent_worker()
+ if worker:
+ ret = worker.execute(read_method, self, attr)
+ else:
+ ret = read_method(self, attr)
+ return ret
+ else:
+ read_attr = read_method
+
+ read_attr._pytango_green_mode = green_mode
method_name = "__read_{0}_wrapper__".format(attribute.attr_name)
attribute.read_method_name = method_name
@@ -238,15 +256,29 @@ def check_dev_klass_attr_write_method(tango_device_klass, attribute):
method_name = attribute.write_method_name
write_method = getattr(tango_device_klass, method_name)
- @functools.wraps(write_method)
- def write_attr(self, attr):
- value = attr.get_write_value()
- runner = _get_runner()
- if runner:
- ret = runner.execute(write_method, self, value)
- else:
- ret = write_method(self, value)
- return ret
+ try:
+ green_mode = write_method._pytango_gevent_mode
+ except AttributeError:
+ green_mode = True
+
+ if green_mode:
+ @functools.wraps(write_method)
+ def write_attr(self, attr):
+ value = attr.get_write_value()
+ worker = get_gevent_worker()
+ if worker:
+ ret = worker.execute(write_method, self, value)
+ else:
+ ret = write_method(self, value)
+ return ret
+ else:
+ @functools.wraps(write_method)
+ def write_attr(self, attr):
+ value = attr.get_write_value()
+ return write_method(self, value)
+
+ write_attr._pytango_green_mode = green_mode
+
setattr(tango_device_klass, method_name, write_attr)
@@ -279,9 +311,9 @@ class _DeviceClass(DeviceClass):
self.set_type(name)
def _new_device(self, klass, dev_class, dev_name):
- runner = _get_runner()
- if runner:
- return runner.execute(DeviceClass._new_device, self,
+ worker = get_gevent_worker()
+ if worker:
+ return worker.execute(DeviceClass._new_device, self,
klass, dev_class, dev_name)
else:
return DeviceClass._new_device(self, klass, dev_class,
@@ -673,15 +705,25 @@ def command(f=None, dtype_in=None, dformat_in=None, doc_in="",
din = [from_typeformat_to_type(dtype_in, dformat_in), doc_in]
dout = [from_typeformat_to_type(dtype_out, dformat_out), doc_out]
- @functools.wraps(f)
- def cmd(self, *args, **kwargs):
- runner = _get_runner()
- if runner:
- ret = runner.execute(f, self, *args, **kwargs)
- else:
- ret = f(self, *args, **kwargs)
- return ret
+ try:
+ green_mode = f._pytango_green_mode
+ except AttributeError:
+ green_mode = True
+
+ if green_mode:
+ @functools.wraps(f)
+ def cmd(self, *args, **kwargs):
+ worker = get_gevent_worker()
+ if worker:
+ ret = worker.execute(f, self, *args, **kwargs)
+ else:
+ ret = f(self, *args, **kwargs)
+ return ret
+ else:
+ print("found non green command", f)
+ cmd = f
cmd.__tango_command__ = name, [din, dout]
+ cmd._pytango_green_mode = green_mode
return cmd
@@ -842,35 +884,35 @@ def __server_run(classes, args=None, msg_stream=sys.stdout, util=None,
util = PyTango.Util(args)
u_instance = PyTango.Util.instance()
- if gevent_mode:
- runner = _create_runner()
- if event_loop:
- event_loop = functools.partial(runner.execute, event_loop)
+ if gevent_mode and event_loop:
+ gevent_worker = _create_gevent_worker()
+ event_loop = functools.partial(gevent_worker.execute, event_loop)
if event_loop is not None:
u_instance.server_set_event_loop(event_loop)
log = logging.getLogger("PyTango")
- def tango_loop(runner=None):
+ def tango_loop(worker=None):
+ log.debug("Tango loop started")
_add_classes(util, classes)
u_instance.server_init()
- if runner:
- runner.execute(post_init_callback)
+ if worker:
+ worker.execute(post_init_callback)
else:
post_init_callback()
write("Ready to accept request\n")
u_instance.server_run()
- if runner:
- runner.stop()
+ if worker:
+ worker.stop()
log.debug("Tango loop exit")
if gevent_mode:
- runner = _create_runner()
- start_new_thread = runner._threading.start_new_thread
- tango_thread_id = start_new_thread(tango_loop, (runner,))
- runner.run()
- log.debug("Runner finished")
+ gevent_worker = _create_gevent_worker()
+ start_new_thread = gevent_worker._threading.start_new_thread
+ tango_thread_id = start_new_thread(tango_loop, (gevent_worker,))
+ gevent_worker.run()
+ log.debug("Gevent_worker finished")
else:
tango_loop()
@@ -1051,15 +1093,29 @@ def server_run(classes, args=None, msg_stream=sys.stdout,
green_mode=green_mode)
-__RUNNER = None
+def green_mode(f=None, enable=True):
+ """
+ Decorator to force the method to run in the tango thread instead of the
+ green thread.
+ """
+ if f is None:
+ return functools.partial(green_mode, enable=enable)
+
+ f._pytango_green_mode = enable
+ print("Set %s green mode to %s" % (f, enable))
+ return f
+
+
+__GEVENT_WORKER = None
-def _get_runner():
- return __RUNNER
+def get_gevent_worker():
+ global __GEVENT_WORKER
+ return __GEVENT_WORKER
-def _create_runner():
- global __RUNNER
- if __RUNNER:
- return __RUNNER
+def _create_gevent_worker():
+ global __GEVENT_WORKER
+ if __GEVENT_WORKER:
+ return __GEVENT_WORKER
try:
from queue import Queue
@@ -1069,7 +1125,7 @@ def _create_runner():
import gevent
import gevent.event
- class Runner:
+ class GeventWorker:
from gevent import _threading
@@ -1124,8 +1180,8 @@ def _create_runner():
self.__tasks.put(task)
self.__watcher.send()
- __RUNNER = Runner()
- return __RUNNER
+ __GEVENT_WORKER = GeventWorker()
+ return __GEVENT_WORKER
_CLEAN_UP_TEMPLATE = """
@@ -1235,8 +1291,8 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
def _command(dev, func_name=None):
obj = dev._object
f = getattr(obj, func_name)
- if server.runner:
- result = server.runner.execute(f)
+ if server.gevent_worker:
+ result = server.gevent_worker.execute(f)
else:
result = f()
return server.dumps(result)
@@ -1245,8 +1301,8 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
obj = dev._object
args, kwargs = loads(*param)
f = getattr(obj, func_name)
- if server.runner:
- result = server.runner.execute(f, *args, **kwargs)
+ if server.gevent_worker:
+ result = server.gevent_worker.execute(f, *args, **kwargs)
else:
result = f(*args, **kwargs)
return server.dumps(result)
@@ -1276,8 +1332,8 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
fmt = AttrDataFormat.SCALAR
def read(dev, attr):
name = attr.get_name()
- if server.runner:
- value = server.runner.execute(getattr, dev._object, name)
+ if server.gevent_worker:
+ value = server.gevent_worker.execute(getattr, dev._object, name)
else:
value = getattr(dev._object, name)
attr.set_value(*server.dumps(value))
@@ -1285,23 +1341,23 @@ def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
name = attr.get_name()
value = attr.get_write_value()
value = loads(*value)
- if server.runner:
- server.runner.execute(setattr, dev._object, name, value)
+ if server.gevent_worker:
+ server.gevent_worker.execute(setattr, dev._object, name, value)
else:
setattr(dev._object, name, value)
else:
def read(dev, attr):
name = attr.get_name()
- if server.runner:
- value = server.runner.execute(getattr, dev._object, name)
+ if server.gevent_worker:
+ value = server.gevent_worker.execute(getattr, dev._object, name)
else:
value = getattr(dev._object, name)
attr.set_value(value)
def write(dev, attr):
name = attr.get_name()
value = attr.get_write_value()
- if server.runner:
- server.runner.execute(setattr, dev._object, name, value)
+ if server.gevent_worker:
+ server.gevent_worker.execute(setattr, dev._object, name, value)
else:
setattr(dev._object, name, value)
read.__name__ = "_read_" + name
@@ -1382,9 +1438,9 @@ class Server:
self.__tango_classes = _to_classes(tango_classes or [])
self.__tango_devices = []
if self.gevent_mode:
- self.__runner = _create_runner()
+ self.__worker = _create_gevent_worker()
else:
- self.__runner = None
+ self.__worker = None
self.log = logging.getLogger("PyTango.Server")
self.__phase = Server.Phase0
@@ -1399,7 +1455,7 @@ class Server:
if not cb:
return
if self.gevent_mode:
- self.__runner.execute(cb)
+ self.__worker.execute(cb)
else:
cb()
@@ -1501,7 +1557,7 @@ class Server:
if gevent_mode:
if event_loop:
- event_loop = functools.partial(self.__runner.execute,
+ event_loop = functools.partial(self.__worker.execute,
event_loop)
if event_loop:
u_instance.server_set_event_loop(event_loop)
@@ -1509,12 +1565,12 @@ class Server:
_add_classes(util, self.__tango_classes)
if gevent_mode:
- start_new_thread = self.__runner._threading.start_new_thread
+ start_new_thread = self.__worker._threading.start_new_thread
tango_thread_id = start_new_thread(self.__tango_loop, ())
def __run(self, timeout=None):
if self.gevent_mode:
- return self.__runner.run(timeout=timeout)
+ return self.__worker.run(timeout=timeout)
else:
self.__tango_loop()
@@ -1527,7 +1583,7 @@ class Server:
self.log.info("Ready to accept request")
u_instance.server_run()
if self.gevent_mode:
- self.__runner.stop()
+ self.__worker.stop()
if self.__auto_clean:
self.__clean_up_process()
self.log.debug("Tango loop exit")
@@ -1584,8 +1640,8 @@ class Server:
return self.green_mode == GreenMode.Gevent
@property
- def runner(self):
- return self.__runner
+ def worker(self):
+ return self.__worker
def dumps(self, obj):
return dumps(self.__protocol, obj)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git
More information about the debian-science-commits mailing list