[SCM] morituri/master: * morituri/common/checksum.py: Create a GstException to wrap a Gst.GError. Create a base GstPipelineTask class. Use it in Checksum and TRM tasks. Raise and don't proceed to call .paused() when a GstError happens. Should help debug https://bugs.launchpad.net/bugs/735053 * morituri/test/test_common_checksum.py: Adapt test.

js at users.alioth.debian.org
Sun Oct 19 20:09:21 UTC 2014


The following commit has been merged in the master branch:
commit e52d20c19a572c5ce07943994b8ac4866c448078
Author: Thomas Vander Stichele <thomas (at) apestaart (dot) org>
Date:   Tue Mar 15 22:44:05 2011 +0000

    	* morituri/common/checksum.py:
    	  Create a GstException to wrap a Gst.GError.
    	  Create a base GstPipelineTask class.
    	  Use it in Checksum and TRM tasks.
    	  Raise and don't proceed to call .paused() when a GstError happens.
    	  Should help debug https://bugs.launchpad.net/bugs/735053
    	* morituri/test/test_common_checksum.py:
    	  Adapt test.

diff --git a/ChangeLog b/ChangeLog
index e28194c..334f7ab 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,14 @@
+2011-03-15  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+	* morituri/common/checksum.py:
+	  Create a GstException to wrap a Gst.GError.
+	  Create a base GstPipelineTask class.
+	  Use it in Checksum and TRM tasks.
+	  Raise and don't proceed to call .paused() when a GstError happens.
+	  Should help debug https://bugs.launchpad.net/bugs/735053
+	* morituri/test/test_common_checksum.py:
+	  Adapt test.
+
 2011-01-09  Thomas Vander Stichele  <thomas at apestaart dot org>
 
 	patch by: Ross Burton
diff --git a/morituri/common/checksum.py b/morituri/common/checksum.py
index 00ecd63..c57af97 100644
--- a/morituri/common/checksum.py
+++ b/morituri/common/checksum.py
@@ -30,7 +30,80 @@ from morituri.common import common, task
 
 # checksums are not CRC's. a CRC is a specific type of checksum.
 
