[SCM] morituri/master: * morituri/common/encode.py: Add tasks to read, write, and safely retag flac files. * morituri/test/test_common_encode.py: Add tests for this.

js at users.alioth.debian.org js at users.alioth.debian.org
Sun Oct 19 20:09:17 UTC 2014


The following commit has been merged in the master branch:
commit d9ca12d7cc9570c84040c13d47e71e1faef79f8f
Author: Thomas Vander Stichele <thomas (at) apestaart (dot) org>
Date:   Tue Apr 13 21:55:35 2010 +0000

    	* morituri/common/encode.py:
    	  Add tasks to read, write, and safely retag flac files.
    	* morituri/test/test_common_encode.py:
    	  Add tests for this.

diff --git a/ChangeLog b/ChangeLog
index 08058b1..8dc4a3e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,12 @@
 2010-04-13  Thomas Vander Stichele  <thomas at apestaart dot org>
 
+	* morituri/common/encode.py:
+	  Add tasks to read, write, and safely retag flac files.
+	* morituri/test/test_common_encode.py:
+	  Add tests for this.
+
+2010-04-13  Thomas Vander Stichele  <thomas at apestaart dot org>
+
 	* morituri/common/checksum.py:
 	  Style fixes.
 	* morituri/common/common.py:
diff --git a/morituri/common/encode.py b/morituri/common/encode.py
index f054a32..449ccb9 100644
--- a/morituri/common/encode.py
+++ b/morituri/common/encode.py
@@ -21,8 +21,10 @@
 # along with morituri.  If not, see <http://www.gnu.org/licenses/>.
 
 import math
+import os
+import tempfile
 
-from morituri.common import common, task
+from morituri.common import common, task, checksum
 
 from morituri.common import log
 log.init()
@@ -283,7 +285,7 @@ class TagReadTask(task.Task):
 
     logCategory = 'TagReadTask'
 
-    description = 'Reading Tags'
+    description = 'Reading tags'
 
     taglist = None
 
@@ -351,3 +353,152 @@ class TagReadTask(task.Task):
         self.debug('set state to NULL')
         task.Task.stop(self)
 
+class TagWriteTask(task.Task):
+    """
+    I am a task that retags an encoded file.
+    """
+
+    logCategory = 'TagWriteTask'
+
+    description = 'Writing tags'
+
+    def __init__(self, inpath, outpath, taglist=None):
+        """
+        """
+        assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
+        assert type(outpath) is unicode, "outpath %r is not unicode" % outpath
+        
+        self._inpath = inpath
+        self._outpath = outpath
+        self._taglist = taglist
+
+    def start(self, runner):
+        task.Task.start(self, runner)
+
+        # here to avoid import gst eating our options
+        import gst
+
+        self._pipeline = gst.parse_launch('''
+            filesrc location="%s" !
+            flactag name=tagger !
+            filesink location="%s"''' % (
+                common.quoteParse(self._inpath).encode('utf-8'),
+                common.quoteParse(self._outpath).encode('utf-8')))
+
+        # set tags
+        tagger = self._pipeline.get_by_name('tagger')
+        if self._taglist:
+            tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)
+
+        self.debug('pausing pipeline')
+        self._pipeline.set_state(gst.STATE_PAUSED)
+        self._pipeline.get_state()
+        self.debug('paused pipeline')
+
+        # add eos handling
+        bus = self._pipeline.get_bus()
+        bus.add_signal_watch()
+        bus.connect('message::eos', self._message_eos_cb)
+
+        self.debug('scheduling setting to play')
+        # since set_state returns non-False, adding it as timeout_add
+        # will repeatedly call it, and block the main loop; so
+        #   gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
+        # would not work.
+
+        def play():
+            self._pipeline.set_state(gst.STATE_PLAYING)
+            return False
+        self.runner.schedule(0, play)
+
+        #self._pipeline.set_state(gst.STATE_PLAYING)
+        self.debug('scheduled setting to play')
+
+    def _message_eos_cb(self, bus, message):
+        self.debug('eos, scheduling stop')
+        self.runner.schedule(0, self.stop)
+
+    def stop(self):
+        # here to avoid import gst eating our options
+        import gst
+
+        self.debug('stopping')
+        self.debug('setting state to NULL')
+        self._pipeline.set_state(gst.STATE_NULL)
+        self.debug('set state to NULL')
+        task.Task.stop(self)
+
+class SafeRetagTask(task.MultiSeparateTask):
+    """
+    I am a task that retags an encoded file safely in place.
+    First of all, if the new tags are the same as the old ones, it doesn't
+    do anything.
+    If the tags are not the same, then the file gets retagged, but only
+    if the decodes of the original and retagged file checksum the same.
+
+    @ivar changed: True if the tags have changed (and hence an output file is
+                   generated)
+    """
+
+    logCategory = 'SafeRetagTask'
+
+    description = 'Retagging'
+
+    changed = False
+
+    def __init__(self, path, taglist=None):
+        """
+        """
+        assert type(path) is unicode, "path %r is not unicode" % path
+
+        task.MultiSeparateTask.__init__(self)
+        
+        self._path = path
+        self._taglist = taglist.copy()
+
+        self.tasks = [TagReadTask(path), ]
+
+    def stopped(self, taskk):
+        if not taskk.exception:
+            import gst
+            # Check if the tags are different or not
+            if taskk == self.tasks[0]:
+                taglist = taskk.taglist.copy()
+                if common.tagListEquals(taglist, self._taglist):
+                    self.debug('tags are already fine')
+                else:
+                    # need to retag
+                    self.debug('tags need to be rewritten')
+                    self.debug('Current tags: %r, new tags: %r',
+                        common.tagListToDict(taglist),
+                        common.tagListToDict(self._taglist))
+                    assert common.tagListToDict(taglist) != common.tagListToDict(self._taglist)
+                    self.tasks.append(checksum.CRC32Task(self._path))
+                    self._fd, self._tmppath = tempfile.mkstemp(
+                        dir=os.path.dirname(self._path), suffix=u'.morituri')
+                    self.tasks.append(TagWriteTask(self._path,
+                        self._tmppath, self._taglist))
+                    self.tasks.append(checksum.CRC32Task(self._tmppath))
+                    self.tasks.append(TagReadTask(self._tmppath))
+            elif len(self.tasks) > 1 and taskk == self.tasks[4]:
+                if common.tagListEquals(self.tasks[4].taglist, self._taglist):
+                    self.debug('tags written successfully')
+                    c1 = self.tasks[1].checksum
+                    c2 = self.tasks[3].checksum
+                    self.debug('comparing checksums %08x and %08x' % (c1, c2))
+                    if False: #c1 == c2:
+                        # data is fine, so we can now move
+                        self.debug('moving temporary file to %r' % self._path)
+                        os.rename(self._tmppath, self._path)
+                    else:
+                        # FIXME: don't raise TypeError
+                        e = TypeError("Checksums failed")
+                        self.setAndRaiseException(e)
+                else:
+                    os.unlink(self._tmppath)
+                    e = TypeError("Tags not written")
+                    self.setAndRaiseException(e)
+                   
+        task.MultiSeparateTask.stopped(self, taskk)
+
+      
diff --git a/morituri/test/test_common_encode.py b/morituri/test/test_common_encode.py
index 383b32c..0549f5a 100644
--- a/morituri/test/test_common_encode.py
+++ b/morituri/test/test_common_encode.py
@@ -49,3 +49,66 @@ class TagReadTestCase(common.TestCase):
         self.failUnless(t.taglist)
         self.assertEquals(t.taglist['audio-codec'], 'FLAC')
         self.assertEquals(t.taglist['description'], 'audiotest wave')
