[SCM] morituri/master: * morituri/test/test_common_renamer.py (added): * morituri/common/renamer.py (added): Add a way of doing transactional file renames, as well as their metafile updates.
js at users.alioth.debian.org
js at users.alioth.debian.org
Sun Oct 19 20:08:48 UTC 2014
The following commit has been merged in the master branch:
commit 0bbcac6d063f05611478d04f562fef999dfb973b
Author: Thomas Vander Stichele <thomas (at) apestaart (dot) org>
Date: Sat Apr 25 09:51:12 2009 +0000
* morituri/test/test_common_renamer.py (added):
* morituri/common/renamer.py (added):
Add a way of doing transactional file renames, as well as their
metafile updates.
diff --git a/ChangeLog b/ChangeLog
index df232b9..76fd581 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+2009-04-25 Thomas Vander Stichele <thomas at apestaart dot org>
+
+ * morituri/test/test_common_renamer.py (added):
+ * morituri/common/renamer.py (added):
+ Add a way of doing transactional file renames, as well as their
+ metafile updates.
+
2009-04-21 Thomas Vander Stichele <thomas at apestaart dot org>
* morituri/common/task.py:
diff --git a/morituri/common/renamer.py b/morituri/common/renamer.py
new file mode 100644
index 0000000..6bf5e49
--- /dev/null
+++ b/morituri/common/renamer.py
@@ -0,0 +1,224 @@
+# -*- Mode: Python; test-case-name: morituri.test.test_common_renamer -*-
+# vi:si:et:sw=4:sts=4:ts=4
+
+# Morituri - for those about to RIP
+
+# Copyright (C) 2009 Thomas Vander Stichele
+
+# This file is part of morituri.
+#
+# morituri is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# morituri is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with morituri. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import tempfile
+
+"""
+Rename files on file system and inside metafiles in a resumable way.
+"""
+
+
+class Operator(object):
+ def __init__(self, statePath, key):
+ self._todo = []
+ self._done = []
+ self._statePath = statePath
+ self._key = key
+ self._resuming = False
+
+ def addOperation(self, operation):
+ """
+ Add an operation.
+ """
+ self._todo.append(operation)
+
+ def load(self):
+ """
+ Load state from the given state path using the given key.
+ Verifies the state.
+ """
+ todo = os.path.join(self._statePath, self._key + '.todo')
+ handle = open(todo, 'r')
+ lines = []
+ for line in handle.readlines():
+ lines.append(line)
+ name, data = line.split(' ', 1)
+ cls = globals()[name]
+ operation = cls.deserialize(data)
+ self._todo.append(operation)
+
+
+ done = os.path.join(self._statePath, self._key + '.done')
+ i = 0
+ if os.path.exists(done):
+ handle = open(done, 'r')
+ for i, line in enumerate(handle.readlines()):
+ assert line == lines[i], "line %s is different than %s" % (
+ line, lines[i])
+ self._done.append(self._todo[i])
+
+ # last task done is i; check if the next one might have gotten done.
+ self._resuming = True
+
+
+ def save(self):
+ """
+ Saves the state to the given state path using the given key.
+ """
+ # only save todo first time
+ todo = os.path.join(self._statePath, self._key + '.todo')
+ if not os.path.exists(todo):
+ handle = open(todo, 'w')
+ for o in self._todo:
+ name = o.__class__.__name__
+ data = o.serialize()
+ handle.write('%s %s\n' % (name, data))
+ handle.close()
+
+ # save done every time
+ done = os.path.join(self._statePath, self._key + '.done')
+ handle = open(done, 'w')
+ for o in self._done:
+ name = o.__class__.__name__
+ data = o.serialize()
+ handle.write('%s %s\n' % (name, data))
+ handle.close()
+
+ def start(self):
+ """
+ Execute the operations
+ """
+
+ def next(self):
+ operation = self._todo[len(self._done)]
+ if self._resuming:
+ operation.redo()
+ self._resuming = False
+ else:
+ operation.do()
+
+ self._done.append(operation)
+ self.save()
+
+
+class FileRenamer(Operator):
+ def addRename(self, source, destination):
+ """
+ Add a rename operation.
+
+ @param source: source filename
+ @type source: str
+ @param destination: destination filename
+ @type destination: str
+ """
+
+
+class Operation(object):
+ def verify(self):
+ """
+ Check if the operation will succeed in the current conditions.
+ Consider this a pre-flight check.
+
+ Does not eliminate the need to handle errors as they happen.
+ """
+
+ def do(self):
+ """
+ Perform the operation.
+ """
+ pass
+
+ def redo(self):
+ """
+ Perform the operation, without knowing if it already has been
+ (partly) performed.
+ """
+ self.do()
+
+ def serialize(self):
+ """
+ Serialize the operation.
+ The return value should bu usable with L{deserialize}
+
+ @rtype: str
+ """
+
+ def deserialize(cls, data):
+ """
+ Deserialize the operation with the given operation data.
+
+ @type data: str
+ """
+ raise NotImplementedError
+ deserialize = classmethod(deserialize)
+
+
+class RenameFile(Operation):
+ def __init__(self, source, destination):
+ self._source = source
+ self._destination = destination
+
+ def verify(self):
+ assert os.path.exists(self._source)
+ assert not os.path.exists(self._destination)
+
+ def do(self):
+ os.rename(self._source, self._destination)
+
+ def serialize(self):
+ return '"%s" "%s"' % (self._source, self._destination)
+
+ def deserialize(cls, data):
+ _, source, __, destination, ___ = data.split('"')
+ return RenameFile(source, destination)
+ deserialize = classmethod(deserialize)
+
+ def __eq__(self, other):
+ return self._source == other._source \
+ and self._destination == other._destination
+
+
+class RenameInFile(Operation):
+ def __init__(self, path, source, destination):
+ self._path = path
+ self._source = source
+ self._destination = destination
+
+ def verify(self):
+ assert os.path.exists(self._path)
+ # check if the source exists in the given file
+
+ def do(self):
+ input = open(self._path)
+ (fd, name) = tempfile.mkstemp(suffix='.morituri')
+
+ for s in input:
+ os.write(fd, s.replace(self._source, self._destination))
+
+ input.close()
+ os.close(fd)
+ os.rename(name, self._path)
+
+ def serialize(self):
+ return '"%s" "%s" "%s"' % (self._path, self._source, self._destination)
+
+ def deserialize(cls, data):
+ _, path, __, source, ___, destination, ____ = data.split('"')
+ return RenameInFile(path, source, destination)
+ deserialize = classmethod(deserialize)
+
+ def __eq__(self, other):
+ return self._source == other._source \
+ and self._destination == other._destination \
+ and self._path == other._path
diff --git a/morituri/test/test_common_renamer.py b/morituri/test/test_common_renamer.py
new file mode 100644
index 0000000..e2d824e
--- /dev/null
+++ b/morituri/test/test_common_renamer.py
@@ -0,0 +1,135 @@
+# -*- Mode: Python; test-case-name: morituri.test.test_image_cue -*-
+# vi:si:et:sw=4:sts=4:ts=4
+
+import os
+import stat
+import tempfile
+
+import unittest
+
+from morituri.common import renamer
+
+class RenameInFileTestcase(unittest.TestCase):
+ def setUp(self):
+ (fd, self._path) = tempfile.mkstemp(suffix='morituri')
+ os.write(fd, 'This is a test\nThis is another\n')
+ os.close(fd)
+
+ def testVerify(self):
+ o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+ self.assertEquals(o.verify(), None)
+ os.unlink(self._path)
+ self.assertRaises(AssertionError, o.verify)
+
+ def testDo(self):
+ o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+ o.do()
+ output = open(self._path).read()
+ self.assertEquals(output, 'That was some test\nThat was somenother\n')
+
+ def testSerialize(self):
+ o = renamer.RenameInFile(self._path, 'is is a', 'at was some')
+ data = o.serialize()
+ o2 = renamer.RenameInFile.deserialize(data)
+ o2.do()
+ output = open(self._path).read()
+ self.assertEquals(output, 'That was some test\nThat was somenother\n')
+
+class RenameFileTestcase(unittest.TestCase):
+ def setUp(self):
+ (fd, self._source) = tempfile.mkstemp(suffix='morituri')
+ os.write(fd, 'This is a test\nThis is another\n')
+ os.close(fd)
+ (fd, self._destination) = tempfile.mkstemp(suffix='morituri')
+ os.close(fd)
+ os.unlink(self._destination)
+ self._operation = renamer.RenameFile(self._source, self._destination)
+
+ def testVerify(self):
+ self.assertEquals(self._operation.verify(), None)
+
+ handle = open(self._destination, 'w')
+ handle.close()
+ self.assertRaises(AssertionError, self._operation.verify)
+
+ os.unlink(self._destination)
+ self.assertEquals(self._operation.verify(), None)
+
+ os.unlink(self._source)
+ self.assertRaises(AssertionError, self._operation.verify)
+
+ def testDo(self):
+ self._operation.do()
+ output = open(self._destination).read()
+ self.assertEquals(output, 'This is a test\nThis is another\n')
+
+ def testSerialize(self):
+ data = self._operation.serialize()
+ o = renamer.RenameFile.deserialize(data)
+ o.do()
+ output = open(self._destination).read()
+ self.assertEquals(output, 'This is a test\nThis is another\n')
+
+class OperatorTestCase(unittest.TestCase):
+ def setUp(self):
+ self._statePath = tempfile.mkdtemp(suffix='.morituri')
+ self._operator = renamer.Operator(self._statePath, 'test')
+
+ (fd, self._source) = tempfile.mkstemp(suffix='morituri')
+ os.write(fd, 'This is a test\nThis is another\n')
+ os.close(fd)
+ (fd, self._destination) = tempfile.mkstemp(suffix='morituri')
+ os.close(fd)
+ os.unlink(self._destination)
+ self._operator.addOperation(
+ renamer.RenameInFile(self._source, 'is is a', 'at was some'))
+ self._operator.addOperation(
+ renamer.RenameFile(self._source, self._destination))
+
+ def testLoadNoneDone(self):
+ self._operator.save()
+
+ o = renamer.Operator(self._statePath, 'test')
+ o.load()
+
+ self.assertEquals(o._todo, self._operator._todo)
+ self.assertEquals(o._done, [])
+
+ def testLoadOneDone(self):
+ self.assertEquals(len(self._operator._done), 0)
+ self._operator.save()
+ self._operator.next()
+ self.assertEquals(len(self._operator._done), 1)
+
+ o = renamer.Operator(self._statePath, 'test')
+ o.load()
+
+ self.assertEquals(len(o._done), 1)
+ self.assertEquals(o._todo, self._operator._todo)
+ self.assertEquals(o._done, self._operator._done)
+
+ # now continue
+ o.next()
+ self.assertEquals(len(o._done), 2)
+
+ def testLoadOneInterrupted(self):
+ self.assertEquals(len(self._operator._done), 0)
+ self._operator.save()
+
+ # cheat by doing a task without saving
+ self._operator._todo[0].do()
+
+ self.assertEquals(len(self._operator._done), 0)
+
+ o = renamer.Operator(self._statePath, 'test')
+ o.load()
+
+ self.assertEquals(len(o._done), 0)
+ self.assertEquals(o._todo, self._operator._todo)
+ self.assertEquals(o._done, self._operator._done)
+
+ # now continue, resuming
+ o.next()
+ self.assertEquals(len(o._done), 1)
+ o.next()
+ self.assertEquals(len(o._done), 2)
--
morituri packaging
More information about the pkg-multimedia-commits
mailing list