[pytango] 83/122: Move tango object server to its own module
Sandor Bodo-Merle
sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:18:21 UTC 2017
This is an automated email from the git hooks/post-receive script.
sbodomerle-guest pushed a commit to tag v9.2.1
in repository pytango.
commit 565d9399a69523c7794906c777d1ffa87830f666
Author: Vincent Michel <vincent.michel at maxlab.lu.se>
Date: Mon Nov 14 16:21:57 2016 +0100
Move tango object server to its own module
---
tango/server.py | 563 +-----------------------------------------------
tango/tango_object.py | 586 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 592 insertions(+), 557 deletions(-)
diff --git a/tango/server.py b/tango/server.py
index 1e0a820..a3158de 100644
--- a/tango/server.py
+++ b/tango/server.py
@@ -15,30 +15,25 @@ from __future__ import with_statement
from __future__ import print_function
from __future__ import absolute_import
-import os
import sys
import copy
-import types
import inspect
import logging
-import weakref
import functools
import traceback
-from ._tango import (AttrDataFormat, AttrWriteType,
- DevFailed, Except, GreenMode, constants,
- Database, DbDevInfo, DevState, CmdArgType,
- PipeWriteType)
+from ._tango import AttrDataFormat, AttrWriteType, CmdArgType, PipeWriteType
+from ._tango import DevFailed, Except, GreenMode
+
from .attr_data import AttrData
from .pipe_data import PipeData
from .device_class import DeviceClass
-from .utils import (get_latest_device_class, is_seq, is_non_str_seq,
- scalar_to_array_type, TO_TANGO_TYPE)
-from .codec import loads, dumps
+from .utils import get_latest_device_class, is_seq, is_non_str_seq
+from .utils import scalar_to_array_type, TO_TANGO_TYPE
__all__ = ["DeviceMeta", "Device", "LatestDeviceImpl", "attribute",
"command", "pipe", "device_property", "class_property",
- "run", "server_run", "Server", "get_worker", "get_async_worker"]
+ "run", "server_run", "get_worker", "get_async_worker"]
API_VERSION = 2
@@ -1692,549 +1687,3 @@ def _create_asyncio_worker():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return LoopExecutor(loop=loop)
-
-
-_CLEAN_UP_TEMPLATE = """
-import sys
-from tango import Database
-
-db = Database()
-server_instance = '{server_instance}'
-try:
- devices = db.get_device_class_list(server_instance)[::2]
- for device in devices:
- db.delete_device(device)
- try:
- db.delete_device_alias(db.get_alias(device))
- except:
- pass
-except:
- print ('Failed to cleanup!')
-"""
-
-import numpy
-
-def __to_tango_type_fmt(value):
- dfmt = AttrDataFormat.SCALAR
- value_t = type(value)
- dtype = TO_TANGO_TYPE.get(value_t)
- max_dim_x, max_dim_y = 1, 0
- if dtype is None:
- if isinstance(value, numpy.ndarray):
- dtype = TO_TANGO_TYPE.get(value.dtype.name)
- shape_l = len(value.shape)
- if shape_l == 1:
- dfmt = AttrDataFormat.SPECTRUM
- max_dim_x = max(2**16, value.shape[0])
- elif shape_l == 2:
- dfmt = AttrDataFormat.IMAGE
- max_dim_x = max(2**16, value.shape[0])
- max_dim_y = max(2**16, value.shape[1])
- else:
- dtype = CmdArgType.DevEncoded
- return dtype, dfmt, max_dim_x, max_dim_y
-
-
-def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
- slog = server.server_instance.replace("/", ".")
- log = logging.getLogger("tango.Server." + slog)
-
- obj_klass = obj.__class__
- obj_klass_name = obj_klass.__name__
-
- if tango_class_name is None:
- tango_class_name = obj_klass_name
-
- class DeviceDispatcher(Device):
- __metaclass__ = DeviceMeta
-
- TangoClassName = tango_class_name
-
- def __init__(self, tango_class_obj, name):
- tango_object = server.get_tango_object(name)
- self.__tango_object = weakref.ref(tango_object)
- Device.__init__(self, tango_class_obj, name)
-
- def init_device(self):
- Device.init_device(self)
- self.set_state(DevState.ON)
-
- @property
- def _tango_object(self):
- return self.__tango_object()
-
- @property
- def _object(self):
- return self._tango_object._object
-
-
- DeviceDispatcher.__name__ = tango_class_name
- DeviceDispatcher.TangoClassName = tango_class_name
- DeviceDispatcherClass = DeviceDispatcher.TangoClassClass
-
- for name in dir(obj):
- if name.startswith("_"):
- continue
- log.debug("inspecting %s.%s", obj_klass_name, name)
- try:
- member = getattr(obj, name)
- except:
- log.info("failed to inspect member '%s.%s'",
- obj_klass_name, name)
- log.debug("Details:", exc_info=1)
- if inspect.isclass(member) or inspect.ismodule(member):
- continue
- if member_filter and not member_filter(obj, tango_class_name,
- name, member):
- log.debug("filtered out %s from %s", name, tango_class_name)
- continue
- if inspect.isroutine(member):
- # try to find out if there are any parameters
- in_type = CmdArgType.DevEncoded
- out_type = CmdArgType.DevEncoded
- try:
- arg_spec = inspect.getargspec(member)
- if not arg_spec.args:
- in_type = CmdArgType.DevVoid
- except TypeError:
- pass
-
- if in_type == CmdArgType.DevVoid:
- def _command(dev, func_name=None):
- obj = dev._object
- f = getattr(obj, func_name)
- result = server.worker.execute(f)
- return server.dumps(result)
- else:
- def _command(dev, param, func_name=None):
- obj = dev._object
- args, kwargs = loads(*param)
- f = getattr(obj, func_name)
- result = server.worker.execute(f, *args, **kwargs)
- return server.dumps(result)
- cmd = functools.partial(_command, func_name=name)
- cmd.__name__ = name
- doc = member.__doc__
- if doc is None:
- doc = ""
- cmd.__doc__ = doc
- cmd = types.MethodType(cmd, None, DeviceDispatcher)
- setattr(DeviceDispatcher, name, cmd)
- DeviceDispatcherClass.cmd_list[name] = \
- [[in_type, doc], [out_type, ""]]
- else:
- read_only = False
- if hasattr(obj_klass, name):
- kmember = getattr(obj_klass, name)
- if inspect.isdatadescriptor(kmember):
- if kmember.fset is None:
- read_only = True
- else:
- continue
- value = member
- dtype, fmt, x, y = __to_tango_type_fmt(value)
- if dtype is None or dtype == CmdArgType.DevEncoded:
- dtype = CmdArgType.DevEncoded
- fmt = AttrDataFormat.SCALAR
- def read(dev, attr):
- name = attr.get_name()
- value = server.worker.execute(getattr, dev._object, name)
- attr.set_value(*server.dumps(value))
- def write(dev, attr):
- name = attr.get_name()
- value = attr.get_write_value()
- value = loads(*value)
- server.worker.execute(setattr, dev._object, name, value)
- else:
- def read(dev, attr):
- name = attr.get_name()
- value = server.worker.execute(getattr, dev._object, name)
- attr.set_value(value)
- def write(dev, attr):
- name = attr.get_name()
- value = attr.get_write_value()
- server.worker.execute(setattr, dev._object, name, value)
- read.__name__ = "_read_" + name
- setattr(DeviceDispatcher, read.__name__, read)
-
- pars = dict(name=name, dtype=dtype, dformat=fmt,
- max_dim_x=x, max_dim_y=y, fget=read)
- if read_only:
- write = None
- else:
- write.__name__ = "_write_" + name
- pars['fset'] = write
- setattr(DeviceDispatcher, write.__name__, write)
- attr_data = AttrData.from_dict(pars)
- DeviceDispatcherClass.attr_list[name] = attr_data
- return DeviceDispatcher
-
-
-class Server:
- """
- Server helper
- """
-
- Phase0, Phase1, Phase2 = range(3)
- PreInitPhase = Phase1
- PostInitPhase = Phase2
-
- class TangoObjectAdapter:
-
- def __init__(self, server, obj, full_name, alias=None,
- tango_class_name=None):
- self.__server = weakref.ref(server)
- self.full_name = full_name
- self.alias = alias
- self.class_name = obj.__class__.__name__
- if tango_class_name is None:
- tango_class_name = self.class_name
- self.tango_class_name = tango_class_name
- self.__object = weakref.ref(obj, self.__onObjectDeleted)
-
- def __onObjectDeleted(self, object_weak):
- self.__object = None
- server = self._server
- server.log.info("object deleted %s(%s)", self.class_name,
- self.full_name)
- server.unregister_object(self.full_name)
-
- @property
- def _server(self):
- return self.__server()
-
- @property
- def _object(self):
- obj = self.__object
- if obj is None:
- return None
- return obj()
-
- def __init__(self, server_name, server_type=None, port=None,
- event_loop_callback=None, init_callbacks=None,
- auto_clean=False, green_mode=None, tango_classes=None,
- protocol="pickle"):
- if server_name is None:
- raise ValueError("Must give a valid server name")
- self.__server_name = server_name
- self.__server_type = server_type
- self.__port = port
- self.__event_loop_callback = event_loop_callback
- if init_callbacks is None:
- init_callbacks = {}
- self.__init_callbacks = init_callbacks
- self.__util = None
- self.__objects = {}
- self.__running = False
- self.__auto_clean = auto_clean
- self.__green_mode = green_mode
- self.__protocol = protocol
- self.__tango_classes = _to_classes(tango_classes or [])
- self.__tango_devices = []
- if self.async_mode:
- self.__worker = _create_async_worker(self.green_mode)
- else:
- self.__worker = get_worker()
- set_worker(self.__worker)
- self.log = logging.getLogger("tango.Server")
- self.__phase = Server.Phase0
-
- def __build_args(self):
- args = [self.server_type, self.__server_name]
- if self.__port is not None:
- args.extend(["-ORBendPoint",
- "giop:tcp::{0}".format(self.__port)])
- return args
-
- def __exec_cb(self, cb):
- if not cb:
- return
- self.worker.execute(cb)
-
- def __find_tango_class(self, key):
- pass
-
- def __prepare(self):
- """Update database with existing devices"""
- self.log.debug("prepare")
-
- if self.__phase > 0:
- raise RuntimeError("Internal error: Can only prepare in phase 0")
-
- server_instance = self.server_instance
- db = Database()
-
- # get list of server devices if server was already registered
- server_registered = server_instance in db.get_server_list()
-
- if server_registered:
- dserver_name = "dserver/{0}".format(server_instance)
- if db.import_device(dserver_name).exported:
- import tango
- dserver = tango.DeviceProxy(dserver_name)
- try:
- dserver.ping()
- raise Exception("Server already running")
- except:
- self.log.info("Last time server was not properly "
- "shutdown!")
- db_class_map, db_device_map = self.get_devices()
- else:
- db_class_map, db_device_map = {}, {}
-
- db_devices_add = {}
-
- # all devices that are registered in database that are not registered
- # as tango objects or for which the tango class changed will be removed
- db_devices_remove = set(db_device_map) - set(self.__objects)
-
- for local_name, local_object in self.__objects.items():
- local_class_name = local_object.tango_class_name
- db_class_name = db_device_map.get(local_name)
- if db_class_name:
- if local_class_name != db_class_name:
- db_devices_remove.add(local_name)
- db_devices_add[local_name] = local_object
- else:
- db_devices_add[local_name] = local_object
-
- for device in db_devices_remove:
- db.delete_device(device)
- try:
- db.delete_device_alias(db.get_alias(device))
- except:
- pass
-
- # register devices in database
-
- # add DServer
- db_dev_info = DbDevInfo()
- db_dev_info.server = server_instance
- db_dev_info._class = "DServer"
- db_dev_info.name = "dserver/" + server_instance
-
- db_dev_infos = [db_dev_info]
- aliases = []
- for obj_name, obj in db_devices_add.items():
- db_dev_info = DbDevInfo()
- db_dev_info.server = server_instance
- db_dev_info._class = obj.tango_class_name
- db_dev_info.name = obj.full_name
- db_dev_infos.append(db_dev_info)
- if obj.alias:
- aliases.append((obj.full_name, obj.alias))
-
- db.add_server(server_instance, db_dev_infos)
-
- # add aliases
- for alias_info in aliases:
- db.put_device_alias(*alias_info)
-
- def __clean_up_process(self):
- if not self.__auto_clean:
- return
- clean_up = _CLEAN_UP_TEMPLATE.format(server_instance=self.server_instance)
- import subprocess
- res = subprocess.call([sys.executable, "-c", clean_up])
- if res:
- self.log.error("Failed to cleanup")
-
- def __initialize(self):
- self.log.debug("initialize")
- async_mode = self.async_mode
- event_loop = self.__event_loop_callback
-
- util = self.tango_util
- u_instance = util.instance()
-
- if async_mode:
- if event_loop:
- event_loop = functools.partial(self.worker.execute,
- event_loop)
- if event_loop:
- u_instance.server_set_event_loop(event_loop)
-
- _add_classes(util, self.__tango_classes)
-
- if async_mode:
- tango_thread_id = self.worker.run_in_thread(self.__tango_loop)
-
- def __run(self, timeout=None):
- if self.async_mode:
- return self.worker.run(timeout=timeout)
- else:
- self.__tango_loop()
-
- def __tango_loop(self):
- self.log.debug("server loop started")
- self.__running = True
- u_instance = self.tango_util.instance()
- u_instance.server_init()
- self._phase = Server.Phase2
- self.log.info("Ready to accept request")
- u_instance.server_run()
- self.worker.stop()
- if self.__auto_clean:
- self.__clean_up_process()
- self.log.debug("server loop exit")
-
- @property
- def _phase(self):
- return self.__phase
-
- @_phase.setter
- def _phase(self, phase):
- self.__phase = phase
- cb = self.__init_callbacks.get(phase)
- self.__exec_cb(cb)
-
- @property
- def server_type(self):
- server_type = self.__server_type
- if server_type is None:
- server_file = os.path.basename(sys.argv[0])
- server_type = os.path.splitext(server_file)[0]
- return server_type
-
- @property
- def server_instance(self):
- return "{0}/{1}".format(self.server_type, self.__server_name)
-
- @property
- def tango_util(self):
- if self.__util is None:
- import tango
- self.__util = tango.Util(self.__build_args())
- self._phase = Server.Phase1
- return self.__util
-
- @property
- def green_mode(self):
- gm = self.__green_mode
- if gm is None:
- from tango import get_green_mode
- gm = get_green_mode()
- return gm
-
- @green_mode.setter
- def green_mode(self, gm):
- if gm == self.__green_mode:
- return
- if self.__running:
- raise RuntimeError("Cannot change green mode while "
- "server is running")
- self.__green_mode = gm
-
- @property
- def async_mode(self):
- return self.green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
-
- @property
- def worker(self):
- return self.__worker
-
- def dumps(self, obj):
- return dumps(self.__protocol, obj)
-
- def get_devices(self):
- """
- Helper that retuns a dict of devices for this server.
-
- :return:
- Returns a tuple of two elements:
- - dict<tango class name : list of device names>
- - dict<device names : tango class name>
- :rtype: tuple<dict, dict>
- """
- if self.__util is None:
- import tango
- db = tango.Database()
- else:
- db = self.__util.get_database()
- server = self.server_instance
- dev_list = db.get_device_class_list(server)
- class_map, dev_map = {}, {}
- for class_name, dev_name in zip(dev_list[1::2], dev_list[::2]):
- dev_names = class_map.get(class_name)
- if dev_names is None:
- class_map[class_name] = dev_names = []
- dev_name = dev_name.lower()
- dev_names.append(dev_name)
- dev_map[dev_name] = class_name
- return class_map, dev_map
-
- def get_tango_object(self, name):
- return self.__objects.get(name.lower())
-
- def get_tango_class(self, tango_class_name):
- for klass in self.__tango_classes:
- if klass.TangoClassName == tango_class_name:
- return klass
-
- def register_tango_device(self, klass, name):
- if inspect.isclass(klass):
- if isinstance(klass, Device):
- kk, k, kname = Device.TangoClassClass, Device, Device.TangoClassName
- else:
- raise ValueError
- else:
- raise NotImplementedError
-
- def register_tango_class(self, klass):
- if self._phase > Server.Phase1:
- raise RuntimeError("Cannot add new class after phase 1 "
- "(i.e. after server_init)")
- self.__tango_classes.append(klass)
-
- def unregister_object(self, name):
- tango_object = self.__objects.pop(name.lower())
- if self._phase > Server.Phase1:
- import tango
- util = tango.Util.instance()
- if not util.is_svr_shutting_down():
- util.delete_device(tango_object.tango_class_name, name)
-
- def register_object(self, obj, name, tango_class_name=None,
- member_filter=None):
- """
- :param member_filter:
- callable(obj, tango_class_name, member_name, member) -> bool
- """
- slash_count = name.count("/")
- if slash_count == 0:
- alias = name
- full_name = "{0}/{1}".format(self.server_instance, name)
- elif slash_count == 2:
- alias = None
- full_name = name
- else:
- raise ValueError("Invalid name")
-
- class_name = tango_class_name or obj.__class__.__name__
- tango_class = self.get_tango_class(class_name)
-
- if tango_class is None:
- tango_class = create_tango_class(self, obj, class_name,
- member_filter=member_filter)
- self.register_tango_class(tango_class)
-
- tango_object = self.TangoObjectAdapter(self, obj, full_name, alias,
- tango_class_name=class_name)
- self.__objects[full_name.lower()] = tango_object
- if self._phase > Server.Phase1:
- import tango
- util = tango.Util.instance()
- util.create_device(class_name, name)
- return tango_object
-
- def run(self, timeout=None):
- self.log.debug("run")
- async_mode = self.async_mode
- running = self.__running
- if not running:
- self.__prepare()
- self.__initialize()
- else:
- if not async_mode:
- raise RuntimeError("Server is already running")
- self.__run(timeout=timeout)
diff --git a/tango/tango_object.py b/tango/tango_object.py
new file mode 100644
index 0000000..6df4f28
--- /dev/null
+++ b/tango/tango_object.py
@@ -0,0 +1,586 @@
+# ------------------------------------------------------------------------------
+# This file is part of PyTango (http://pytango.rtfd.io)
+#
+# Copyright 2006-2012 CELLS / ALBA Synchrotron, Bellaterra, Spain
+# Copyright 2013-2014 European Synchrotron Radiation Facility, Grenoble, France
+#
+# Distributed under the terms of the GNU Lesser General Public License,
+# either version 3 of the License, or (at your option) any later version.
+# See LICENSE.txt for more info.
+# ------------------------------------------------------------------------------
+
+"""Server for tango objects."""
+
+import os
+import sys
+import types
+import logging
+import weakref
+import inspect
+import functools
+
+from .codec import loads, dumps
+from .attr_data import AttrData
+from .utils import TO_TANGO_TYPE
+from ._tango import AttrDataFormat, CmdArgType, GreenMode
+from ._tango import DbDevInfo, Database, DevState, constants
+from .server import Device, DeviceMeta, _to_classes, _add_classes
+from .server import _create_async_worker, get_worker, set_worker
+
+__all__ = ['Server']
+
+
+_CLEAN_UP_TEMPLATE = """
+import sys
+from tango import Database
+
+db = Database()
+server_instance = '{server_instance}'
+try:
+ devices = db.get_device_class_list(server_instance)[::2]
+ for device in devices:
+ db.delete_device(device)
+ try:
+ db.delete_device_alias(db.get_alias(device))
+ except:
+ pass
+except:
+ print ('Failed to cleanup!')
+"""
+
+
+def __to_tango_type_fmt(value):
+ dfmt = AttrDataFormat.SCALAR
+ value_t = type(value)
+ dtype = TO_TANGO_TYPE.get(value_t)
+ max_dim_x, max_dim_y = 1, 0
+ if dtype is None:
+ if constants.NUMPY_SUPPORT:
+ import numpy
+ else:
+ numpy = None
+ if numpy and isinstance(value, numpy.ndarray):
+ dtype = TO_TANGO_TYPE.get(value.dtype.name)
+ shape_l = len(value.shape)
+ if shape_l == 1:
+ dfmt = AttrDataFormat.SPECTRUM
+ max_dim_x = max(2**16, value.shape[0])
+ elif shape_l == 2:
+ dfmt = AttrDataFormat.IMAGE
+ max_dim_x = max(2**16, value.shape[0])
+ max_dim_y = max(2**16, value.shape[1])
+ else:
+ dtype = CmdArgType.DevEncoded
+ return dtype, dfmt, max_dim_x, max_dim_y
+
+
+def create_tango_class(server, obj, tango_class_name=None, member_filter=None):
+ slog = server.server_instance.replace("/", ".")
+ log = logging.getLogger("tango.Server." + slog)
+
+ obj_klass = obj.__class__
+ obj_klass_name = obj_klass.__name__
+
+ if tango_class_name is None:
+ tango_class_name = obj_klass_name
+
+ class DeviceDispatcher(Device):
+ __metaclass__ = DeviceMeta
+
+ TangoClassName = tango_class_name
+
+ def __init__(self, tango_class_obj, name):
+ tango_object = server.get_tango_object(name)
+ self.__tango_object = weakref.ref(tango_object)
+ Device.__init__(self, tango_class_obj, name)
+
+ def init_device(self):
+ Device.init_device(self)
+ self.set_state(DevState.ON)
+
+ @property
+ def _tango_object(self):
+ return self.__tango_object()
+
+ @property
+ def _object(self):
+ return self._tango_object._object
+
+ DeviceDispatcher.__name__ = tango_class_name
+ DeviceDispatcher.TangoClassName = tango_class_name
+ DeviceDispatcherClass = DeviceDispatcher.TangoClassClass
+
+ for name in dir(obj):
+ if name.startswith("_"):
+ continue
+ log.debug("inspecting %s.%s", obj_klass_name, name)
+ try:
+ member = getattr(obj, name)
+ except:
+ log.info(
+ "failed to inspect member '%s.%s'", obj_klass_name, name)
+ log.debug("Details:", exc_info=1)
+ if inspect.isclass(member) or inspect.ismodule(member):
+ continue
+ if member_filter and not member_filter(obj, tango_class_name,
+ name, member):
+ log.debug("filtered out %s from %s", name, tango_class_name)
+ continue
+ if inspect.isroutine(member):
+ # try to find out if there are any parameters
+ in_type = CmdArgType.DevEncoded
+ out_type = CmdArgType.DevEncoded
+ try:
+ arg_spec = inspect.getargspec(member)
+ if not arg_spec.args:
+ in_type = CmdArgType.DevVoid
+ except TypeError:
+ pass
+
+ if in_type == CmdArgType.DevVoid:
+ def _command(dev, func_name=None):
+ obj = dev._object
+ f = getattr(obj, func_name)
+ result = server.worker.execute(f)
+ return server.dumps(result)
+ else:
+ def _command(dev, param, func_name=None):
+ obj = dev._object
+ args, kwargs = loads(*param)
+ f = getattr(obj, func_name)
+ result = server.worker.execute(f, *args, **kwargs)
+ return server.dumps(result)
+ cmd = functools.partial(_command, func_name=name)
+ cmd.__name__ = name
+ doc = member.__doc__
+ if doc is None:
+ doc = ""
+ cmd.__doc__ = doc
+ cmd = types.MethodType(cmd, None, DeviceDispatcher)
+ setattr(DeviceDispatcher, name, cmd)
+ DeviceDispatcherClass.cmd_list[name] = \
+ [[in_type, doc], [out_type, ""]]
+ else:
+ read_only = False
+ if hasattr(obj_klass, name):
+ kmember = getattr(obj_klass, name)
+ if inspect.isdatadescriptor(kmember):
+ if kmember.fset is None:
+ read_only = True
+ else:
+ continue
+ value = member
+ dtype, fmt, x, y = __to_tango_type_fmt(value)
+ if dtype is None or dtype == CmdArgType.DevEncoded:
+ dtype = CmdArgType.DevEncoded
+ fmt = AttrDataFormat.SCALAR
+
+ def read(dev, attr):
+ name = attr.get_name()
+ value = server.worker.execute(getattr, dev._object, name)
+ attr.set_value(*server.dumps(value))
+
+ def write(dev, attr):
+ name = attr.get_name()
+ value = attr.get_write_value()
+ value = loads(*value)
+ server.worker.execute(setattr, dev._object, name, value)
+ else:
+
+ def read(dev, attr):
+ name = attr.get_name()
+ value = server.worker.execute(getattr, dev._object, name)
+ attr.set_value(value)
+
+ def write(dev, attr):
+ name = attr.get_name()
+ value = attr.get_write_value()
+ server.worker.execute(setattr, dev._object, name, value)
+ read.__name__ = "_read_" + name
+ setattr(DeviceDispatcher, read.__name__, read)
+
+ pars = dict(name=name, dtype=dtype, dformat=fmt,
+ max_dim_x=x, max_dim_y=y, fget=read)
+ if read_only:
+ write = None
+ else:
+ write.__name__ = "_write_" + name
+ pars['fset'] = write
+ setattr(DeviceDispatcher, write.__name__, write)
+ attr_data = AttrData.from_dict(pars)
+ DeviceDispatcherClass.attr_list[name] = attr_data
+ return DeviceDispatcher
+
+
+class Server:
+ """
+ Server helper
+ """
+
+ Phase0, Phase1, Phase2 = range(3)
+ PreInitPhase = Phase1
+ PostInitPhase = Phase2
+
+ class TangoObjectAdapter:
+
+ def __init__(self, server, obj, full_name, alias=None,
+ tango_class_name=None):
+ self.__server = weakref.ref(server)
+ self.full_name = full_name
+ self.alias = alias
+ self.class_name = obj.__class__.__name__
+ if tango_class_name is None:
+ tango_class_name = self.class_name
+ self.tango_class_name = tango_class_name
+ self.__object = weakref.ref(obj, self.__onObjectDeleted)
+
+ def __onObjectDeleted(self, object_weak):
+ self.__object = None
+ server = self._server
+ server.log.info("object deleted %s(%s)", self.class_name,
+ self.full_name)
+ server.unregister_object(self.full_name)
+
+ @property
+ def _server(self):
+ return self.__server()
+
+ @property
+ def _object(self):
+ obj = self.__object
+ if obj is None:
+ return None
+ return obj()
+
+ def __init__(self, server_name, server_type=None, port=None,
+ event_loop_callback=None, init_callbacks=None,
+ auto_clean=False, green_mode=None, tango_classes=None,
+ protocol="pickle"):
+ if server_name is None:
+ raise ValueError("Must give a valid server name")
+ self.__server_name = server_name
+ self.__server_type = server_type
+ self.__port = port
+ self.__event_loop_callback = event_loop_callback
+ if init_callbacks is None:
+ init_callbacks = {}
+ self.__init_callbacks = init_callbacks
+ self.__util = None
+ self.__objects = {}
+ self.__running = False
+ self.__auto_clean = auto_clean
+ self.__green_mode = green_mode
+ self.__protocol = protocol
+ self.__tango_classes = _to_classes(tango_classes or [])
+ self.__tango_devices = []
+ if self.async_mode:
+ self.__worker = _create_async_worker(self.green_mode)
+ else:
+ self.__worker = get_worker()
+ set_worker(self.__worker)
+ self.log = logging.getLogger("tango.Server")
+ self.__phase = Server.Phase0
+
+ def __build_args(self):
+ args = [self.server_type, self.__server_name]
+ if self.__port is not None:
+ args.extend(["-ORBendPoint",
+ "giop:tcp::{0}".format(self.__port)])
+ return args
+
+ def __exec_cb(self, cb):
+ if not cb:
+ return
+ self.worker.execute(cb)
+
+ def __find_tango_class(self, key):
+ pass
+
+ def __prepare(self):
+ """Update database with existing devices"""
+ self.log.debug("prepare")
+
+ if self.__phase > 0:
+ raise RuntimeError("Internal error: Can only prepare in phase 0")
+
+ server_instance = self.server_instance
+ db = Database()
+
+ # get list of server devices if server was already registered
+ server_registered = server_instance in db.get_server_list()
+
+ if server_registered:
+ dserver_name = "dserver/{0}".format(server_instance)
+ if db.import_device(dserver_name).exported:
+ import tango
+ dserver = tango.DeviceProxy(dserver_name)
+ try:
+ dserver.ping()
+ raise Exception("Server already running")
+ except:
+ self.log.info("Last time server was not properly "
+ "shutdown!")
+ db_class_map, db_device_map = self.get_devices()
+ else:
+ db_class_map, db_device_map = {}, {}
+
+ db_devices_add = {}
+
+ # all devices that are registered in database that are not registered
+ # as tango objects or for which the tango class changed will be removed
+ db_devices_remove = set(db_device_map) - set(self.__objects)
+
+ for local_name, local_object in self.__objects.items():
+ local_class_name = local_object.tango_class_name
+ db_class_name = db_device_map.get(local_name)
+ if db_class_name:
+ if local_class_name != db_class_name:
+ db_devices_remove.add(local_name)
+ db_devices_add[local_name] = local_object
+ else:
+ db_devices_add[local_name] = local_object
+
+ for device in db_devices_remove:
+ db.delete_device(device)
+ try:
+ db.delete_device_alias(db.get_alias(device))
+ except:
+ pass
+
+ # register devices in database
+
+ # add DServer
+ db_dev_info = DbDevInfo()
+ db_dev_info.server = server_instance
+ db_dev_info._class = "DServer"
+ db_dev_info.name = "dserver/" + server_instance
+
+ db_dev_infos = [db_dev_info]
+ aliases = []
+ for obj_name, obj in db_devices_add.items():
+ db_dev_info = DbDevInfo()
+ db_dev_info.server = server_instance
+ db_dev_info._class = obj.tango_class_name
+ db_dev_info.name = obj.full_name
+ db_dev_infos.append(db_dev_info)
+ if obj.alias:
+ aliases.append((obj.full_name, obj.alias))
+
+ db.add_server(server_instance, db_dev_infos)
+
+ # add aliases
+ for alias_info in aliases:
+ db.put_device_alias(*alias_info)
+
+ def __clean_up_process(self):
+ if not self.__auto_clean:
+ return
+ clean_up = _CLEAN_UP_TEMPLATE.format(
+ server_instance=self.server_instance)
+ import subprocess
+ res = subprocess.call([sys.executable, "-c", clean_up])
+ if res:
+ self.log.error("Failed to cleanup")
+
+ def __initialize(self):
+ self.log.debug("initialize")
+ async_mode = self.async_mode
+ event_loop = self.__event_loop_callback
+
+ util = self.tango_util
+ u_instance = util.instance()
+
+ if async_mode:
+ if event_loop:
+ event_loop = functools.partial(self.worker.execute,
+ event_loop)
+ if event_loop:
+ u_instance.server_set_event_loop(event_loop)
+
+ _add_classes(util, self.__tango_classes)
+
+ if async_mode:
+ tango_thread_id = self.worker.run_in_thread(self.__tango_loop)
+
+ def __run(self, timeout=None):
+ if self.async_mode:
+ return self.worker.run(timeout=timeout)
+ else:
+ self.__tango_loop()
+
+ def __tango_loop(self):
+ self.log.debug("server loop started")
+ self.__running = True
+ u_instance = self.tango_util.instance()
+ u_instance.server_init()
+ self._phase = Server.Phase2
+ self.log.info("Ready to accept request")
+ u_instance.server_run()
+ self.worker.stop()
+ if self.__auto_clean:
+ self.__clean_up_process()
+ self.log.debug("server loop exit")
+
+ @property
+ def _phase(self):
+ return self.__phase
+
+ @_phase.setter
+ def _phase(self, phase):
+ self.__phase = phase
+ cb = self.__init_callbacks.get(phase)
+ self.__exec_cb(cb)
+
+ @property
+ def server_type(self):
+ server_type = self.__server_type
+ if server_type is None:
+ server_file = os.path.basename(sys.argv[0])
+ server_type = os.path.splitext(server_file)[0]
+ return server_type
+
+ @property
+ def server_instance(self):
+ return "{0}/{1}".format(self.server_type, self.__server_name)
+
+ @property
+ def tango_util(self):
+ if self.__util is None:
+ import tango
+ self.__util = tango.Util(self.__build_args())
+ self._phase = Server.Phase1
+ return self.__util
+
+ @property
+ def green_mode(self):
+ gm = self.__green_mode
+ if gm is None:
+ from tango import get_green_mode
+ gm = get_green_mode()
+ return gm
+
+ @green_mode.setter
+ def green_mode(self, gm):
+ if gm == self.__green_mode:
+ return
+ if self.__running:
+ raise RuntimeError("Cannot change green mode while "
+ "server is running")
+ self.__green_mode = gm
+
+ @property
+ def async_mode(self):
+ return self.green_mode in (GreenMode.Gevent, GreenMode.Asyncio)
+
+ @property
+ def worker(self):
+ return self.__worker
+
+ def dumps(self, obj):
+ return dumps(self.__protocol, obj)
+
+ def get_devices(self):
+ """
+ Helper that retuns a dict of devices for this server.
+
+ :return:
+ Returns a tuple of two elements:
+ - dict<tango class name : list of device names>
+ - dict<device names : tango class name>
+ :rtype: tuple<dict, dict>
+ """
+ if self.__util is None:
+ import tango
+ db = tango.Database()
+ else:
+ db = self.__util.get_database()
+ server = self.server_instance
+ dev_list = db.get_device_class_list(server)
+ class_map, dev_map = {}, {}
+ for class_name, dev_name in zip(dev_list[1::2], dev_list[::2]):
+ dev_names = class_map.get(class_name)
+ if dev_names is None:
+ class_map[class_name] = dev_names = []
+ dev_name = dev_name.lower()
+ dev_names.append(dev_name)
+ dev_map[dev_name] = class_name
+ return class_map, dev_map
+
+ def get_tango_object(self, name):
+ return self.__objects.get(name.lower())
+
+ def get_tango_class(self, tango_class_name):
+ for klass in self.__tango_classes:
+ if klass.TangoClassName == tango_class_name:
+ return klass
+
+ def register_tango_device(self, klass, name):
+ if inspect.isclass(klass):
+ if isinstance(klass, Device):
+ kk, Device.TangoClassClass
+ k = Device
+ kname = Device.TangoClassName
+ # TODO: ??
+ else:
+ raise ValueError
+ else:
+ raise NotImplementedError
+
+ def register_tango_class(self, klass):
+ if self._phase > Server.Phase1:
+ raise RuntimeError("Cannot add new class after phase 1 "
+ "(i.e. after server_init)")
+ self.__tango_classes.append(klass)
+
+ def unregister_object(self, name):
+ tango_object = self.__objects.pop(name.lower())
+ if self._phase > Server.Phase1:
+ import tango
+ util = tango.Util.instance()
+ if not util.is_svr_shutting_down():
+ util.delete_device(tango_object.tango_class_name, name)
+
+ def register_object(self, obj, name, tango_class_name=None,
+ member_filter=None):
+ """
+ :param member_filter:
+ callable(obj, tango_class_name, member_name, member) -> bool
+ """
+ slash_count = name.count("/")
+ if slash_count == 0:
+ alias = name
+ full_name = "{0}/{1}".format(self.server_instance, name)
+ elif slash_count == 2:
+ alias = None
+ full_name = name
+ else:
+ raise ValueError("Invalid name")
+
+ class_name = tango_class_name or obj.__class__.__name__
+ tango_class = self.get_tango_class(class_name)
+
+ if tango_class is None:
+ tango_class = create_tango_class(self, obj, class_name,
+ member_filter=member_filter)
+ self.register_tango_class(tango_class)
+
+ tango_object = self.TangoObjectAdapter(self, obj, full_name, alias,
+ tango_class_name=class_name)
+ self.__objects[full_name.lower()] = tango_object
+ if self._phase > Server.Phase1:
+ import tango
+ util = tango.Util.instance()
+ util.create_device(class_name, name)
+ return tango_object
+
+ def run(self, timeout=None):
+ self.log.debug("run")
+ async_mode = self.async_mode
+ running = self.__running
+ if not running:
+ self.__prepare()
+ self.__initialize()
+ else:
+ if not async_mode:
+ raise RuntimeError("Server is already running")
+ self.__run(timeout=timeout)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git
More information about the debian-science-commits
mailing list