-class ChecksumTask(task.Task):
+# FIXME: probably this should move higher up the module hierarchy and
+# be used wider
+class GstException(Exception):
+    def __init__(self, gerror, debug):
+        self.args = (gerror, debug, )
+        self.gerror = gerror
+        self.debug = debug
+
+# FIXME: this should move up too; other tasks might have use for it.
+class GstPipelineTask(task.Task):
+    """
+    I am a base class for tasks that use a GStreamer pipeline.
+
+    I handle errors and raise them appropriately.
+    """
+    def start(self, runner):
+        task.Task.start(self, runner)
+        desc = self.getPipelineDesc()
+
+        self.debug('creating pipeline %r', desc)
+        self.pipeline = gst.parse_launch(desc)
+
+        self._bus = self.pipeline.get_bus()
+        gst.debug('got bus %r' % self._bus)
+
+        # a signal watch calls callbacks from an idle loop
+        # self._bus.add_signal_watch()
+
+        # sync emission triggers sync-message signals which calls callbacks
+        # from the thread that signals, but happens immediately
+        self._bus.enable_sync_message_emission()
+        self._bus.connect('sync-message::eos', self.bus_eos_cb)
+        self._bus.connect('sync-message::tag', self.bus_tag_cb)
+        self._bus.connect('sync-message::error', self.bus_error_cb)
+
+        self.parsed()
+
+        self.debug('pausing pipeline')
+        self.pipeline.set_state(gst.STATE_PAUSED)
+        self.pipeline.get_state()
+        self.debug('paused pipeline')
+
+        if not self.exception:
+            self.paused()
+
+    def getPipelineDesc(self):
+        raise NotImplementedError
+
+    def parsed(self):
+        """
+        Called after parsing the pipeline but before setting it to paused.
+        """
+        pass
+
+    def paused(self):
+        """
+        Called after pipeline is paused
+        """
+        pass
+
+    def bus_eos_cb(self, bus, message):
+        pass
+
+    def bus_tag_cb(self, bus, message):
+        pass
+
+    def bus_error_cb(self, bus, message):
+        exc = GstException(*message.parse_error())
+        self.setAndRaiseException(exc)
+        gst.debug('error, scheduling stop')
+        #self.runner.schedule(0, self.stop)
+
+
+class ChecksumTask(GstPipelineTask):
     """
     I am a task that calculates a checksum of the decoded audio data.
 
@@ -72,20 +145,15 @@ class ChecksumTask(task.Task):
 
         self.checksum = None # result
 
-    def start(self, runner):
-        task.Task.start(self, runner)
-        self._pipeline = gst.parse_launch('''
+    def getPipelineDesc(self):
+        return '''
             filesrc location="%s" !
             decodebin ! audio/x-raw-int !
-            appsink name=sink sync=False emit-signals=True''' %
-                common.quoteParse(self._path).encode('utf-8'))
+            appsink name=sink sync=False emit-signals=True
+            ''' % common.quoteParse(self._path).encode('utf-8')
 
-        self.debug('pausing pipeline')
-        self._pipeline.set_state(gst.STATE_PAUSED)
-        self._pipeline.get_state()
-        self.debug('paused pipeline')
-
-        sink = self._pipeline.get_by_name('sink')
+    def paused(self):
+        sink = self.pipeline.get_by_name('sink')
 
         if self._frameLength < 0:
             self.debug('query duration')
@@ -122,23 +190,22 @@ class ChecksumTask(task.Task):
         # FIXME: sending it with frameEnd set screws up the seek, we don't get
         # everything for flac; fixed in recent -good
         result = sink.send_event(event)
-        self.debug('event sent')
-        self.debug(result)
+        self.debug('event sent, result %r', result)
         sink.connect('new-buffer', self._new_buffer_cb)
         sink.connect('eos', self._eos_cb)
 
         self.debug('scheduling setting to play')
         # since set_state returns non-False, adding it as timeout_add
         # will repeatedly call it, and block the main loop; so
-        #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
+        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
         # would not work.
 
         def play():
-            self._pipeline.set_state(gst.STATE_PLAYING)
+            self.pipeline.set_state(gst.STATE_PLAYING)
             return False
         self.runner.schedule(0, play)
 
-        #self._pipeline.set_state(gst.STATE_PLAYING)
+        #self.pipeline.set_state(gst.STATE_PLAYING)
         self.debug('scheduled setting to play')
 
     def _new_buffer_cb(self, sink):
@@ -185,7 +252,7 @@ class ChecksumTask(task.Task):
     def stop(self):
         self.debug('stopping')
         self.debug('setting state to NULL')
-        self._pipeline.set_state(gst.STATE_NULL)
+        self.pipeline.set_state(gst.STATE_NULL)
 
         if not self._last:
             # see http://bugzilla.gnome.org/show_bug.cgi?id=578612
@@ -272,7 +339,7 @@ class AccurateRipChecksumTask(ChecksumTask):
 
         return checksum
 
-class TRMTask(task.Task):
+class TRMTask(GstPipelineTask):
     """
     I calculate a MusicBrainz TRM fingerprint.
 
@@ -288,63 +355,52 @@ class TRMTask(task.Task):
 
         self.path = path
         self._trm = None
-        self._pipeline = None
         self._bus = None
 
