[SCM] morituri/master: * morituri/test/test_common_renamer.py (added): * morituri/common/renamer.py (added): Add a way of doing transactional file renames, as well as their metafile updates.

js at users.alioth.debian.org js at users.alioth.debian.org
Sun Oct 19 20:08:48 UTC 2014


The following commit has been merged in the master branch:
commit 0bbcac6d063f05611478d04f562fef999dfb973b
Author: Thomas Vander Stichele <thomas (at) apestaart (dot) org>
Date:   Sat Apr 25 09:51:12 2009 +0000

    	* morituri/test/test_common_renamer.py (added):
    	* morituri/common/renamer.py (added):
    	  Add a way of doing transactional file renames, as well as their
    	  metafile updates.

diff --git a/ChangeLog b/ChangeLog
index df232b9..76fd581 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+2009-04-25  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+	* morituri/test/test_common_renamer.py (added):
+	* morituri/common/renamer.py (added):
+	  Add a way of doing transactional file renames, as well as their
+	  metafile updates.
+
 2009-04-21  Thomas Vander Stichele  <thomas at apestaart dot org>
 
 	* morituri/common/task.py:
diff --git a/morituri/common/renamer.py b/morituri/common/renamer.py
new file mode 100644
index 0000000..6bf5e49
--- /dev/null
+++ b/morituri/common/renamer.py
@@ -0,0 +1,224 @@
+# -*- Mode: Python; test-case-name: morituri.test.test_common_renamer -*-
+# vi:si:et:sw=4:sts=4:ts=4
+
+# Morituri - for those about to RIP
+
+# Copyright (C) 2009 Thomas Vander Stichele
+
+# This file is part of morituri.
+# 
+# morituri is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# 
+# morituri is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with morituri.  If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import tempfile
+
+"""
+Rename files on file system and inside metafiles in a resumable way.
+"""
+
+
+class Operator(object):
+    def __init__(self, statePath, key):
+        self._todo = []
+        self._done = []
+        self._statePath = statePath
+        self._key = key
+        self._resuming = False
+
+    def addOperation(self, operation):
+        """
+        Add an operation.
+        """
+        self._todo.append(operation)
+
+    def load(self):
+        """
+        Load state from the given state path using the given key.
+        Verifies the state.
+        """
+        todo = os.path.join(self._statePath, self._key + '.todo')
+        handle = open(todo, 'r')
+        lines = []
+        for line in handle.readlines():
+            lines.append(line)
+            name, data = line.split(' ', 1)
+            cls = globals()[name]
+            operation = cls.deserialize(data)
+            self._todo.append(operation)
+
+        
+        done = os.path.join(self._statePath, self._key + '.done')
+        i = 0
+        if os.path.exists(done):
+            handle = open(done, 'r')
+            for i, line in enumerate(handle.readlines()):
+                assert line == lines[i], "line %s is different than %s" % (
+                    line, lines[i])
+                self._done.append(self._todo[i])
+
+        # last task done is i; check if the next one might have gotten done.
+        self._resuming = True
+
+
+    def save(self):
+        """
+        Saves the state to the given state path using the given key.
+        """
+        # only save todo first time
+        todo = os.path.join(self._statePath, self._key + '.todo')
+        if not os.path.exists(todo):
+            handle = open(todo, 'w')
+            for o in self._todo:
+                name = o.__class__.__name__
+                data = o.serialize()
+                handle.write('%s %s\n' % (name, data))
+            handle.close()
+
+        # save done every time
+        done = os.path.join(self._statePath, self._key + '.done')
+        handle = open(done, 'w')
+        for o in self._done:
+            name = o.__class__.__name__
+            data = o.serialize()
+            handle.write('%s %s\n' % (name, data))
+        handle.close()
+
+    def start(self):
+        """
+        Execute the operations
+        """
+
+    def next(self):
+        operation = self._todo[len(self._done)]
+        if self._resuming:
+            operation.redo()
+            self._resuming = False
+        else:
+            operation.do()
+
+        self._done.append(operation)
+        self.save()
+
+
+class FileRenamer(Operator):
+    def addRename(self, source, destination):
+        """
+        Add a rename operation.
+
+        @param source:      source filename
+        @type  source:      str
+        @param destination: destination filename
+        @type  destination: str
+        """
+
+
+class Operation(object):
+    def verify(self):
+        """
+        Check if the operation will succeed in the current conditions.
+        Consider this a pre-flight check.
+
+        Does not eliminate the need to handle errors as they happen.
+        """
+
+    def do(self):
+        """
+        Perform the operation.
+        """
+        pass
+
+    def redo(self):
+        """
+        Perform the operation, without knowing if it already has been
+        (partly) performed.
+        """
+        self.do()
+
+    def serialize(self):
+        """
+        Serialize the operation.
+        The return value should be usable with L{deserialize}
+
+        @rtype: str
+        """
+
+    def deserialize(cls, data):
+        """
+        Deserialize the operation with the given operation data.
+
+        @type  data: str
+        """
+        raise NotImplementedError
+    deserialize = classmethod(deserialize)
+
+
+class RenameFile(Operation):
+    def __init__(self, source, destination):
+        self._source = source
+        self._destination = destination
+
+    def verify(self):
+        assert os.path.exists(self._source)
+        assert not os.path.exists(self._destination)
+
+    def do(self):
+        os.rename(self._source, self._destination)
+
+    def serialize(self):
+        return '"%s" "%s"' % (self._source, self._destination)
+
+    def deserialize(cls, data):
+        _, source, __, destination, ___ = data.split('"')
+        return RenameFile(source, destination)
+    deserialize = classmethod(deserialize)
+
+    def __eq__(self, other):
+        return self._source == other._source \
+            and self._destination == other._destination
+
+
+class RenameInFile(Operation):
+    def __init__(self, path, source, destination):
+        self._path = path
+        self._source = source
+        self._destination = destination
+
+    def verify(self):
+        assert os.path.exists(self._path)
+        # check if the source exists in the given file
+
+    def do(self):
+        input = open(self._path)
+        (fd, name) = tempfile.mkstemp(suffix='.morituri')
+
+        for s in input:
+            os.write(fd, s.replace(self._source, self._destination))
+
+        input.close()
+        os.close(fd)
+        os.rename(name, self._path)
+
+    def serialize(self):
+        return '"%s" "%s" "%s"' % (self._path, self._source, self._destination)
+
+    def deserialize(cls, data):
+        _, path, __, source, ___, destination, ____ = data.split('"')
+        return RenameInFile(path, source, destination)
+    deserialize = classmethod(deserialize)
+
+    def __eq__(self, other):
+        return self._source == other._source \
+            and self._destination == other._destination \
+            and self._path == other._path
diff --git a/morituri/test/test_common_renamer.py b/morituri/test/test_common_renamer.py
new file mode 100644
index 0000000..e2d824e
--- /dev/null
+++ b/morituri/test/test_common_renamer.py
@@ -0,0 +1,135 @@
+# -*- Mode: Python; test-case-name: morituri.test.test_common_renamer -*-
+# vi:si:et:sw=4:sts=4:ts=4
+
+import os
+import stat
+import tempfile
+
+import unittest
+
+from morituri.common import renamer
+
+class RenameInFileTestcase(unittest.TestCase):
+    def setUp(self):
+        (fd, self._path) = tempfile.mkstemp(suffix='morituri')
+        os.write(fd, 'This is a test\nThis is another\n')
+        os.close(fd)
+
+    def testVerify(self):
+        o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+        self.assertEquals(o.verify(), None)
+        os.unlink(self._path)
+        self.assertRaises(AssertionError, o.verify)
+
+    def testDo(self):
+        o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+        o.do()
+        output = open(self._path).read()
+        self.assertEquals(output, 'That was some test\nThat was somenother\n')
+
+    def testSerialize(self):
+        o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+        data = o.serialize()
+        o2 = renamer.RenameInFile.deserialize(data)
+        o2.do()
+        output = open(self._path).read()
+        self.assertEquals(output, 'That was some test\nThat was somenother\n')
+        
+class RenameFileTestcase(unittest.TestCase):
+    def setUp(self):
+        (fd, self._source) = tempfile.mkstemp(suffix='morituri')
+        os.write(fd, 'This is a test\nThis is another\n')
+        os.close(fd)
+        (fd, self._destination) = tempfile.mkstemp(suffix='morituri')
+        os.close(fd)
+        os.unlink(self._destination)
+        self._operation = renamer.RenameFile(self._source, self._destination)
+
+    def testVerify(self):
+        self.assertEquals(self._operation.verify(), None)
+
+        handle = open(self._destination, 'w')
+        handle.close()
+        self.assertRaises(AssertionError, self._operation.verify)
+
+        os.unlink(self._destination)
+        self.assertEquals(self._operation.verify(), None)
+
+        os.unlink(self._source)
+        self.assertRaises(AssertionError, self._operation.verify)
+
+    def testDo(self):
+        self._operation.do()
+        output = open(self._destination).read()
+        self.assertEquals(output, 'This is a test\nThis is another\n')
+
+    def testSerialize(self):
+        data = self._operation.serialize()
+        o = renamer.RenameFile.deserialize(data)
+        o.do()
+        output = open(self._destination).read()
+        self.assertEquals(output, 'This is a test\nThis is another\n')
+  
+class OperatorTestCase(unittest.TestCase):
+    def setUp(self):
+        self._statePath = tempfile.mkdtemp(suffix='.morituri')
+        self._operator = renamer.Operator(self._statePath, 'test')
+
+        (fd, self._source) = tempfile.mkstemp(suffix='morituri')
+        os.write(fd, 'This is a test\nThis is another\n')
+        os.close(fd)
+        (fd, self._destination) = tempfile.mkstemp(suffix='morituri')
+        os.close(fd)
+        os.unlink(self._destination)
+        self._operator.addOperation(
+            renamer.RenameInFile(self._source, 'is is a', 'at was some'))
+        self._operator.addOperation(
+            renamer.RenameFile(self._source, self._destination))
+
+    def testLoadNoneDone(self):
+        self._operator.save()
+
+        o = renamer.Operator(self._statePath, 'test')
+        o.load()
+
+        self.assertEquals(o._todo, self._operator._todo)
+        self.assertEquals(o._done, [])
+
+    def testLoadOneDone(self):
+        self.assertEquals(len(self._operator._done), 0)
+        self._operator.save()
+        self._operator.next()
+        self.assertEquals(len(self._operator._done), 1)
+
+        o = renamer.Operator(self._statePath, 'test')
+        o.load()
+
+        self.assertEquals(len(o._done), 1)
+        self.assertEquals(o._todo, self._operator._todo)
+        self.assertEquals(o._done, self._operator._done)
+
+        # now continue
+        o.next()
+        self.assertEquals(len(o._done), 2)
+
+    def testLoadOneInterrupted(self):
+        self.assertEquals(len(self._operator._done), 0)
+        self._operator.save()
+
+        # cheat by doing a task without saving
+        self._operator._todo[0].do()
+
+        self.assertEquals(len(self._operator._done), 0)
+
+        o = renamer.Operator(self._statePath, 'test')
+        o.load()
+
+        self.assertEquals(len(o._done), 0)
+        self.assertEquals(o._todo, self._operator._todo)
+        self.assertEquals(o._done, self._operator._done)
+
+        # now continue, resuming
+        o.next()
+        self.assertEquals(len(o._done), 1)
+        o.next()
+        self.assertEquals(len(o._done), 2)

-- 
morituri packaging



More information about the pkg-multimedia-commits mailing list