[pytango] 298/483: git-svn-id: http://svn.code.sf.net/p/tango-cs/code/bindings/PyTango/trunk at 24226 4e9c00fd-8f2e-0410-aa12-93ce3db5e235

Sandor Bodo-Merle sbodomerle-guest at moszumanska.debian.org
Thu Sep 28 19:14:52 UTC 2017


This is an automated email from the git hooks/post-receive script.

sbodomerle-guest pushed a commit to annotated tag bliss_8.10
in repository pytango.

commit 594fe0d57e8ab0f50bcc9074e2e527cc32c15d38
Author: tiagocoutinho <tiagocoutinho at 4e9c00fd-8f2e-0410-aa12-93ce3db5e235>
Date:   Fri Nov 15 16:27:41 2013 +0000

    git-svn-id: http://svn.code.sf.net/p/tango-cs/code/bindings/PyTango/trunk@24226 4e9c00fd-8f2e-0410-aa12-93ce3db5e235
---
 .../python/ipython/ipython_00_11/ipython_10_00.py  | 1271 ++++++++++++++++++++
 1 file changed, 1271 insertions(+)

diff --git a/src/boost/python/ipython/ipython_00_11/ipython_10_00.py b/src/boost/python/ipython/ipython_00_11/ipython_10_00.py
new file mode 100644
index 0000000..0653145
--- /dev/null
+++ b/src/boost/python/ipython/ipython_00_11/ipython_10_00.py
@@ -0,0 +1,1271 @@
+#!/usr/bin/env python
+
+# -----------------------------------------------------------------------------
+# This file is part of PyTango (http://www.tinyurl.com/PyTango)
+#
+# Copyright 2006-2012 CELLS / ALBA Synchrotron, Bellaterra, Spain
+# Copyright 2013-2014 European Synchrotron Radiation Facility, Grenoble, France
+#
+# Distributed under the terms of the GNU Lesser General Public License,
+# either version 3 of the License, or (at your option) any later version.
+# See LICENSE.txt for more info.
+# -----------------------------------------------------------------------------
+
+"""An IPython profile designed to provide a user friendly interface to Tango"""
+
+from __future__ import print_function
+
+__all__ = ["load_config", "load_ipython_extension", "unload_ipython_extension"]
+
+import sys
+import os
+import re
+import io
+import operator
+import textwrap
+
+from IPython.core.error import UsageError
+from IPython.utils.ipstruct import Struct
+from IPython.core.page import page
+from IPython.core.interactiveshell import InteractiveShell
+from IPython.config.application import Application
+from IPython.terminal.ipapp import launch_new_instance
+
+import PyTango
+import PyTango.utils
+
+_TG_EXCEPTIONS = PyTango.DevFailed, PyTango.ConnectionFailed, \
+    PyTango.CommunicationFailed, \
+    PyTango.NamedDevFailed, PyTango.NamedDevFailedList, \
+    PyTango.WrongNameSyntax, PyTango.NonDbDevice, PyTango.WrongData, \
+    PyTango.NonSupportedFeature, PyTango.AsynCall, \
+    PyTango.AsynReplyNotArrived, PyTango.EventSystemFailed, \
+    PyTango.DeviceUnlocked, PyTango.NotAllowed
+
+_DB_SYMB = "db"
+_DFT_TANGO_HOST = None
+_TANGO_STORE = "__tango_store"
+_TANGO_ERR = "__tango_error"
+_PYTHON_ERR = "__python_error"
+_tango_init = False
+
+#-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
+# IPython utilities
+#-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
+
+def get_pylab_mode():
+    return get_app().pylab
+
+def get_color_mode():
+    return get_config().InteractiveShell.colors
+
+def get_app():
+    #return TerminalIPythonApp.instance()
+    return Application.instance()
+
+def get_shell():
+    """Get the global InteractiveShell instance."""
+    return get_app().shell
+
+def get_ipapi():
+    """Get the global InteractiveShell instance."""
+    return InteractiveShell.instance()
+
+def get_config():
+    return get_app().config
+
+def get_editor():
+    return get_ipapi().editor
+
+def get_user_ns():
+    return get_shell().user_ns
+
+class DeviceClassCompleter(object):
+    """Completer class that returns the list of devices of some class when
+    called. """
+    
+    def __init__(self, klass, devices):
+        self._klass = klass
+        self._devices = devices
+    
+    def __call__(self, ip, evt):
+        return self._devices
+
+
+# Rewrite DeviceProxy constructor because the database context that the user is
+# using may be different than the default TANGO_HOST. What we do is always append
+# the name of the database in usage to the device name given by the user (in case 
+# he doesn't give a database name him(her)self, of course.
+#__DeviceProxy_init_orig__ = PyTango.DeviceProxy.__init__
+#def __DeviceProxy__init__(self, dev_name):
+#    db = __get_db()
+#    if db is None: return
+#    if dev_name.count(":") == 0:
+#        db_name = "%s:%s" % (db.get_db_host(), db.get_db_port())
+#        dev_name = "%s/%s" % (db_name, dev_name)
+#    __DeviceProxy_init_orig__(self, dev_name)
+#PyTango.DeviceProxy.__init__ = __DeviceProxy__init__
+
+#-------------------------------------------------------------------------------
+# Completers
+#-------------------------------------------------------------------------------
+
+def __DeviceProxy_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    ret = list(db._db_cache.devices.keys())
+    ret.extend(db._db_cache.aliases.keys())
+    return ret
+
+def __DeviceClass_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    return list(db._db_cache.klasses.keys())
+
+def __DeviceAlias_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    return list(db._db_cache.aliases.keys())
+
+def __AttributeAlias_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    return list(db._db_cache.attr_aliases.keys())
+
+def __PureDeviceProxy_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    return list(db._db_cache.devices.keys())
+
+def __AttributeProxy_completer(ip, evt):
+    db = __get_db()
+    if db is None: return
+    cache = db._db_cache
+    
+    symb = evt.symbol
+    n = symb.count("/")
+    ret, devs, dev_aliases = None, cache.devices, cache.aliases
+    # dev_list and dev_alias_list are case insensitive. They should only be used
+    # to search for elements. Their elements are the same as the keys of the 
+    # dictionaries devs and dev_aliases respectively
+    dev_list, dev_alias_list = cache.device_list, cache.alias_list
+    dev_name = None
+    if n == 0:
+        # means it can be an attr alias, a device name has alias or as full device name
+        ret = list(cache.get("attr_aliases").keys())
+        ret.extend([ d+"/" for d in devs ])
+        ret.extend([ d+"/" for d in dev_aliases ])
+        # device alias found!
+        if symb in dev_alias_list:
+            dev_name = symb
+    elif n == 1:
+        # it can still be a full device name
+        ret = [ d+"/" for d in devs ]
+        # it can also be devalias/attr_name
+        dev, attr = symb.split("/")
+        if dev in dev_alias_list:
+            dev_name = dev
+    elif n == 2:
+        # it is for sure NOT an attribute alias or a device alias
+        ret = [ d+"/" for d in devs ]
+        # device found!
+        if symb in dev_list:
+            dev_name = symb
+    elif n == 3:
+        # it is for sure a full attribute name
+        dev, sep, attr = symb.rpartition("/")
+        if dev in dev_list:
+            dev_name = dev
+
+    if dev_name is None:
+        return ret
+    
+    try:
+        d = __get_device_proxy(dev_name)
+        # first check in cache for the attribute list
+        if not hasattr(d, "_attr_list"):
+            d._attr_list = d.get_attribute_list()
+        if ret is None:
+            ret = []
+        ret.extend([ "%s/%s" % (dev_name, a) for a in d._attr_list ])
+    except:
+        # either DeviceProxy could not be created or device is not online
+        pass
+
+    return ret
+
+def __get_device_proxy(dev_name):
+    db = __get_db()
+    if db is None: return
+    cache = db._db_cache
+    from_alias = cache.aliases.get(dev_name)
+    
+    if from_alias is not None:
+        dev_name = from_alias
+
+    data = cache.devices.get(dev_name)
+    if data is not None:
+        d = data[3]
+        if d is None:
+            try:
+                d = data[3] = PyTango.DeviceProxy(dev_name)
+            except:
+                pass
+        return d
+
+def __get_device_subscriptions(dev_name):
+    db = __get_db()
+    if db is None: return
+    cache = db._db_cache
+    from_alias = cache.aliases.get(dev_name)
+    
+    if from_alias is not None:
+        dev_name = from_alias
+
+    data = cache.devices.get(dev_name)
+    if data is not None:
+        return data[4]
+
+__monitor_completer = __AttributeProxy_completer
+
+#-------------------------------------------------------------------------------
+# Magic commands
+#-------------------------------------------------------------------------------
+
+def refreshdb(self, parameter_s=''):
+    init_db(parameter_s)
+
+def switchdb(self, parameter_s=''):
+    """Switches the active tango Database.
+    
+    Usage: switchdb <host>[(:| )<port>]
+
+    <port> is optional. If not given it defaults to 10000.
+    
+    Examples:
+    In [1]: switchdb homer:10005
+    In [2]: switchdb homer 10005
+    In [3]: switchdb homer"""
+    
+    if parameter_s == '':
+        raise UsageError("%switchdb: Must specify a tango database name. "\
+                         "See '%switchdb?'")
+    return init_db(parameter_s)
+
+def lsdev(self, parameter_s=''):
+    """Lists all known tango devices.
+    
+    Usage: lsdev [<device name filter(regular expression)]
+    
+    Examples:
+    In [1]: lsdev
+    In [2]: lsdev sys.*"""
+    
+    if parameter_s:
+        reg_exp = re.compile(parameter_s, re.IGNORECASE)
+    else:
+        reg_exp = None
+    
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database. Device list is empty")
+        return
+    data = db._db_cache.devices
+
+    s = io.BytesIO()
+    lengths = 40, 25, 25, 20
+    title = "Device", "Alias", "Server", "Class"
+    templ = "{0:{l[0]}} {1:{l[1]}} {2:{l[2]}} {3:{l[3]}}"
+    msg = templ.format(*title, l=lengths)
+    print(msg, file=s)
+    print(*map(operator.mul, lengths, len(lengths)*"-"), file=s)
+    for d, v in data.items():
+        if reg_exp and not reg_exp.match(d):
+            continue
+        print(templ.format(d, v[0], v[1], v[2], l=lengths), file=s)
+    s.seek(0)
+    page(s.read())
+
+def lsdevclass(self, parameter_s=''):
+    """Lists all known tango device classes.
+    
+    Usage: lsdevclass [<class name filter(regular expression)]
+    
+    Examples:
+    In [1]: lsdevclass
+    In [2]: lsdevclass Motor.*"""
+    
+    if parameter_s:
+        reg_exp = re.compile(parameter_s, re.IGNORECASE)
+    else:
+        reg_exp = None
+    
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database. Device class list is empty")
+        return
+    data = db._db_cache.klasses
+
+    s = io.BytesIO()
+    data = [ "%-030s" % klass for klass in data.keys() if not reg_exp or reg_exp.match(klass) ]
+    s = textwrap.fill(" ".join(data), 80)
+    page(s)
+
+def lsserv(self, parameter_s=''):
+    """Lists all known tango servers.
+    
+    Usage: lsserv [<class name filter(regular expression)]
+    
+    Examples:
+    In [1]: lsserv
+    In [2]: lsserv Motor/.*"""
+    
+    if parameter_s:
+        reg_exp = re.compile(parameter_s, re.IGNORECASE)
+    else:
+        reg_exp = None
+    
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database. Device class list is empty")
+        return
+    data = db._db_cache.servers
+
+    s = io.BytesIO()
+    data = [ "%-030s" % server for server in data.keys() if not reg_exp or reg_exp.match(server) ]
+    s = textwrap.fill(" ".join(data), 80)
+    page(s)
+
+def tango_error(self, parameter_s=''):
+    """Displays detailed information about the last tango error"""
+    global _TANGO_ERR
+    err_info = get_user_ns().get(_TANGO_ERR)
+    if err_info is None:
+        print("No tango error reported so far.")
+        return
+    print("Last tango error:")
+    print(err_info[1])
+
+def python_error(self, parameter_s=''):
+    """Displays detailed information about the last python error"""
+    global _PYTHON_ERR
+    err_info = get_user_ns().get(_PYTHON_ERR)
+    if err_info is None:
+        print("No error reported so far.")
+        return
+    ip = get_ipapi()
+    etype, evalue, etb = err_info[:3]
+    ip.InteractiveTB(etype=etype, evalue=evalue, etb=etb, tb_offset=None)
+
+_EVT_LOG = None
+def __get_event_log():
+    global _EVT_LOG
+    if _EVT_LOG is None:
+#        qthreads = get_config().q4thread
+#        if qthreads:
+#            import ipy_qt
+#            model = ipy_qt.EventLoggerTableModel(capacity=10000)
+#            _EVT_LOG = ipy_qt.EventLogger(model=model)
+#            _EVT_LOG.setWindowTitle("ITango - Event Logger Table")
+#        else:
+        import PyTango.ipython.eventlogger
+        _EVT_LOG = PyTango.ipython.eventlogger.EventLogger(capacity=10000, pager=page)
+    return _EVT_LOG
+
+def mon(self, parameter_s=''):
+    """Monitor a given attribute.
+    
+    %mon -a <attribute name>           - activates monitoring of given attribute
+    %mon -d <attribute name>           - deactivates monitoring of given attribute
+    %mon -r                            - deactivates monitoring of all attributes
+    %mon -i <id>                       - displays detailed information for the event with given id
+    %mon -l <dev filter> <attr filter> - shows event table filtered with the regular expression for attribute name
+    %mon                               - shows event table (= %mon -i .* .*)"""
+    
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    
+    # make sure parameter_s is a str and not a unicode
+    parameter_s = str(parameter_s)
+    opts, args = self.parse_options(parameter_s,'adril', mode='list')
+    if len(args) > 3:
+        raise UsageError("%mon: too many arguments")
+    if 'd' in opts:
+        try:
+            todel = args[0]
+        except IndexError:
+            raise UsageError("%mon -d: must provide an attribute to unmonitor")
+        else:
+            try:
+                dev, sep, attr = todel.rpartition("/")
+                subscriptions = __get_device_subscriptions(dev)
+                id = subscriptions[attr.lower()]
+                del subscriptions[attr.lower()]
+                d = __get_device_proxy(dev)
+                d.unsubscribe_event(id)
+                print("Stopped monitoring '%s'" % todel)
+            except KeyError:
+                raise UsageError("%%mon -d: Not monitoring '%s'" % todel)
+                    
+    elif 'a' in opts:
+        try:
+            toadd = args[0]
+        except IndexError:
+            raise UsageError("%mon -a: must provide an attribute to monitor")
+        dev, sep, attr = toadd.rpartition("/")
+        subscriptions = __get_device_subscriptions(dev)
+        id = subscriptions.get(attr.lower())
+        if id is not None:
+            raise UsageError("%%mon -a: Already monitoring '%s'" % toadd)
+        d = __get_device_proxy(dev)
+        w = __get_event_log()
+        model = w.model()
+        id = d.subscribe_event(attr, PyTango.EventType.CHANGE_EVENT, model, [])
+        subscriptions[attr.lower()] = id
+        print("'%s' is now being monitored. Type 'mon' to see all events" % toadd)
+    elif 'r' in opts:
+        for d, v in db._db_cache.devices.items():
+            d, subs = v[3], v[4]
+            for id in subs.values():
+                d.unsubscribe_event(id)
+            v[4] = {}
+    elif 'i' in opts:
+        try:
+            evtid = int(args[0])
+        except IndexError:
+            raise UsageError("%mon -i: must provide an event ID")
+        except ValueError:
+            raise UsageError("%mon -i: must provide a valid event ID")
+        try:
+            w = __get_event_log()
+            e = w.getEvents()[evtid]
+            if e.err:
+                print(str(PyTango.DevFailed(*e.errors)))
+            else:
+                print(str(e))
+        except IndexError:
+            raise UsageError("%mon -i: must provide a valid event ID")
+    elif 'l' in opts:
+        try:
+            dexpr = args[0]
+            aexpr = args[1]
+        except IndexError:
+            raise UsageError("%mon -l: must provide valid device and " \
+                             "attribute filters")
+        w = __get_event_log()
+        w.show(dexpr, aexpr)
+    else:
+        w = __get_event_log()
+        w.show()
+
+#-------------------------------------------------------------------------------
+# Useful functions (not magic commands but accessible from CLI as normal python
+# functions)
+#-------------------------------------------------------------------------------
+
+def get_device_map():
+    """Returns a dictionary where keys are device names and value is a sequence
+    of 4 elements: 
+        - alias name (empty string if no alias is defined)
+        - tango server name (full tango server name <name>/<instance>)
+        - tango class name
+        - DeviceProxy to the device or None if it hasn't been initialized yet
+          (this last element is for internal tango usage only. If you need a 
+           DeviceProxy to this device, create your own)"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.devices
+
+def get_server_map():
+    """Returns a dictionary where keys are server names (<name>/<instance>)
+    and value is a sequence of device names that belong to the server"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.servers
+
+def get_class_map():
+    """Returns a dictionary where keys are the tango classes and value is a 
+    sequence of device names that belong to the tango class"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.klasses
+
+def get_alias_map():
+    """Returns a dictionary where keys are the tango device aliases and value 
+    is a the tango device name"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.aliases
+
+def get_device_list():
+    """Returns a case insensitive list of device names for the current 
+    database"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.device_list
+
+def get_alias_list():
+    """Returns a case insensitive list of device aliases for the current 
+    database"""
+    db = __get_db()
+    if db is None:
+        print("You are not connected to any Tango Database.")
+        return
+    return db._db_cache.alias_list    
+    
+#-------------------------------------------------------------------------------
+# Private helper methods
+#-------------------------------------------------------------------------------
+
+def __exc_handler(ip, etype, value, tb, tb_offset=None):
+    global _TG_EXCEPTIONS
+    user_ns = get_user_ns()
+    if etype in _TG_EXCEPTIONS:
+        global _TANGO_ERR
+        user_ns[_TANGO_ERR] = etype, value, tb, tb_offset
+        if len(value.args):
+            v = value[0]
+            print("%s: %s" % (v.reason ,v.desc))
+        else:
+            print("Empty DevFailed")
+        print("(For more detailed information type: tango_error)")
+    else:
+        global _PYTHON_ERR
+        user_ns[_PYTHON_ERR] = etype, value, tb, tb_offset
+        print(etype.__name__ + ": " + str(value))
+        print("(For more detailed information type: python_error)")
+
+def __get_default_tango_host():
+    global _DFT_TANGO_HOST
+    if _DFT_TANGO_HOST is None:
+        try:
+            db = PyTango.Database()
+            _DFT_TANGO_HOST = "%s:%s" % (db.get_db_host(), db.get_db_port())
+        except:
+            pass
+    return _DFT_TANGO_HOST
+
+def __get_db(host_port=None):
+    """host_port == None: Use current DB whatever it is or create
+                          default if doesn't exist
+       host_port == '' : use default db. If it is not the current db, switch
+                         current db to it and return it
+       host_port == ... : if ... is not the current db, switch current db to it
+                          and return it
+    """
+    
+    ip = get_ipapi()
+    user_ns = get_user_ns()
+
+    global _DB_SYMB
+    db = user_ns.get(_DB_SYMB)
+    
+    if host_port is None:
+        if db is None:
+            host_port = __get_default_tango_host()
+    elif host_port == '':
+        host_port = __get_default_tango_host()
+    else:
+        host_port = host_port.strip().replace(" ",":")
+        if host_port.count(":") == 0:
+            host_port += ":10000"
+    
+    if host_port is not None:
+        host_port = str(host_port)
+    
+    if db is None:
+        create_db = True
+    elif host_port is None:
+        create_db = False
+    else:
+        old_host_port = "%s:%s" % (db.get_db_host(), db.get_db_port())
+        create_db = old_host_port != host_port
+
+    if create_db:
+        try:
+            db = PyTango.Database(*host_port.split(":"))
+            
+            user_ns["DB_NAME"] = host_port
+        except Exception as e:
+            if db:
+                print("\nCould not access Database %s:" % host_port)
+                print(str(e))
+                old_host_port = "%s:%s" % (db.get_db_host(), db.get_db_port())
+                print("Maintaining connection to Database", old_host_port)
+                user_ns["DB_NAME"] = old_host_port
+            else:
+                print("\nCould not access any Database. Make sure:")
+                print("\t- .tangorc, /etc/tangorc or TANGO_HOST environment is defined.")
+                print("\t- the Database DS is running")
+                user_ns["DB_NAME"] = "OFFLINE"
+                
+        # register the 'db' in the user namespace
+        user_ns.update({ _DB_SYMB : db })
+        
+    return db
+
+def __get_obj_name(o):
+    try:
+        n = o.__name__
+    except:
+        try:
+            n = o.__class__.__name__
+        except:
+            n = "<unknown>"
+    return n
+
+def __completer_wrapper(f):
+    def wrapper(ip, evt):
+        try:
+            return f(ip, evt)
+        except Exception as e:
+            print()
+            print("An unexpected exception ocorred during ITango command completer.")
+            print("Please send a bug report to the PyTango team with the following information:")
+            print(80*"-")
+            print("Completer:",__get_obj_name(f))
+            print(80*"-")
+            import traceback
+            traceback.print_exc()
+            print(80*"-")
+            raise e
+    return wrapper
+
+def __expose_magic(ip, name, fn, completer_func=None):
+    ip.define_magic(name, fn)
+    
+    if completer_func is None:
+        return
+    
+    # enable macro param completion
+    ip.set_hook('complete_command', completer_func, re_key = ".*" + name)
+
+def __unexpose_magic(ip, name):
+    delattr(ip, name)
+
+def __build_color_scheme(ip, name):
+    import IPython.Prompts
+    import IPython.PyColorize
+    import IPython.excolors
+    from IPython.utils.coloransi import TermColors, InputTermColors
+
+    # make some schemes as instances so we can copy them for modification easily:
+    PromptColors = IPython.Prompts.PromptColors
+    ANSICodeColors = IPython.PyColorize.ANSICodeColors
+    ExceptionColors = IPython.excolors.ExceptionColors
+    TBColors = ip.IP.InteractiveTB.color_scheme_table
+    SyntaxColors = ip.IP.SyntaxTB.color_scheme_table
+    InspectColors = IPython.OInspect.InspectColors
+    
+    promptTangoColors = PromptColors['Linux'].copy(name)
+    ANSITangoColors = ANSICodeColors['Linux'].copy(name)
+    exceptionTangoColors = ExceptionColors['Linux'].copy(name)
+    TBTangoColors = TBColors['Linux'].copy(name)
+    syntaxTangoColors = SyntaxColors['Linux'].copy(name)
+    inspectTangoColors = InspectColors['Linux'].copy(name)
+    
+    # initialize prompt with default tango colors
+    promptTangoColors.colors.in_prompt  = InputTermColors.Purple
+    promptTangoColors.colors.in_number  = InputTermColors.LightPurple
+    promptTangoColors.colors.in_prompt2 = InputTermColors.Purple
+    promptTangoColors.colors.out_prompt = TermColors.Blue
+    promptTangoColors.colors.out_number = TermColors.LightBlue
+
+    ret= { "prompt" : (PromptColors, promptTangoColors),
+          "ANSI"   : (ANSICodeColors, ANSITangoColors),
+          "except" : (ExceptionColors, exceptionTangoColors),
+          "TB"     : (TBColors, TBTangoColors),
+          "Syntax" : (SyntaxColors, syntaxTangoColors),
+          "Inspect": (InspectColors, inspectTangoColors) }
+
+    if ip.IP.isthreaded:
+        TBThreadedColors = ip.IP.sys_excepthook.color_scheme_table
+        TBThreadedTangoColors = TBThreadedColors['Linux'].copy(name)
+        ret["TBThreaded"] = TBThreadedColors, TBThreadedTangoColors
+    return ret
+
+#-------------------------------------------------------------------------------
+# Initialization methods
+#-------------------------------------------------------------------------------
+
+def init_pytango(ip):
+    """Initializes the IPython environment with PyTango elements"""
+
+    # export symbols to IPython namepspace
+    ip.ex("import PyTango")
+    ip.ex("from PyTango import DeviceProxy, AttributeProxy, Database, Group")
+    ip.ex("Device = DeviceProxy")
+    ip.ex("Attribute = AttributeProxy")
+
+    # add completers
+    dp_completer = __completer_wrapper(__DeviceProxy_completer)
+    attr_completer = __completer_wrapper(__AttributeProxy_completer)
+    ip.set_hook('complete_command', dp_completer, re_key = ".*DeviceProxy[^\w\.]+")
+    ip.set_hook('complete_command', dp_completer, re_key = ".*Device[^\w\.]+")
+    ip.set_hook('complete_command', attr_completer, re_key = ".*AttributeProxy[^\w\.]+")
+    ip.set_hook('complete_command', attr_completer, re_key = ".*Attribute[^\w\.]+")
+    
+    ip.set_custom_exc((Exception,), __exc_handler)
+
+def init_db(parameter_s=''):
+    ip = get_ipapi()
+    user_ns = get_user_ns()
+    global _DB_SYMB
+    old_db = user_ns.get(_DB_SYMB)
+    
+    db = __get_db(parameter_s)
+    
+    if old_db is not None and hasattr(old_db, "_db_cache"):
+        old_junk = old_db._db_cache["junk"].keys()
+        for e in old_junk:
+            del user_ns[e]
+    else:
+        old_junk = ()
+        
+    if db is None: return
+    
+    os.environ["TANGO_HOST"] = "%s:%s" % (db.get_db_host(), db.get_db_port())
+    
+    # Initialize device and server information
+    query = "SELECT name, alias, server, class FROM device order by name"
+    
+    r = db.command_inout("DbMySqlSelect", query)
+    row_nb, column_nb = r[0][-2], r[0][-1]
+    results, data = r[0][:-2], r[1]
+    assert row_nb == len(data) / column_nb
+    devices, aliases, servers, klasses = data[0::4], data[1::4], data[2::4], data[3::4]
+
+    #CD = PyTango.utils.CaselessDict
+    CD = dict
+    dev_dict, serv_dict, klass_dict, alias_dict = CD(), CD(), CD(), CD()
+    
+    for device, alias, server, klass in zip(devices, aliases, servers, klasses):
+        dev_lower = device.lower()
+
+        # hide dserver devices
+        if dev_lower.startswith("dserver/"): continue
+        
+        # hide alias that start with "_"
+        if alias and alias[0] == "_": alias = ''
+        
+        # last None below is to be filled by DeviceProxy object on demand
+        # last empty dict<str, int> where keys is attribute name and value is 
+        # the subscription id
+        dev_dict[device] = [alias, server, klass, None, {}]
+        serv_devs = serv_dict.get(server)
+        if serv_devs is None:
+            serv_dict[server] = serv_devs = []
+        serv_devs.append(device)
+        klass_devs = klass_dict.get(klass)
+        if klass_devs is None:
+            klass_dict[klass] = klass_devs = []
+        klass_devs.append(device)
+        if len(alias):
+            alias_dict[alias] = device
+            serv_devs.append(alias)
+            klass_devs.append(alias)
+
+    exposed_klasses = {}
+    excluded_klasses = "DServer",
+    for klass, devices in klass_dict.items():
+        if klass in excluded_klasses:
+            continue
+        exists = klass in user_ns
+        if not exists or klass in old_junk:
+            c = DeviceClassCompleter(klass, devices)
+            ip.set_hook('complete_command', c, re_key = ".*" + klass + "[^\w\.]+")
+            exposed_klasses[klass] = PyTango.DeviceProxy
+    
+    # expose classes no user namespace
+    user_ns.update(exposed_klasses)
+    
+    # Initialize attribute information
+    query = "SELECT name, alias FROM attribute_alias order by alias"
+
+    r = db.command_inout("DbMySqlSelect", query)
+    row_nb, column_nb = r[0][-2], r[0][-1]
+    results, data = r[0][:-2], r[1]
+    assert row_nb == len(data) / column_nb
+    attributes, aliases = data[0::2], data[1::2]
+    
+    attr_alias_dict = {}
+    for attribute, alias in zip(attributes, aliases):
+        if len(alias):
+            attr_alias_dict[alias] = attribute
+    
+    device_list = PyTango.utils.CaselessList(dev_dict.keys())
+    alias_list = PyTango.utils.CaselessList(alias_dict.keys())
+    attr_alias_list = PyTango.utils.CaselessList(attr_alias_dict.keys())
+    
+    # Build cache
+    db_cache = Struct(devices=dev_dict, aliases=alias_dict,
+        servers=serv_dict, klasses=klass_dict, junk=exposed_klasses,
+        attr_aliases=attr_alias_dict, device_list=device_list,
+        alias_list=alias_list, attr_alias_list=attr_alias_list)
+    
+    db._db_cache = db_cache
+
+    # Add this DB to the list of known DBs (for possible use in magic commands)
+    if db.get_db_port_num() == 10000:
+        db_name = db.get_db_host()
+    else:
+        db_name = "%s:%s" % (db.get_db_host(), db.get_db_port())
+    return db
+
+def init_magic(ip):
+
+    import IPython.core.magic
+
+    new_style_magics = hasattr(IPython.core.magic, 'Magics') and hasattr(IPython.core.magic, 'magics_class')
+
+    if new_style_magics:
+        @IPython.core.magic.magics_class
+        class Tango(IPython.core.magic.Magics):
+            
+            refreshdb = IPython.core.magic.line_magic(refreshdb)
+            switchdb = IPython.core.magic.line_magic(switchdb)
+            lsdev = IPython.core.magic.line_magic(lsdev)
+            lsdevclass = IPython.core.magic.line_magic(lsdevclass)
+            lsserv = IPython.core.magic.line_magic(lsserv)
+            tango_error = IPython.core.magic.line_magic(tango_error)
+            python_error = IPython.core.magic.line_magic(python_error)
+            mon = IPython.core.magic.line_magic(mon)
+
+        ip.register_magics(Tango)
+        ip.set_hook('complete_command', __monitor_completer, re_key = ".*" + "mon")
+    else:
+        __expose_magic(ip, "refreshdb", refreshdb)
+        __expose_magic(ip, "switchdb", switchdb)
+        __expose_magic(ip, "lsdev", lsdev)
+        __expose_magic(ip, "lsdevclass", lsdevclass)
+        __expose_magic(ip, "lsserv", lsserv)
+        __expose_magic(ip, "tango_error", tango_error)
+        __expose_magic(ip, "python_error", python_error)
+        __expose_magic(ip, "mon", mon, __monitor_completer)
+    
+    get_user_ns().update({"get_device_map"   : get_device_map,
+                   "get_server_map"  : get_server_map,
+                   "get_class_map"   : get_class_map,
+                   "get_alias_map"   : get_alias_map,
+                   "get_device_list" : get_device_list,
+                   "get_alias_list"  : get_alias_list})
+
+def complete(text):
+    """a super complete!!!!"""
+    self = get_ipapi().IP
+    complete = self.Completer.complete
+    state = 0
+    comps = set()
+    while True:
+        newcomp = complete("", state, line_buffer=text)
+        if newcomp is None:
+            break
+        comps.add(newcomp)
+        state += 1
+    outcomps = sorted(comps)
+    return outcomps
+
+__DIRNAME = os.path.dirname(os.path.abspath(__file__))
+__RES_DIR = os.path.join(__DIRNAME, os.path.pardir, 'resource')
+
+class __TangoInfo(object):
+    """Helper class for when DeviceProxy.info() is not available"""
+    
+    def __init__(self, dev):
+        try:
+            db = dev.get_device_db()
+            klass = db.get_class_for_device(dev.dev_name())
+            self.dev_class = self.dev_type = klass
+        except:
+            self.dev_class = self.dev_type = 'Device'
+        self.doc_url = 'http://www.esrf.eu/computing/cs/tango/tango_doc/ds_doc/index.html'
+        self.server_host = 'Unknown'
+        self.server_id = 'Unknown'
+        self.server_version = 1
+
+def __get_device_icon(dev_proxy, klass="Device"):
+    icon_prop = "__icon"
+    db = dev_proxy.get_device_db()
+    try:
+        icon_filename = dev_proxy.get_property(icon_prop)[icon_prop]
+        if icon_filename:
+            icon_filename = icon_filename[0]
+        else:
+            icon_filename = db.get_class_property(klass, icon_prop)[icon_prop]
+            if icon_filename:
+                icon_filename = icon_filename[0]
+            else:            
+                icon_filename = klass.lower() + os.path.extsep + "png"
+    except:
+        icon_filename = klass.lower() + os.path.extsep + "png"
+    
+    if os.path.isabs(icon_filename):
+        icon = icon_filename
+    else:
+        icon = os.path.join(__RES_DIR, icon_filename)
+    if not os.path.isfile(icon):
+        icon = os.path.join(__RES_DIR, "device.png")
+    return icon
+
+__DEV_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td width="140" rowspan="7" valign="middle" align="center"><img src="{icon}" height="128"/></td>
+    <td width="140">Name:</td><td><b>{name}</b></td></tr>
+<tr><td width="140">Alias:</td><td>{alias}</td></tr>
+<tr><td width="140">Database:</td><td>{database}</td></tr>
+<tr><td width="140">Type:</td><td>{dev_class}</td></tr>
+<tr><td width="140">Server:</td><td>{server_id}</td></tr>
+<tr><td width="140">Server host:</td><td>{server_host}</td></tr>
+<tr><td width="140">Documentation:</td><td><a target="_blank" href="{doc_url}">{doc_url}</a></td></tr>
+</table>"""
+
+def display_deviceproxy_html(dev_proxy):
+    """displayhook function for PyTango.DeviceProxy, rendered as HTML"""
+    try:
+        info = dev_proxy.info()
+    except:
+        info = __TangoInfo(dev_proxy)
+    name = dev_proxy.dev_name()
+    fmt = dict(dev_class=info.dev_class, server_id=info.server_id,
+               server_host=info.server_host, name=name)
+    
+    try:
+        fmt["alias"] = dev_proxy.alias()
+    except:
+        fmt["alias"] = "-----"
+
+    db = dev_proxy.get_device_db()
+    try:
+        fmt["database"] = db.get_db_host() + ":" + db.get_db_port()
+    except:
+        try:
+            fmt["database"] = db.get_file_name()
+        except:
+            fmt["database"]  = "Unknown"
+
+    doc_url = info.doc_url.split("\n")[0]
+    try:
+        fmt["doc_url"] = doc_url[doc_url.index("http"):]
+    except ValueError:
+        fmt["doc_url"] = doc_url
+
+    fmt['icon'] = __get_device_icon(dev_proxy, info.dev_class)
+
+    return __DEV_HTML_TEMPLATE.format(**fmt)
+
+__DB_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td width="140" rowspan="2" valign="middle" align="center"><img src="{icon}" height="128"/></td>
+    <td><b>{name}</b></td></tr>
+<tr><td>{info}</td></tr>
+</table>"""
+
+def display_database_html(db):
+    """displayhook function for PyTango.Database, rendered as HTML"""
+    fmt = dict()
+
+    try:
+        fmt["name"] = db.get_db_host() + ":" + db.get_db_port()
+    except:
+        try:
+            fmt["name"] = db.get_file_name()
+        except:
+            fmt["name"]  = "Unknown"
+
+    try:
+        fmt["info"] = db.get_info().replace("\n", "<BR/>")
+    except:
+        fmt["info"] = "Unknown"
+    
+    fmt['icon'] = os.path.join(__RES_DIR, "database.png")
+
+    return __DB_HTML_TEMPLATE.format(**fmt)
+
+__DEV_ATTR_RW_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td colspan="2" bgcolor="{bgcolor}">{name} ({type}, {data_format}, {quality}) at {time}</td></tr>
+<tr><td bgcolor="#EEEEEE" width="140">value{r_dim}:</td>
+    <td bgcolor="#EEEEEE">{value}</td></tr>
+<tr><td bgcolor="#EEEEEE" width="140">w_value{w_dim}:</td>
+    <td bgcolor="#EEEEEE">{w_value}</td></tr>
+</table>"""
+
+__DEV_ATTR_RO_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td colspan="2" bgcolor="{bgcolor}">{name} ({type}, {data_format}, {quality}) at {time}</td></tr>
+<tr><td bgcolor="#EEEEEE" width="140">value{r_dim}:</td>
+    <td bgcolor="#EEEEEE">{value}</td></tr>
+</table>"""
+
+__DEV_ATTR_ERR_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td bgcolor="#FF0000">{name} ({type}, {data_format}, {quality}) at {time}</td></tr>
+<tr><td bgcolor="#EEEEEE">{error}</td></tr>
+</table>"""
+
+QUALITY_TO_HEXCOLOR_STR = {
+    PyTango.AttrQuality.ATTR_VALID : ("#00FF00", "#000000"),
+    PyTango.AttrQuality.ATTR_INVALID : ("#808080", "#FFFFFF"),    
+    PyTango.AttrQuality.ATTR_ALARM : ("#FF8C00", "#FFFFFF"),    
+    PyTango.AttrQuality.ATTR_WARNING : ("#FF8C00", "#FFFFFF"),    
+    PyTango.AttrQuality.ATTR_CHANGING : ("#80A0FF", "#000000"),
+    None : ("#808080", "#000000"), 
+}
+
+def display_deviceattribute_html(da):
+    """displayhook function for PyTango.DeviceAttribute, rendered as HTML"""
+    fmt = dict(name=da.name, type=da.type, data_format=da.data_format)
+    template = None
+    if da.has_failed:
+        fmt['error'] = "\n".join(map(str, da.get_err_stack())).replace("\n", "<br/>")
+        
+        template = __DEV_ATTR_ERR_HTML_TEMPLATE
+    else:
+        rd, wd = da.r_dimension, da.w_dimension
+        if wd.dim_x == 0 and wd.dim_y == 0 and da.w_value is None:
+            template = __DEV_ATTR_RO_HTML_TEMPLATE
+        else:
+            template = __DEV_ATTR_RW_HTML_TEMPLATE
+            fmt['w_value'] = str(da.w_value)
+            if da.data_format == PyTango.AttrDataFormat.SCALAR:
+                fmt['w_dim'] = ""
+            else:
+                fmt['w_dim'] = "<br/>(%d, %d)" % (wd.dim_x, wd.dim_y)
+        fmt['bgcolor'], fmt['fgcolor'] = QUALITY_TO_HEXCOLOR_STR[da.quality]
+        fmt['quality'] = str(da.quality)
+        if da.data_format == PyTango.AttrDataFormat.SCALAR:
+            fmt['r_dim'] = ""
+        else:
+            fmt['r_dim'] = "<br/>(%d, %d)" % (rd.dim_x, rd.dim_y)
+        fmt['value'] = str(da.value)
+        fmt['time'] = str(da.time.todatetime())
+    return template.format(**fmt)
+
+__GROUP_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td width="100" bgcolor="#EEEEEE">Name:</td><td bgcolor="#EEEEEE">{name}</td></tr>
+<tr><td width="100" bgcolor="#EEEEEE">Size:</td><td bgcolor="#EEEEEE">{size}</td></tr>
+<tr><td width="100" bgcolor="#EEEEEE">Devices:</td><td bgcolor="#EEEEEE">{devices}</td></tr>
+</table>"""
+
+def display_group_html(group):
+    devices = group.get_device_list()
+    devices = ", ".join(devices)
+    fmt=dict(name=group.get_name(), size=group.get_size(), devices=devices) 
+    return __GROUP_HTML_TEMPLATE.format(**fmt)
+
+__GROUP_REPLY_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td bgcolor="#EEEEEE">{name}</td></tr>
+<tr><td>{data}</td></tr>    
+"""
+
+__GROUP_REPLY_ERR_HTML_TEMPLATE = """\
+<table border="0" cellpadding="2" width="100%">
+<tr><td bgcolor="#FF0000">{name}</td></tr>
+<tr><td bgcolor="#EEEEEE">{error}</td></tr>
+</table>"""
+    
+def display_groupreply_html(gr):
+    fmt = dict(name="%s/%s" % (gr.dev_name(), gr.obj_name()))
+    template = None
+    if gr.has_failed():
+        fmt['error'] = "\n".join(map(str, gr.get_err_stack())).replace("\n", "<br/>")
+        template = __GROUP_REPLY_ERR_HTML_TEMPLATE
+    else:
+        template = __GROUP_REPLY_HTML_TEMPLATE
+        data = gr.get_data()
+        if isinstance(data, PyTango.DeviceAttribute):
+            data = display_deviceattribute_html(data)
+        fmt["data"] = data
+        
+    ret = template.format(**fmt)
+    return ret
+
+def init_display(ip):
+    html_formatter = ip.display_formatter.formatters["text/html"]
+    html_formatter.for_type(PyTango.DeviceProxy, display_deviceproxy_html)
+    html_formatter.for_type(PyTango.Database, display_database_html)
+    html_formatter.for_type(PyTango.DeviceAttribute, display_deviceattribute_html)
+    html_formatter.for_type(PyTango.Group, display_group_html)
+    html_formatter.for_type(PyTango.GroupAttrReply, display_groupreply_html)
+    html_formatter.for_type(PyTango.GroupCmdReply, display_groupreply_html)
+
+    
+def init_ipython(ip=None, store=True, pytango=True, colors=True, console=True,
+                 magic=True):
+
+    if ip is None:
+        ip = get_ipapi()
+    
+    global _tango_init
+    if _tango_init is True: return
+
+    init_display(ip)
+    
+    if pytango:
+        init_pytango(ip)
+    
+    init_db()
+
+    if magic:
+        init_magic(ip)
+    
+    _tango_init = True
+
+def load_config(config):
+    import PyTango.ipython
+    import IPython.utils.coloransi
+    
+    d = { "version" : str(PyTango.ipython.get_pytango_version()),
+          "pyver" : str(PyTango.ipython.get_python_version()),
+          "ipyver" : str(PyTango.ipython.get_ipython_version()),
+          "pytangover" : str(PyTango.ipython.get_pytango_version()), }
+    d.update(IPython.utils.coloransi.TermColors.__dict__)
+
+    so = Struct(
+        tango_banner="""%(Blue)shint: Try typing: mydev = Device("%(LightBlue)s<tab>%(Normal)s\n""")
+
+    so = config.get("tango_options", so)
+
+    ipy_ver = PyTango.ipython.get_ipython_version()
+    
+    # ------------------------------------
+    # Application
+    # ------------------------------------
+    app = config.Application
+    app.log_level = 30
+
+    # ------------------------------------
+    # InteractiveShell
+    # ------------------------------------
+    i_shell = config.InteractiveShell
+    i_shell.colors = 'Linux'
+
+    if ipy_ver >= "0.12":
+        # ------------------------------------
+        # PromptManager (ipython >= 0.12)
+        # ------------------------------------
+        prompt = config.PromptManager
+        prompt.in_template = 'ITango [\\#]: '
+        prompt.out_template = 'Result [\\#]: '
+    else:
+        # (Deprecated in ipython >= 0.12 use PromptManager.in_template)
+        i_shell.prompt_in1 = 'ITango [\\#]: '
+        # (Deprecated in ipython >= 0.12 use PromptManager.out_template)
+        i_shell.prompt_out = 'Result [\\#]: '
+    
+    # ------------------------------------
+    # InteractiveShellApp
+    # ------------------------------------
+    i_shell_app = config.InteractiveShellApp
+    extensions = getattr(i_shell_app, 'extensions', [])
+    extensions.append('PyTango.ipython')
+    i_shell_app.extensions = extensions
+    
+    # ------------------------------------
+    # TerminalIPythonApp: options for the IPython terminal (and not Qt Console)
+    # ------------------------------------
+    term_app = config.TerminalIPythonApp
+    term_app.display_banner = True
+    #term_app.nosep = False
+    #term_app.classic = True
+    
+    # ------------------------------------
+    # IPKernelApp: options for the  Qt Console
+    # ------------------------------------
+    #kernel_app = config.IPKernelApp
+    ipython_widget = config.IPythonWidget
+    ipython_widget.in_prompt  = 'ITango [<span class="in-prompt-number">%i</span>]: '
+    ipython_widget.out_prompt = 'Result [<span class="out-prompt-number">%i</span>]: '
+    
+    #zmq_i_shell = config.ZMQInteractiveShell
+    
+    # ------------------------------------
+    # TerminalInteractiveShell
+    # ------------------------------------
+    term_i_shell = config.TerminalInteractiveShell
+    banner = """\
+%(Purple)sITango %(version)s%(Normal)s -- An interactive %(Purple)sTango%(Normal)s client.
+
+Running on top of Python %(pyver)s, IPython %(ipyver)s and PyTango %(pytangover)s
+
+help      -> ITango's help system.
+object?   -> Details about 'object'. ?object also works, ?? prints more.
+"""
+    
+    banner = banner % d
+    banner = banner.format(**d)
+    tango_banner = so.tango_banner % d
+    tango_banner = tango_banner.format(**d)
+    all_banner = "\n".join((banner, tango_banner))
+
+    term_i_shell.banner1 = banner
+    term_i_shell.banner2 = tango_banner
+
+    # ------------------------------------
+    # FrontendWidget
+    # ------------------------------------
+    frontend_widget = config.ITangoConsole
+    frontend_widget.banner = all_banner
+    
+def load_ipython_extension(ipython):
+    # The ``ipython`` argument is the currently active
+    # :class:`InteractiveShell` instance that can be used in any way.
+    # This allows you do to things like register new magics, plugins or
+    # aliases.
+    init_ipython(ip=ipython, store=False, colors=False)
+
+def unload_ipython_extension(ipython):
+    # If you want your extension to be unloadable, put that logic here.
+    #print "Unloading PyTango IPython extension"
+    pass
+
+def run():
+
+    # overwrite the original IPython Qt widget with our own so we can put a
+    # customized banner. IPython may have been installed without Qt support so we
+    # protect this code against an import error
+    try:
+        from IPython.utils.traitlets import Unicode
+        from IPython.qt.console.rich_ipython_widget import RichIPythonWidget
+
+        class ITangoConsole(RichIPythonWidget):
+            
+            banner = Unicode(config=True)
+
+            def _banner_default(self):
+                config = get_config()
+                return config.ITangoConsole.banner
+
+        import IPython.qt.console.qtconsoleapp
+        IPythonQtConsoleApp = IPython.qt.console.qtconsoleapp.IPythonQtConsoleApp
+        IPythonQtConsoleApp.widget_factory = ITangoConsole      
+    except ImportError:
+        pass
+
+    argv = sys.argv
+
+    try:
+        for i, arg in enumerate(argv[:1]):
+            if arg.startswith('--profile='):
+                break
+        else:
+            argv.append("--profile=tango")
+    except:
+        pass    
+        
+    launch_new_instance()
+    

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-science/packages/pytango.git



More information about the debian-science-commits mailing list