-    def start(self, runner):
-        task.Task.start(self, runner)
-        self._pipeline = gst.parse_launch('''
+    def getPipelineDesc(self):
+        return '''
             filesrc location="%s" !
             decodebin ! audioconvert ! audio/x-raw-int !
             trm name=trm !
-            appsink name=sink sync=False emit-signals=True''' % self.path)
-        self._bus = self._pipeline.get_bus()
-        self._bus.add_signal_watch()
-        self._bus.connect('message::eos', self._bus_eos_cb)
-        self._bus.connect('message::tag', self._bus_tag_cb)
-        self._bus.connect('message::error', self._bus_error_cb)
-        sink = self._pipeline.get_by_name('sink')
-        sink.connect('new-buffer', self._new_buffer_cb)
+            appsink name=sink sync=False emit-signals=True''' % self.path
 
-        gst.debug('pausing')
-        self._pipeline.set_state(gst.STATE_PAUSED)
-        gst.debug('paused')
-        self._pipeline.get_state()
-        gst.debug('paused')
 
+    def parsed(self):
+        sink = self.pipeline.get_by_name('sink')
+        sink.connect('new-buffer', self._new_buffer_cb)
+
+    def paused(self):
         gst.debug('query duration')
-        sink = self._pipeline.get_by_name('sink')
+        sink = self.pipeline.get_by_name('sink')
 
-        self._length, qformat = self._pipeline.query_duration(gst.FORMAT_TIME)
+        self._length, qformat = self.pipeline.query_duration(gst.FORMAT_TIME)
         gst.debug('total length: %r' % self._length)
         gst.debug('scheduling setting to play')
         # since set_state returns non-False, adding it as timeout_add
         # will repeatedly call it, and block the main loop; so
-        #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
+        #   gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
         # would not work.
 
         def play():
-            self._pipeline.set_state(gst.STATE_PLAYING)
+            self.pipeline.set_state(gst.STATE_PLAYING)
             return False
         self.runner.schedule(0, play)
 
-        #self._pipeline.set_state(gst.STATE_PLAYING)
+        #self.pipeline.set_state(gst.STATE_PLAYING)
         gst.debug('scheduled setting to play')
 
-    def _bus_eos_cb(self, bus, message):
+    # FIXME: can't move this to base class because it triggers too soon
+    # in the case of checksum
+    def bus_eos_cb(self, bus, message):
         gst.debug('eos, scheduling stop')
         self.runner.schedule(0, self.stop)
 
-    def _bus_tag_cb(self, bus, message):
+
+    def bus_tag_cb(self, bus, message):
         taglist = message.parse_tag()
         if 'musicbrainz-trmid' in taglist.keys():
             self._trm = taglist['musicbrainz-trmid']
 
-    def _bus_error_cb(self, bus, message):
-        error = message.parse_error()
-        # FIXME: handle properly
-        print error
-
     def _new_buffer_cb(self, sink):
         # this is just for counting progress
         buf = sink.emit('pull-buffer')
@@ -356,7 +412,7 @@ class TRMTask(task.Task):
     def stop(self):
         gst.debug('stopping')
         gst.debug('setting state to NULL')
-        self._pipeline.set_state(gst.STATE_NULL)
+        self.pipeline.set_state(gst.STATE_NULL)
 
         # publicize and stop
         self.trm = self._trm
diff --git a/morituri/test/test_common_checksum.py b/morituri/test/test_common_checksum.py
index d32b9d0..6e802d7 100644
--- a/morituri/test/test_common_checksum.py
+++ b/morituri/test/test_common_checksum.py
@@ -27,7 +27,7 @@ class EmptyTestCase(common.TestCase):
         # FIXME: do we want a specific error for this ?
         e = self.assertRaises(task.TaskException, self.runner.run,
             checksumtask, verbose=False)
-        self.failUnless(isinstance(e.exception, gst.QueryError))
+        self.failUnless(isinstance(e.exception, checksum.GstException))
         os.unlink(path)
 
 class PathTestCase(common.TestCase):
@@ -37,7 +37,7 @@ class PathTestCase(common.TestCase):
         checksumtask = checksum.ChecksumTask(path) 
         e = self.assertRaises(task.TaskException, self.runner.run,
             checksumtask, verbose=False)
-        self.failUnless(isinstance(e.exception, gst.QueryError))
+        self.failUnless(isinstance(e.exception, checksum.GstException))
         os.unlink(path)
 
 class UnicodePathTestCase(PathTestCase, common.UnicodeTestMixin):

-- 
morituri packaging



More information about the pkg-multimedia-commits mailing list