[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, debian, updated. upstream/0.9.5.5-717-g0f98819
Daniele Ricci
daniele.athome at gmail.com
Sat Aug 6 08:20:07 UTC 2011
The following commit has been merged in the debian branch:
commit 21deeb53b2f1b50373c75709e3263a671bf5a081
Author: Daniele Ricci <daniele.athome at gmail.com>
Date: Fri Oct 15 14:13:23 2010 +0200
PIM.Messages: MessageRead changed to MessageUnread
PIM.Messages: query for retrieving threads
Signed-off-by: Daniele Ricci <daniele.athome at gmail.com>
diff --git a/framework/subsystems/opimd/db_handler.py b/framework/subsystems/opimd/db_handler.py
index cfdcad9..701d679 100644
--- a/framework/subsystems/opimd/db_handler.py
+++ b/framework/subsystems/opimd/db_handler.py
@@ -79,6 +79,12 @@ rootdir = os.path.join( rootdir, 'opim' )
_SQLITE_FILE_NAME = os.path.join(rootdir,'pim.db')
+def dict_factory(cursor, row, skip_field = None):
+ d = {}
+ for idx, col in enumerate(cursor.description):
+ if col[0] != skip_field:
+ d[col[0]] = row[idx]
+ return d
class DbHandler(object):
con = None
@@ -306,6 +312,33 @@ class DbHandler(object):
query = query + " LIMIT ?"
params.append(int(query_desc['_limit']))
return {'Query':query, 'Parameters':params}
+
+ def build_sql_query(self, query_desc):
+ """Modify a raw SQL query with some others rules."""
+
+ query = query_desc['sql']
+ params = []
+
+ for name, value in query_desc.iteritems():
+ #skip system fields
+ if name.startswith('_'):
+ #FIXME: put this in a central place!
+ if name not in ('_limit', '_resolve_phonenumber', '_retrieve_full_contact'):
+ raise InvalidField("Query rule '%s' does not exist." % (name, ))
+ else:
+ continue
+ elif name.startswith('@'):
+ if name[1:] not in DomainManager.get_domains():
+ raise InvalidField("Domain '%s' does not exist." % (name[1:], ))
+ else:
+ continue
+
+ if '_limit' in query_desc:
+ query = query + " LIMIT ?"
+ params.append(int(query_desc['_limit']))
+
+ return {'Query':query, 'Parameters':params}
+
def sanitize_result(self, raw):
map = {}
@@ -319,13 +352,22 @@ class DbHandler(object):
map[field] = name
return map
- def get_full_result(self, raw_result, join_parameters):
+ def get_full_result(self, raw_result, join_parameters, cursor = None):
if raw_result == None:
return None
#convert from a list of tuples of ids to a list of ids
ids = map(lambda x: x[0], raw_result)
- return self.get_content(ids, join_parameters)
-
+ if cursor:
+ try:
+ skip_field = cursor.description[0][0]
+ except IndexError:
+ skip_field = None
+ other_fields = map(lambda x: dict_factory(cursor, x, skip_field), raw_result)
+ else:
+ other_fields = []
+
+ return self.get_content(ids, join_parameters, other_fields)
+
def query(self, query_desc):
#FIXME: join_parametrs should be cool, and not just a simple hash
join_parameters = {}
@@ -340,14 +382,33 @@ class DbHandler(object):
cur = self.con.cursor()
cur.execute(query['Query'], query['Parameters'])
- res = self.get_full_result(cur.fetchall(), join_parameters)
+ res = self.get_full_result(cur.fetchall(), join_parameters, cur)
cur.close()
return res
-
- def get_content(self, ids, join_parameters):
+
+ def raw_sql(self, query_desc):
+ #FIXME: join_parametrs should be cool, and not just a simple hash
+ join_parameters = {}
+ query = self.build_sql_query(query_desc)
+ if query == None:
+ logger.error("Failed creating threads query for %s", str(query_desc))
+ raise QueryFailed("Failed creating threads query.")
+ if query_desc.get('_resolve_phonenumber'):
+ join_parameters['resolve'] = True
+ if query_desc.get('_retrieve_full_contact'):
+ join_parameters['full'] = True
+
+ cur = self.con.cursor()
+ cur.execute(query['Query'], query['Parameters'])
+ res = self.get_full_result(cur.fetchall(), join_parameters, cur)
+ cur.close()
+ return res
+
+ def get_content(self, ids, join_parameters, other_fields = []):
cur = self.con.cursor()
res = []
query = self.build_retrieve_query(join_parameters)
+ row_index = 0
for id in ids:
cur.execute(query, {'id': id})
tmp = self.sanitize_result(cur.fetchall())
@@ -365,6 +426,13 @@ class DbHandler(object):
pass
tmp['Path'] = self.domain.id_to_path(id)
tmp['EntryId'] = id
+ try:
+ for field, value in other_fields[row_index].iteritems():
+ tmp[field] = value
+ except IndexError:
+ pass
+
+ row_index += 1
res.append(tmp)
cur.close()
return res
diff --git a/framework/subsystems/opimd/pimd_messages.py b/framework/subsystems/opimd/pimd_messages.py
index f2f7119..1091803 100644
--- a/framework/subsystems/opimd/pimd_messages.py
+++ b/framework/subsystems/opimd/pimd_messages.py
@@ -35,7 +35,7 @@ from domain_manager import DomainManager, Domain
from helpers import *
from opimd import *
-from query_manager import QueryMatcher, SingleQueryHandler
+from query_manager import SingleQueryHandler, SingleRawSQLQueryHandler
import framework.patterns.tasklet as tasklet
from framework.config import config, busmap
@@ -123,6 +123,58 @@ class QueryManager(DBusFBObject):
return _DBUS_PATH_QUERIES + '/' + str(query_id)
+ def process_query_threads(self, query, dbus_sender):
+ """Handles a query for threads and returns the dbus path of the newly created query result
+
+ @param dbus_sender Sender's unique name on the bus
+ @return dbus path of the query result"""
+
+ db_prefix = self.db_handler.db_prefix
+ query['sql'] = """
+SELECT m.messages_id messages_id,
+ (
+ SELECT count(*) FROM
+ messages_boolean b
+ OUTER LEFT JOIN
+ messages_phonenumber p
+ ON b.messages_id = p.messages_id AND
+ b.field_name = 'MessageUnread'
+ WHERE b.value = '1' AND
+ p.field_name = 'Peer' AND p.value = p1.value
+ ) UnreadCount
+ FROM (
+ messages m
+ JOIN
+ messages_date t
+ USING (messages_id)
+ ) JOIN messages_phonenumber p1
+ USING (messages_id)
+
+ WHERE t.field_name = 'Timestamp' AND
+ t.value IN (
+ SELECT max(timestamp) timestamp FROM (
+ (
+ SELECT date_t.messages_id AS messages_id, date_t.value AS timestamp FROM
+ messages_date AS date_t
+ WHERE date_t.field_name = 'Timestamp'
+ ) res_t
+ JOIN messages_phonenumber num_t ON
+ res_t.messages_id = num_t.messages_id AND num_t.field_name = 'Peer'
+ )
+ GROUP BY num_t.value
+ )
+ ORDER BY t.value DESC
+ """
+
+ query_handler = SingleRawSQLQueryHandler(query, self.db_handler, dbus_sender)
+
+ query_id = self._next_query_id
+ self._next_query_id += 1
+
+ self._queries[query_id] = query_handler
+
+ return _DBUS_PATH_QUERIES + '/' + str(query_id)
+
def check_new_entry(self, entry_id):
"""Checks whether a newly added entry matches one or more queries so they can signal clients
@@ -217,14 +269,14 @@ class MessageDomain(Domain, GenericDomain):
_unread_messages = None
DEFAULT_FIELDS = {
- 'Peer' : 'phonenumber',
- 'Source' : 'text',
- 'Direction' : 'text',
- 'MessageSent' : 'boolean',
- 'MessageRead' : 'boolean',
- 'Timestamp' : 'date',
- 'Timezone' : 'timezone',
- 'Content' : 'text'
+ 'Peer' : 'phonenumber',
+ 'Source' : 'text',
+ 'Direction' : 'text',
+ 'MessageSent' : 'boolean',
+ 'MessageUnread' : 'boolean',
+ 'Timestamp' : 'date',
+ 'Timezone' : 'timezone',
+ 'Content' : 'text'
}
def __init__(self):
"""Creates a new MessageDomain instance"""
@@ -246,7 +298,7 @@ class MessageDomain(Domain, GenericDomain):
self.fso_handler = MessagesFSO(self)
- self._unread_messages = len(self.db_handler.query({'Direction': 'in', 'MessageRead':0}))
+ self._unread_messages = len(self.db_handler.query({'Direction': 'in', 'MessageUnread':1}))
#---------------------------------------------------------------------#
# dbus methods and signals #
@@ -266,14 +318,14 @@ class MessageDomain(Domain, GenericDomain):
@param message_data List of fields; format is [Key:Value, Key:Value, ...]
@return URI of the newly created d-bus message object"""
- read = entry_data.get('MessageRead')
+ unread = entry_data.get('MessageUnread')
#Make sure is boolean
- if read == None:
- read = 0
+ if unread == None:
+ unread = 0
else:
- read = int(entry_data.get('MessageRead'))
+ unread = int(unread)
- if entry_data.get('Direction') == 'in' and not read:
+ if entry_data.get('Direction') == 'in' and unread:
self._unread_messages += 1
self.UnreadMessages(self._unread_messages)
@@ -314,6 +366,15 @@ class MessageDomain(Domain, GenericDomain):
return self.query_manager.process_query(query, sender)
+ @dbus_method(_DIN_MESSAGES, "a{sv}", "s", sender_keyword="sender")
+ def QueryThreads(self, query, sender):
+ """Creates a new query for threads and returns the URI of the resulting query object
+
+ @param sender Unique name of the query sender on the bus
+ @return URI of the query object, e.g. /org.pyneo.PIM/Messages/Queries/4"""
+
+ return self.query_manager.process_query_threads(query, sender)
+
@dbus_method(_DIN_MESSAGES, "", "i")
def GetUnreadMessages(self):
@@ -360,14 +421,14 @@ class MessageDomain(Domain, GenericDomain):
self.check_entry_id(num_id)
message = self.get_content(num_id)
- read = message.get('MessageRead')
+ unread = message.get('MessageUnread')
#Make sure is boolean
- if read == None:
- read = 0
+ if unread == None:
+ unread = 0
else:
- read = int(message.get('MessageRead'))
+ unread = int(unread)
- if not read and message.get('Direction') == 'in':
+ if unread and message.get('Direction') == 'in':
self._unread_messages -= 1
self.UnreadMessages(self._unread_messages)
@@ -392,15 +453,15 @@ class MessageDomain(Domain, GenericDomain):
message = self.get_content(num_id)
#FIXME: What if it was outgoing and is now incoming?
- old_read = message['MessageRead'] if message.has_key('MessageRead') else False
- new_read = data['MessageRead'] if data.has_key('MessageRead') else False
+ old_unread = message['MessageUnread'] if message.has_key('MessageUnread') else False
+ new_unread = data['MessageUnread'] if data.has_key('MessageUnread') else False
old_in = message['Direction'] == 'in' if message.has_key('Direction') else False
new_in = data['Direction'] == 'in' if data.has_key('Direction') else old_in
- if not old_read and old_in and new_read:
+ if old_unread and old_in and not new_unread:
self._unread_messages -= 1
self.UnreadMessages(self._unread_messages)
- elif (old_read and not new_read and new_in) or \
- (not old_read and not old_in and not new_read and new_in):
+ elif (not old_unread and new_unread and new_in) or \
+ (old_unread and not old_in and new_unread and new_in):
self._unread_messages += 1
self.UnreadMessages(self._unread_messages)
self.update(num_id, data)
@@ -464,12 +525,12 @@ class MessagesFSO(object):
if status in ('read', 'unread'):
entry['Direction'] = 'in'
- entry['MessageRead'] = 0
+ entry['MessageUnread'] = 1
else:
entry['Direction'] = 'out'
entry['MessageSent'] = 0
- if status == 'read': entry['MessageRead'] = 1
+ if status == 'read': entry['MessageUnread'] = 0
if status == 'sent': entry['MessageSent'] = 1
entry['Peer'] = number
@@ -523,7 +584,7 @@ class MessagesFSO(object):
if complete:
edit_data['SMS-complete_message']=1
edit_data['Content'] = new_content
- edit_data['MessageRead'] = 0
+ edit_data['MessageUnread'] = 1
self.domain.Update(edit_data, '/' + str(id))
else:
register = 1
@@ -636,7 +697,7 @@ class MessagesFSO(object):
entry['Content'] = msg
entry['Peer'] = number
entry['Direction'] = 'in'
- entry['MessageRead'] = 0
+ entry['MessageUnread'] = 1
entry['Source'] = 'SMS'
# We get number of quarters of an hours, convert to minutes:
zone = int(timestamp[18:]) * 15
diff --git a/framework/subsystems/opimd/query_manager.py b/framework/subsystems/opimd/query_manager.py
index 306d73e..61f570c 100644
--- a/framework/subsystems/opimd/query_manager.py
+++ b/framework/subsystems/opimd/query_manager.py
@@ -39,9 +39,7 @@ import db_handler
import logging
logger = logging.getLogger( MODULE_NAME )
-#----------------------------------------------------------------------------#
-class QueryMatcher(object):
-#----------------------------------------------------------------------------#
+class BaseQueryMatcher(object):
query_obj = None
def __init__(self, query):
@@ -59,30 +57,58 @@ class QueryMatcher(object):
assert(self.query_obj, "Query object is empty, cannot match!")
- matches = []
- results = db_handler.query(self.query_obj)
- return results
+ matches = []
+
+#----------------------------------------------------------------------------#
+class QueryMatcher(BaseQueryMatcher):
+#----------------------------------------------------------------------------#
+ def match(self, db_handler):
+ """Tries to match a db_handler to the current query
+
+ @param a db_handler
+ @return List of entry IDs that match"""
+
+ BaseQueryMatcher.match(self, db_handler)
+ return db_handler.query(self.query_obj)
+
+#----------------------------------------------------------------------------#
+class RawSQLQueryMatcher(BaseQueryMatcher):
+#----------------------------------------------------------------------------#
+ def match(self, db_handler):
+ """Tries to match a db_handler to the current query
+
+ @param a db_handler
+ @return List of entry IDs that match"""
+
+ def match(self, db_handler):
+ """Tries to match a db_handler to the current query
+
+ @param a db_handler
+ @return List of entry IDs that match"""
+
+ BaseQueryMatcher.match(self, db_handler)
+ return db_handler.raw_sql(self.query_obj)
#----------------------------------------------------------------------------#
-class SingleQueryHandler(object):
+class BaseQueryHandler(object):
+ """A base query handler to extend from."""
#----------------------------------------------------------------------------#
db_handler = None
query = None # The query this handler is processing
_entries = None
cursors = None # The next entry we'll serve, depending on the client calling us
- def __init__(self, query, db_handler, dbus_sender):
- """Creates a new SingleQueryHandler instance
+ def __init__(self, query, db_handler, matcher, dbus_sender):
+ """Creates a new BaseQueryHandler instance
@param query Query to evaluate
- @param entries Set of Entry objects to use
+ @param db_handler database handler
+ @param matcher prebuilt query matcher
@param dbus_sender Sender's unique name on the bus"""
self.query = query
self.sanitize_query()
- matcher = QueryMatcher(self.query)
-
self.db_handler = db_handler
self._entries = matcher.match(self.db_handler)
self.cursors = {}
@@ -215,8 +241,8 @@ class SingleQueryHandler(object):
@return True if entry matches this query, False otherwise
@todo Currently this messes up the order of the result set if a specific order was desired"""
- return False
-
+ return False
+
# TODO Register with the new entry to receive changes
@@ -226,5 +252,27 @@ class SingleQueryHandler(object):
# and be done with it. For those, theres's no need to re-read all results.
# Let clients know that this result set changed
-
+#----------------------------------------------------------------------------#
+class SingleQueryHandler(BaseQueryHandler):
+ """Handles a single dictionary based query."""
+#----------------------------------------------------------------------------#
+ def __init__(self, query, db_handler, dbus_sender):
+ """Creates a new SingleQueryHandler instance
+
+ @param query Query to evaluate
+ @param entries Set of Entry objects to use
+ @param dbus_sender Sender's unique name on the bus"""
+ BaseQueryHandler.__init__(self, query, db_handler, QueryMatcher(query), dbus_sender)
+
+
+class SingleRawSQLQueryHandler(BaseQueryHandler):
+ """Handles a single raw SQL based query."""
+ def __init__(self, query, db_handler, dbus_sender):
+ """Creates a new SingleRawSQLQueryHandler instance
+
+ @param query Query to evaluate
+ @param db_handler database handler
+ @param dbus_sender Sender's unique name on the bus"""
+
+ BaseQueryHandler.__init__(self, query, db_handler, RawSQLQueryMatcher(query), dbus_sender)
--
FSO frameworkd Debian packaging
More information about the pkg-fso-commits
mailing list