[pkg-fso-commits] [SCM] FSO frameworkd Debian packaging branch, master, updated. milestone4-368-g700ab82
Michael 'Mickey' Lauer
mickey at vanille-media.de
Mon Feb 2 18:51:57 UTC 2009
The following commit has been merged in the master branch:
commit d871c761aca6a802b911be6284f69eca2838cbf1
Author: Michael 'Mickey' Lauer <mickey at vanille-media.de>
Date: Thu Jan 15 15:22:50 2009 +0100
tasklet: what do we do if the generator vanishes?
diff --git a/framework/patterns/tasklet.py b/framework/patterns/tasklet.py
index 64a4762..89a33f3 100644
--- a/framework/patterns/tasklet.py
+++ b/framework/patterns/tasklet.py
@@ -51,15 +51,15 @@ class Tasklet(object):
"""
This class can be used to write easy callback style functions using the 'yield'
python expression.
-
+
It is usefull in some cases where callback functions are the right thing to do,
but make the code too messy
-
+
This class is largely inspired by python PEP 0342:
http://www.python.org/dev/peps/pep-0342/
-
+
See the examples below to understand how to use it.
-
+
There is a very simple comunication mechanisme between tasklets :
A tasklet can wait for an incoming message using `yield WaitMessage()`,
an other tasklet can then send a message to this tasklet using the send_message method.
@@ -72,11 +72,11 @@ class Tasklet(object):
self.generator = self.do_run(*args, **kargs)
assert isinstance(self.generator, GeneratorType), type(self.generator)
self.stack = traceback.extract_stack()[:-2]
-
+
# The tasklet we are waiting for...
self.waiting = None
self.closed = False
-
+
# The two lists used for messages passing between tasklets
self.waiting_to_send_message = []
self.waiting_for_message = []
@@ -87,38 +87,38 @@ class Tasklet(object):
"Tasklet deleted without being executed\nTraceback to instantiation (most recent call last):\n%s",
''.join(traceback.format_list(self.stack)).rstrip()
)
-
+
def do_run(self, *args, **kargs):
return self.run(*args, **kargs)
-
+
def run(self):
"""The default task run by the tasklet"""
yield
-
+
def start(self, callback = None, err_callback = None, *args, **kargs):
"""Start the tasklet, connected to a callback and an error callback
-
+
:Parameters:
- - `callback`: a function that will be called with the
+ - `callback`: a function that will be called with the
returned value as argument
- `err_callback`: a function that is called if the tasklet raises an exception.
The function take 3 arguments as parameters, that are the standard python exception arguments.
- `*args`: any argument that will be passed to the callback function as well
- - `**kargs`: any kargs argument that will be passed to the callback function as well
+ - `**kargs`: any kargs argument that will be passed to the callback function as well
"""
self.callback = callback or self.default_callback
self.err_callback = err_callback or self.default_err_callback
self.args = args # possible additional args that will be passed to the callback
self.kargs = kargs # possible additional keywords args that will be passed to the callback
self.send(None) # And now we can initiate the task
-
+
def start_from(self, tasklet):
"""Start the tasklet from an other tasklet"""
self.start(tasklet.send, tasklet.throw)
-
+
def start_dbus(self, on_ok, on_err, *args, **kargs):
"""Like start, except that the callback methods comply to the dbus async signature
-
+
We should use this method instead of start when we want to connect to the callbacks
defined in the dbus async_callbacks keyword.
"""
@@ -132,11 +132,11 @@ class Tasklet(object):
def err_callback(type, e, trace):
on_err(e)
self.start(callback=callback, err_callback=err_callback, *args, **kargs)
-
+
def default_callback(self, value):
"""The default callback if None is specified"""
pass
-
+
def default_err_callback(self, type, value, traceback):
"""The default error call back if None is specified"""
if type is GeneratorExit:
@@ -147,7 +147,7 @@ class Tasklet(object):
import sys
tb.print_exception(*sys.exc_info())
sys.exit(-1)
-
+
def close(self):
if self.closed:
return
@@ -157,16 +157,21 @@ class Tasklet(object):
self.waiting.close()
self.generator.close()
self.closed = True
-
+
def exit(self): # TODO: is this really useful, or should we use close here ?
e = GeneratorExit()
self.err_callback(*sys.exc_info())
-
+
def send(self, value = None, *args):
"""Resume and send a value into the tasklet generator
"""
# This somehow complicated try switch is used to handle all possible return and exception
# from the generator function
+ ### <ML addition FIXME FIXME>
+ if self.generator == None:
+ logger.error( "generator has vanished!" )
+ return
+ ### </ML addition FIXME FIXME>
assert self.closed == False, "Trying to send to a closed tasklet"
try:
value = self.generator.send(value)
@@ -178,7 +183,7 @@ class Tasklet(object):
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
-
+
def throw(self, type, value = None, traceback = None):
"""Throw an exeption into the tasklet generator"""
try:
@@ -191,10 +196,10 @@ class Tasklet(object):
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
-
+
def handle_yielded_value(self, value):
"""This method is called after the waiting tasklet yielded a value
-
+
We have to take care of two cases:
- If the value is a Tasklet : we start it and connect the call back
to the 'parent' Tasklet send and throw hooks
@@ -210,7 +215,7 @@ class Tasklet(object):
assert self.callback, "%s has no callback !" % self
self.callback(value, *self.args, **self.kargs)
self.close()
-
+
@tasklet
def send_message(self, value = None):
"""Block until the tasklet accepts the incoming message"""
@@ -221,11 +226,11 @@ class Tasklet(object):
sender = WaitTrigger()
self.waiting_to_send_message.append((sender, value))
yield sender
-
+
@tasklet
def wait_message(self):
"""Block until the tasklet receive an incoming message
-
+
Since we usually don't have access to the tasklet `self` argument (when using generators based tasklets)
it is easier to use the WaitMessage class for this.
"""
@@ -238,10 +243,10 @@ class Tasklet(object):
self.waiting_for_message.append(waiter)
ret = yield waiter
yield ret
-
+
class WaitTrigger(Tasklet):
"""Special tasklet that will block until its `trigger` method is called
-
+
This is mostly used by the send_message and WaitMessage tasklet.
"""
def start(self, callback = None, err_callback = None, *args, **kargs):
@@ -252,20 +257,20 @@ class WaitTrigger(Tasklet):
self.close()
def close(self):
self.callback = None
-
+
class WaitMessage(Tasklet):
"""Special tasklet that will block until the caller tasklet receive a message."""
def start_from(self, tasklet):
tasklet.wait_message().start(tasklet.send)
def close(self):
pass
-
+
class Wait(Tasklet):
"""
A special tasklet that wait for an event to be emitted
-
+
If o is an Object that can emit a signal 'signal', then we can create a
- tasklet that waits for this event like this : Wait(o, 'signal')
+ tasklet that waits for this event like this : Wait(o, 'signal')
"""
def __init__(self, obj, event):
assert obj is not None
@@ -273,17 +278,17 @@ class Wait(Tasklet):
self.obj = obj
self.event = event
self.connect_id = None
-
+
def _callback(self, o, *args):
"""This is the callback that is triggered by the signal"""
assert o is self.obj
-
+
if not self.connect_id:
return # We have been closed already
# We need to remember to disconnect to the signal
o.disconnect(self.connect_id)
self.connect_id = None
-
+
# We can finally call our real callback
try:
self.callback(*args)
@@ -293,19 +298,19 @@ class Wait(Tasklet):
# We give a hint to the garbage collector
self.obj = self.callback = None
return False
-
+
def start(self, callback, err_callback, *args):
assert hasattr(self.obj, 'connect'), self.obj
self.callback = callback
self.err_callback = err_callback
self.connect_id = self.obj.connect(self.event, self._callback, *args)
-
+
def close(self):
# It is very important to disconnect the callback here !
if self.connect_id:
self.obj.disconnect(self.connect_id)
self.obj = self.callback = self.connect_id = None
-
+
class WaitFirst(Tasklet):
"""
A special tasklet that waits for the first to return of a list of tasklets.
@@ -314,7 +319,7 @@ class WaitFirst(Tasklet):
super(WaitFirst, self).__init__()
self.done = None
self.tasklets = tasklets
-
+
def _callback(self, *args):
i = args[-1]
values = args[:-1]
@@ -326,22 +331,22 @@ class WaitFirst(Tasklet):
t.close()
self.callback = None
self.tasklets = None
-
+
def start(self, callback = None, err_callback = None):
self.callback = callback
self.err_callback = Tasklet.default_err_callback
-
+
# We connect all the tasklets
for (i,t) in enumerate(self.tasklets):
t.start(self._callback, err_callback, i)
-
+
class WaitDBus(Tasklet):
"""Special tasket that wait for a DBus call"""
def __init__(self, method, *args):
super(WaitDBus, self).__init__()
self.method = method
self.args = args
- def start(self, callback, err_callback):
+ def start(self, callback, err_callback):
self.callback = callback
self.err_callback = err_callback
kargs = {'reply_handler':self._callback, 'error_handler':self._err_callback}
@@ -350,7 +355,7 @@ class WaitDBus(Tasklet):
self.callback(*args)
def _err_callback(self, e):
self.err_callback(type(e), e, sys.exc_info()[2])
-
+
class WaitDBusSignal(Tasklet):
"""A special tasklet that wait for a DBUs event to be emited"""
def __init__(self, obj, event, time_out = None):
@@ -359,16 +364,16 @@ class WaitDBusSignal(Tasklet):
self.event = event
self.time_out = time_out
self.connection = None
-
+
def _callback(self, *args):
if not self.connection:
return # We have been closed already
self.connection.remove()
-
+
if len(args) == 1: # What is going on here is that if we have a single value, we return it directly,
args = args[0] # but if we have several value we pack them in a tuple for the callback
# because the callback only accpet a single argument
-
+
try:
self.callback(args)
except:
@@ -377,24 +382,24 @@ class WaitDBusSignal(Tasklet):
self.obj = self.callback = None
return False
-
+
def _err_callback(self):
e = Exception("TimeOut")
self.err_callback(type(e), e, sys.exc_info()[2])
-
- def start(self, callback, err_callback):
+
+ def start(self, callback, err_callback):
self.callback = callback
self.err_callback = err_callback
self.connection = self.obj.connect_to_signal(self.event, self._callback)
if self.time_out:
gobject.timeout_add(self.time_out * 1000, self._err_callback)
-
+
def close(self):
# Note : it is not working very well !!!! Why ? I don't know...
if self.connection:
self.connection.remove()
self.obj = self.callback = self.connection = None
-
+
class WaitDBusName(Tasklet):
"""Special tasklet that blocks until a given DBus name is available on the system bus"""
def run(self, name):
@@ -407,16 +412,16 @@ class WaitDBusName(Tasklet):
var = yield WaitDBusSignal( bus_obj_iface, 'NameOwnerChanged' )
if var[0] == name:
yield None
-
+
class WaitFunc(Tasklet):
"""A special tasklet that will wait for a function to call a callback.
-
+
This is useful to reuse old style callback function.
- The function should take 2 parameters that are the callback to call
+ The function should take 2 parameters that are the callback to call
"""
def __init__(self, func):
"""Create the tasklet using a given function
-
+
`func` should have this signature : func(on_ok, on_err)
where :
on_ok is a callback to call on return.
@@ -434,13 +439,13 @@ class WaitFunc(Tasklet):
self.func(self.__callback, self.__err_callback)
def close(self):
pass
-
+
class Producer(Tasklet):
"""
A Producer is a modified Tasklet that is not automatically closed after
returing a value.
-
+
This is still expermimental...
"""
def send(self, value = None, *args):
@@ -455,7 +460,7 @@ class Producer(Tasklet):
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
-
+
def throw(self, type, value, traceback):
"""Throw an exeption into the tasklet generator"""
try:
@@ -465,10 +470,10 @@ class Producer(Tasklet):
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
-
+
def handle_yielded_value(self, value):
"""This method is called after the waiting tasklet yielded a value
-
+
We have to take care of two cases:
- If the value is a Tasklet : we start it and connect the call back
to the 'parent' Tasklet send and throw hooks
@@ -483,8 +488,8 @@ class Producer(Tasklet):
else:
assert self.callback, "%s has no callback !" % self
self.callback(value, *self.args, **self.kargs)
-
-
+
+
class Sleep(Tasklet):
""" This is a 'primitive' tasklet that will trigger our call back after a short time
"""
@@ -497,9 +502,9 @@ class Sleep(Tasklet):
def close(self):
# We cancel the event
gobject.source_remove(self.event_id)
-
+
class WaitFileReady(Tasklet):
- """This special Tasklet will block until a file descriptor is ready for reading or sending"""
+ """This special Tasklet will block until a file descriptor is ready for reading or sending"""
def __init__(self, fd, cond):
super(WaitFileReady, self).__init__()
self.fd = fd
@@ -516,14 +521,14 @@ class WaitFileReady(Tasklet):
if self.event_id:
gobject.source_remove(self.event_id)
self.event_id = None
-
-
+
+
if __name__ == '__main__':
# And here is a simple example application using our tasklet class
import gobject
-
+
def example1():
print "== Simple example that waits two times for an input event =="
loop = gobject.MainLoop()
@@ -539,8 +544,8 @@ if __name__ == '__main__':
task1(10).start()
print 'I do other things'
loop.run()
-
-
+
+
def example2():
print "== We can call a tasklet form an other tasklet =="
@tasklet
@@ -555,7 +560,7 @@ if __name__ == '__main__':
print "task2 returns"
yield 2 * x # Return value
task1().start()
-
+
def example3():
print "== We can pass exception through tasklets =="
@tasklet
@@ -580,9 +585,9 @@ if __name__ == '__main__':
def task4():
print 'task4'
yield 10
-
- task1().start()
-
+
+ task1().start()
+
def example4():
print "== We can cancel execution of a task before it ends =="
loop = gobject.MainLoop()
@@ -597,7 +602,7 @@ if __name__ == '__main__':
# At this point, we decide to cancel the task
task.close()
print "task canceled"
-
+
def example5():
print "== A task can choose to perform specific action if it is canceld =="
loop = gobject.MainLoop()
@@ -608,7 +613,7 @@ if __name__ == '__main__':
yield Sleep(1)
except GeneratorExit:
print "Executed before the task is canceled"
- raise
+ raise
print "task stopped"
loop.quit()
task = task()
@@ -616,7 +621,7 @@ if __name__ == '__main__':
# At this point, we decide to cancel the task
task.close()
print "task canceled"
-
+
def example6():
print "== Using WaitFirst, we can wait for several tasks at the same time =="
loop = gobject.MainLoop()
@@ -628,7 +633,7 @@ if __name__ == '__main__':
loop.quit()
task1(10).start()
loop.run()
-
+
def example7():
print "== Using Producer, we can create pipes =="
class MyProducer(Producer):
@@ -637,7 +642,7 @@ if __name__ == '__main__':
yield Sleep(1)
print "producing %d" % i
yield i
-
+
class MyConsumer(Tasklet):
def run(self, input):
print "start"
@@ -648,17 +653,17 @@ if __name__ == '__main__':
except StopIteration:
print "Stop"
loop.quit()
-
+
loop = gobject.MainLoop()
MyConsumer(MyProducer()).start()
print "We can do other things in the meanwhile"
-
+
loop.run()
-
+
def example8():
print "== Using messages to comunicate between tasklets =="
loop = gobject.MainLoop()
-
+
@tasklet
def task1():
while True:
@@ -668,7 +673,7 @@ if __name__ == '__main__':
print "got message %s" % msg
print "end task1"
loop.quit()
-
+
@tasklet
def task2(task):
for i in range(4):
@@ -677,7 +682,7 @@ if __name__ == '__main__':
yield Sleep(1)
yield task.send_message('end')
print "end task2"
-
+
task1 = task1()
task1.start()
task2(task1).start()
@@ -688,10 +693,10 @@ if __name__ == '__main__':
def task1():
yield None
import gc
-
+
gc.collect()
n = len(gc.get_objects())
-
+
for i in range(1000):
t = Tasklet(generator=task1())
t.start()
--
FSO frameworkd Debian packaging
More information about the pkg-fso-commits
mailing list