[Da-tools-commits] ./da-tools/userdir-ldap-cgi-python r9: building all new in OOP style.
Robin Wittler
real at geek-at-work.org
Tue Jan 20 17:29:13 UTC 2009
------------------------------------------------------------
revno: 9
committer: Robin Wittler <real at geek-at-work.org>
branch nick: userdir-ldap-cgi-python
timestamp: Tue 2009-01-20 18:29:13 +0100
message:
building all new in OOP style.
modified:
ud_ldap_ng/lib/ud_ldap_ng.py
-------------- next part --------------
=== modified file 'ud_ldap_ng/lib/ud_ldap_ng.py'
--- a/ud_ldap_ng/lib/ud_ldap_ng.py 2009-01-13 21:13:30 +0000
+++ b/ud_ldap_ng/lib/ud_ldap_ng.py 2009-01-20 17:29:13 +0000
@@ -1,169 +1,329 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-
-__version__ = '0.0.2'
+__version__ = '0.0.4'
__author__ = 'Robin Wittler <real at geek-at-work.org>'
__license__ = 'GPL3'
import ldap
-def connectDebianLDAP(uid=None, password=None):
- '''Connect to the Debian LDAP with uid and password.
- If uid and/or password is None (the default)
- then connectLDAP uses an empty uid and password.
-
- @param uid The uid that should be used to connect
- @type uid str or None
- @param password The password tha should used to connect
- @type password str or None
- @return The connection object
- '''
- host = 'db.debian.org'
- con = ldap.open(host)
- if not uid:
- uid = ""
- if not password:
- password = ""
- con.simple_bind_s(uid, password)
- return con
-
-def getDebianHost(con, host):
- '''getDebianHost takes a ldap connection object and returns
- recursive all host attributes from debian ldap for given host.
-
- @param con The ldap connection object
- @type con ldap connection object
- @param host The hostname
- @type host str
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- if not type(host) == str:
- raise TypeError('host must be type str')
- dn = 'host=%s,ou=hosts,dc=debian,dc=org' %(host)
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope))
-
-def getDebianHostAttributes(con, host, attr_list):
- '''getDebianHostAttributes takes a ldap connection object and returns
- all given host attributes
-
- @param con The ldap connection object
- @type con ldap connection object
- @param host The hostname
- @type host str
- @param attr_list The list of Attributes to query
- @type attr_list list
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- if not type(host) == str:
- raise TypeError('host must be type str')
- if not type(attr_list) == list:
- raise TypeError('attr_list must be type list')
- for entry in attr_list:
- if not type(entry) == str:
- raise TypeError('entry %s in attr_list must be type str' %(repr(entry)))
- dn = 'host=%s,ou=hosts,dc=debian,dc=org' %(host)
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope, attrlist=attr_list))
-
-def getDebianHosts(con):
- '''getDebianHosts takes a ldap connection object and returns
- recursive all host entries with all attributes from the debian ldap.
-
- @param con The ldap connection object
- @type con ldap connection object
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- dn = 'ou=hosts,dc=debian,dc=org'
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope))
-
-def getDebianHostsAttributes(con, attr_list=['hostname', 'architecture', 'sponsor', 'purpose', 'status']):
- '''getDebianHostsAttributes takes a ldap connection object
- and returns the given optional attributes recursive for all hosts
- from debian ldap.
- Defaults for attr_list are: hostname, architecture, sponsor, purpose, status
-
- @param con The ldap connection object
- @type con ldap connection object
- @param attr_list The list of attributes that should be searched for (optional)
- @type attr_list list
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- if not type(attr_list) == list:
- raise TypeError('attr_list must be type list')
- for entry in attr_list:
- if not type(entry) == str:
- raise TypeError('entry %s in attr_list must be type str' %(repr(entry)))
- dn = 'ou=hosts,dc=debian,dc=org'
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope, attrlist=attr_list))
-
-def getDebianUser(con, user):
- '''getDebianUser takes a ldap connection object and returns
- all attributes for the given user.
-
- @param con The ldap connection object
- @type con ldap connection object
- @param user The username you wanne query for
- @type user str
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- if not type(user) == str:
- raise TypeError('user must be type str')
- dn = 'uid=%s,ou=users,dc=debian,dc=org' %(user)
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope))
-
-def getDebianUserAttributes(con, user, attr_list):
- '''getDebianUserAttributes takes a ldap connection object and returns
- the given attributes from attr_list for a given user from the ldap.
-
- @param con The ldap connection object
- @type con ldap connection object
- @param user The username you wanne query for
- @type user str
- @param attr_list The list of attributes that should be searched for
- @type attr_list list
- @return A generator object with the ldap answer
- '''
- if not hasattr(con, 'search_s'):
- raise AttributeError('con must have a "search_s" attribute')
- if not type(user) == str:
- raise TypeError('user must be type str')
- for entry in attr_list:
- if not type(entry) == str:
- raise TypeError('entry %s in attr_list must be type str' %(repr(entry)))
- dn = 'uid=%s,ou=users,dc=debian,dc=org' %(user)
- scope = ldap.SCOPE_SUBTREE
- return prepareLDAPResult(con.search_s(dn, scope, attrlist=attr_list))
-
-def prepareLDAPResult(ldapresult):
- '''prepareLDAPResult takes an ldap result, removes the
- ldap path and yield the results.
-
- @param ldapresult The ldap result
- @type ldapresult list
- @yield the ldap result
- '''
- if not type(ldapresult) == list:
- raise TypeError('ldapresult must be type list')
- for entrys in ldapresult:
- for entry in entrys:
- if type(entry) == dict and entry:
- yield entry
-
-__all__ = ['connectDebianLDAP', 'getDebianHost', 'getDebianHostAttributes', 'getDebianHosts',
- 'getDebianHostsAttributes', 'getDebianUser', 'getDebianUserAttributes', 'prepareLDAPResult']
+class DebianLDAPResult(object):
+ '''This class represents a ldap result.
+ If the optional argument adjust is True, this class automaticly adjust all
+ entrys in this way:
+
+ not adjusted format:
+ [
+ ((host='test',ou='hosts',dc='debian',dc='org'), {'hostname': ['test'], 'purpose': ['prod'], 'sponsor': ['Nice Guy Sponsor'], 'status': ['down']}),
+ ((host='test2',ou='hosts',dc='debian',dc='org'), {'hostname': ['test2'], 'sponsor': ['Nice Guy Sponsor'], 'status': ['down']})
+ ]
+
+ in the not adjusted format the second entry misses one attribute: purpose
+
+
+ adjusted format:
+ [
+ ((host='test',ou='hosts',dc='debian',dc='org'), {'hostname': ['test'], 'purpose': ['prod'], 'sponsor': ['Nice Guy Sponsor'], 'status': ['down']}),
+ ((host='test2',ou='hosts',dc='debian',dc='org'), {'hostname': ['test2'], 'purpose': ['Unknown'], 'sponsor': ['Nice Guy Sponsor'], 'status': ['down']})
+ ]
+
+ after the adjustResults method, the second entry has the missing attribut.
+ So, the adjustResults method looks over all attributes
+
+ '''
+ def __new__(cls, results, adjust=0):
+ cls.isRawLDAPResult(results)
+ cls.__original_results = results
+ if adjust:
+ cls.__results = cls.adjustResults(results)
+ else:
+ cls.__results = results
+ return object.__new__(cls)
+
+ @classmethod
+ def isRawLDAPResult(cls, results):
+ '''Checks if a iterable is a wellformed ldap result'''
+ try:
+ i = iter(results)
+ except TypeError:
+ raise TypeError('results must be iterable')
+ for tuple_nr, result_tuple in enumerate(results):
+ if not type(result_tuple) == tuple:
+ raise TypeError('item %s in results must be type tuple' %(tuple_nr))
+ if not len(result_tuple) == 2:
+ raise ValueError('tuple %s in results must have 2 values.' %(tuple_nr))
+ if not type(result_tuple[0]) == str:
+ raise ValueError(' item 0 in tuple %s must be type str' %(tuple_nr))
+ if not type(result_tuple[1]) == dict:
+ raise ValueError(' item 1 in tuple %s must be type dict' %(tuple_nr))
+ return True
+
+ @classmethod
+ def isDebianLDAPResult(cls, results):
+ '''Checks if a iterable is a wellformed DebianLDAPResult.'''
+ try:
+ i = iter(results)
+ except TypeError:
+ raise TypeError('results must be iterable')
+ for nr, result in enumerate(results):
+ if not type(result) == dict:
+ raise TypeError('item %s in results must be type dict' %(nr))
+ return True
+
+ @classmethod
+ def adjustResults(cls, results, default_value=['Unknown',]):
+ all_attr = cls.__getAllAttrFromRawResult(results)
+ ret_list = list()
+ cls.isRawLDAPResult(results)
+ for dn, result in results:
+ if not result:
+ results.remove((dn, result))
+ continue
+ for key in all_attr:
+ if key not in result:
+ result.setdefault(key, default_value)
+ ret_list.append(result)
+ return ret_list
+
+ @classmethod
+ def sortDebianResults(cls, keyword, reverse):
+ if not type(keyword) == str and keyword:
+ raise TypeError('keyword must be type str, None or False')
+ if not keyword in cls.__getAllAttrFromDebianResult(cls.__results):
+ raise KeyError('keyword exist not in results')
+ if not type(reverse) == bool:
+ raise TypeError('reverse must be type bool')
+ ret_list = list()
+ for result in cls.__results:
+ if not keyword in result:
+ raise KeyError('keyword is not in result: %s' %(result))
+ ret_list.append((result.get(keyword), result))
+ ret_list = sorted(ret_list, reverse=reverse)
+ for keyword, result in [(i,k) for i,k in ret_list]:
+ ret_list.remove((keyword, result))
+ ret_list.append(result)
+ return ret_list
+
+ @classmethod
+ def __getAllAttrFromRawResult(cls, results):
+ all_attr = set()
+ cls.isRawLDAPResult(results)
+ for dn, result in results:
+ for key in result:
+ all_attr.add(key)
+ return all_attr
+
+ @classmethod
+ def __getAllAttrFromDebianResult(cls, results):
+ all_attr = set()
+ cls.isDebianLDAPResult(results)
+ for result in results:
+ for key in result:
+ all_attr.add(key)
+ return all_attr
+
+ @classmethod
+ def getOriginalResults(cls):
+ return iter(cls.__original_results)
+
+ @classmethod
+ def __iter__(cls):
+ return iter(cls.__results)
+
+class MyLDAP(object):
+ '''This is a simple wrapper around ldap.
+ There are some Methods added to questioning the ldap for some values.'''
+ def connect(self, host='', uid='', password=''):
+ '''Connects and binds to a ldap. Takes three args: host, uid and password.
+ host defaults to an empty string, uid defaults to an empty string and
+ password defaults to an empty string. Returns None.
+
+ @param host The host to connect to.
+ @type host str
+ @param uid The uid used while connecting.
+ @type uid str
+ @param password The password used while connecting.
+ @type password str
+ @return None
+ '''
+ if getattr(self, 'con', None):
+ raise Exception('The "connect" Method was called before.')
+ for key, value in dict(host=host, uid=uid, password=password).items():
+ if not type(value) == str:
+ raise TypeError('%s must be type str' %(key))
+ self.con = ldap.open(host)
+ self.host = host
+ self.uid = uid
+ self.con.simple_bind_s(self.uid, password)
+ return None
+
+ def getHost(self):
+ '''Returns the host string
+
+ @return str'''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ return self.__host
+
+ def setHost(self, value):
+ '''Set the host string and returns it.
+ Normaly it will be called by the connect method and is immutable.
+
+ @param value The string you want set as host
+ @type value str
+ @return str'''
+ if not getattr(self, 'host', None):
+ if not type(value) == str:
+ raise TypeError('value must be type str')
+ self.__host = value
+ return self.__host
+
+ def delHost(self):
+ '''Simply returns the host string insted of deleting it.
+
+ @return str'''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ return self.__host
+
+ def getUid(self):
+ '''Returns the uid string
+
+ @return str'''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ return self.__uid
+
+ def setUid(self, value):
+ '''Set the uid string and returns it.
+
+ @param value The string you want to set as uid.
+ @type value str
+ @return str'''
+ if not getattr(self, 'uid', None):
+ if not type(value) == str:
+ raise TypeError('value must be type str')
+ self.__uid = value
+ return self.__uid
+
+ def delUid(self):
+ '''Simply returns the uid string insted of deleting it.
+
+ @return str'''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ return self.__uid
+
+ def getCon(self):
+ '''Returns the LDAP Connection Object.
+
+ @return ldap.ldapobject.SimpleLDAPObject'''
+ return self.__con
+
+ def setCon(self, value):
+ '''Set the LDAP Connection Object.
+
+ @param value The LDAP Connection Object.
+ @type value ldap.ldapobject.SimpleLDAPObject
+ @return None'''
+ if not isinstance(value, ldap.ldapobject.SimpleLDAPObject):
+ raise TypeError('value must be instance of ldap.ldapobject.SimpleLDAPObject')
+ for attr in dir(ldap.ldapobject.SimpleLDAPObject):
+ if not getattr(value, attr, None):
+ raise AttributeError('value must have a %s attribute' %(attr))
+ if not getattr(self, 'con', None):
+ self.__con = value
+
+ def delCon(self):
+ '''Simply returns None insted of deleting the LDAP Connection Object'''
+ return None
+
+ def ldapSearch(self, dn, scope, filterstr='(objectClass=*)', attrlist=None, attrsonly=0):
+ '''This method takes 4 arguments and forms a syncron ldap request from these args.
+ It simply returns the ldap result.
+
+ @param dn The ldap search dn.
+ @type dn str
+ @param scope The ldap Scope result depth
+ @type scope int
+ @param filterstr The ldap search filter
+ @type filterstr int
+ @param attrlist The attributes to explizit search for
+ @type attrlist list
+ @param attrsonly Set this to get only the attributes from ldap
+ @type attrsonly int
+ @return list
+ '''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ if not type(dn) == str:
+ raise TypeError('dn must be type str')
+ if not type(scope) == int:
+ raise TypeError('scope must be type int')
+ if not type(filterstr) == str:
+ raise TypeError('filterstr must be type str')
+ if not type(attrlist) == list and attrlist:
+ raise TypeError('attrlist must be type list or None')
+ if attrlist:
+ for nr, value in enumerate(attrlist):
+ if not type(value) == str:
+ raise TypeError('value %s in attrlist must be type str' %(nr))
+ if not type(attrsonly) == int:
+ raise TypeError('attrsonly must be type int')
+ return self.con.search_s(dn, scope, filterstr=filterstr, attrlist=attrlist, attrsonly=attrsonly)
+
+ host = property(getHost, setHost, delHost, 'This is the host property.')
+ uid = property(getUid, setUid, delUid, 'This is the uid property.')
+ con = property(getCon, setCon, delCon, 'This is the LDAP Connection Object property.')
+
+
+class AnonymousDebianLDAP(MyLDAP):
+ def __init__(self):
+ self.connect('db.debian.org')
+ def getDebianHost(self, host, attrlist=None):
+ '''getDebianHost returns recursive all host attributes from debian ldap for given host.
+ Optional you can submit a list of attributes in which you are only interested.
+
+ @param host The host to query for
+ @type host str
+ @param attrlist An Optional list of Attributes to query for
+ @type attrlist list
+ @return A generator object with the ldap answer
+ '''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ if not type(host) == str:
+ raise TypeError('host must be type str')
+ dn = 'host=%s,ou=hosts,dc=debian,dc=org' %(host)
+ scope = ldap.SCOPE_SUBTREE
+ return DebianLDAPResult(self.ldapSearch(dn, scope, attrlist=attrlist))
+
+
+ def getDebianHosts(self, attrlist=None, keyword='hostname', reversed=False):
+ '''getDebianHosts returns recursive all hosts and all hosts attributes from debian ldap.
+ Optional you can submit a list of attributes in which you are only interested.
+ You can submit a keyword (which defaults to 'hostname') and then the returned list will
+ be sorted by this keyword. If the optional arg reversed set to True,
+ the returned list will be reversed.
+
+ @param attrlist An Optional list of Attributes to query for
+ @type attrlist list
+ @param keyword The keyword to sort
+ @type keyword str
+ @param reversed reverse the sort
+ @type reversed bool
+ @return ResultObject
+ '''
+ if not getattr(self, 'con', None):
+ raise Exception('You must call the "connect" Method before.')
+ if not type(keyword) == str:
+ raise TypeError('keyword must be type str')
+ if not type(reversed) == bool:
+ raise TypeError('reversed must be type bool')
+ dn = 'ou=hosts,dc=debian,dc=org'
+ scope = ldap.SCOPE_SUBTREE
+ return DebianLDAPResult(self.ldapSearch(dn, scope, attrlist=attrlist))
+
+__all__ = ['MyLDAP', 'DebianLDAP']
if __name__ == '__main__':
pass
More information about the Da-tools-commits
mailing list