+
+class TagWriteTestCase(common.TestCase):
+    def testWrite(self):
+        fd, inpath = tempfile.mkstemp(suffix=u'.morituri.tagwrite.flac')
+        
+        os.system('gst-launch '
+            'audiotestsrc num-buffers=10 samplesperbuffer=588 ! '
+            'audioconvert ! '
+            'audio/x-raw-int,channels=2,width=16,height=16,rate=44100 ! '
+            'flacenc ! filesink location=%s > /dev/null 2>&1' % inpath)
+        os.close(fd)
+
+        fd, outpath = tempfile.mkstemp(suffix=u'.morituri.tagwrite.flac')
+        self.runner = task.SyncRunner(verbose=False)
+        taglist = gst.TagList()
+        taglist[gst.TAG_ARTIST] = 'Artist'
+        taglist[gst.TAG_TITLE] = 'Title'
+
+        t = encode.TagWriteTask(inpath, outpath, taglist)
+        self.runner.run(t)
+
+        t = encode.TagReadTask(outpath)
+        self.runner.run(t)
+        self.failUnless(t.taglist)
+        self.assertEquals(t.taglist['audio-codec'], 'FLAC')
+        self.assertEquals(t.taglist['description'], 'audiotest wave')
+        self.assertEquals(t.taglist[gst.TAG_ARTIST], 'Artist')
+        self.assertEquals(t.taglist[gst.TAG_TITLE], 'Title')
+
+        os.unlink(inpath)
+        os.unlink(outpath)
+        
+class SafeRetagTestCase(common.TestCase):
+    def setUp(self):
+        self._fd, self._path = tempfile.mkstemp(suffix=u'.morituri.retag.flac')
+        
+        os.system('gst-launch '
+            'audiotestsrc num-buffers=10 samplesperbuffer=588 ! '
+            'audioconvert ! '
+            'audio/x-raw-int,channels=2,width=16,height=16,rate=44100 ! '
+            'flacenc ! filesink location=%s > /dev/null 2>&1' % self._path)
+        os.close(self._fd)
+        self.runner = task.SyncRunner(verbose=False)
+
+    def tearDown(self):
+        os.unlink(self._path)
+
+    def testNoChange(self):
+        taglist = gst.TagList()
+        taglist[gst.TAG_DESCRIPTION] = 'audiotest wave'
+        taglist[gst.TAG_AUDIO_CODEC] = 'FLAC'
+
+        t = encode.SafeRetagTask(self._path, taglist)
+        self.runner.run(t)
+
+    def testChange(self):
+        taglist = gst.TagList()
+        taglist[gst.TAG_DESCRIPTION] = 'audiotest retagged'
+        taglist[gst.TAG_AUDIO_CODEC] = 'FLAC'
+        taglist[gst.TAG_ARTIST] = 'Artist'
+
+        t = encode.SafeRetagTask(self._path, taglist)
+        self.runner.run(t)

-- 
morituri packaging



More information about the pkg-multimedia-commits mailing list