[python-debian/jsw/apt-pkg-without-shared-storage] Better implementation of apt_pkg handling in iter_paragraphs

John Wright jsw at debian.org
Tue Nov 4 21:23:33 UTC 2008


- New test case for all three shared_storage/use_apt_pkg possibilities.
- Order is now preserved when use_apt_pkg is True and shared_storage is
  False.
- Re-enabled the use of files/sequences of lines with the
  _gpg_multivalued class, by using some of the logic originally in
  the Deb822.gpg_stripped_paragraph method (which has been refactored
  into a more general method, split_gpg_and_payload).
---
 debian_bundle/deb822.py |  109 +++++++++++++++++++++++++++++-----------------
 tests/test_deb822.py    |   49 +++++++++++++++++++--
 2 files changed, 113 insertions(+), 45 deletions(-)

diff --git a/debian_bundle/deb822.py b/debian_bundle/deb822.py
index 08d89b8..4527ac7 100644
--- a/debian_bundle/deb822.py
+++ b/debian_bundle/deb822.py
@@ -4,7 +4,7 @@
 # (.changes, .dsc, Packages, Sources, etc)
 #
 # Copyright (C) 2005-2006  dann frazier <dannf at dannf.org>
-# Copyright (C) 2006       John Wright <john at movingsucks.org>
+# Copyright (C) 2006-2008  John Wright <john at johnwright.org>
 # Copyright (C) 2006       Adeodato Simó <dato at net.com.org.es>
 # Copyright (C) 2008       Stefano Zacchiroli <zack at upsilon.cc>
 #
@@ -190,9 +190,15 @@ class Deb822(Deb822Dict):
             parser = apt_pkg.ParseTagFile(sequence)
             while parser.Step() == 1:
                 if shared_storage:
-                    yield cls(fields=fields, _parsed=parser.Section)
+                    parsed = parser.Section
                 else:
-                    yield cls(fields=fields, sequence=dict(parser.Section))
+                    # Since parser.Section doesn't have an items method, we
+                    # need to imitate that method here and make a Deb822Dict
+                    # from the result in order to preserve order.
+                    items = [(key, parser.Section[key])
+                             for key in parser.Section.keys()]
+                    parsed = Deb822Dict(items)
+                yield cls(fields=fields, _parsed=parsed)
 
         else:
             iterable = iter(sequence)
@@ -360,8 +366,16 @@ class Deb822(Deb822Dict):
         return merged
     ###
 
-    def gpg_stripped_paragraph(sequence):
+    def split_gpg_and_payload(sequence):
+        """Return a (gpg_pre, payload, gpg_post) tuple
+        
+        Each element of the returned tuple is a list of lines (with trailing
+        whitespace stripped).
+        """
+
+        gpg_pre_lines = []
         lines = []
+        gpg_post_lines = []
         state = 'SAFE'
         gpgre = re.compile(r'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
         blank_line = re.compile('^$')
@@ -384,20 +398,40 @@ class Deb822(Deb822Dict):
                     if not blank_line.match(line):
                         lines.append(line)
                     else:
-                        break
-                elif state == 'SIGNED MESSAGE' and blank_line.match(line):
-                    state = 'SAFE'
-            elif m.group('action') == 'BEGIN':
-                state = m.group('what')
-            elif m.group('action') == 'END':
-                state = 'SAFE'
+                        if not gpg_pre_lines:
+                            # There's no gpg signature, so we should stop at
+                            # this blank line
+                            break
+                elif state == 'SIGNED MESSAGE':
+                    if blank_line.match(line):
+                        state = 'SAFE'
+                    else:
+                        gpg_pre_lines.append(line)
+                elif state == 'SIGNATURE':
+                    gpg_post_lines.append(line)
+            else:
+                if m.group('action') == 'BEGIN':
+                    state = m.group('what')
+                elif m.group('action') == 'END':
+                    gpg_post_lines.append(line)
+                    break
+                if not blank_line.match(line):
+                    if not lines:
+                        gpg_pre_lines.append(line)
+                    else:
+                        gpg_post_lines.append(line)
 
         if len(lines):
-            return lines
+            return (gpg_pre_lines, lines, gpg_post_lines)
         else:
             raise EOFError('only blank lines found in input')
 
-    gpg_stripped_paragraph = staticmethod(gpg_stripped_paragraph)
+    split_gpg_and_payload = staticmethod(split_gpg_and_payload)
+
+    def gpg_stripped_paragraph(cls, sequence):
+        return cls.split_gpg_and_payload(sequence)[1]
+
+    gpg_stripped_paragraph = classmethod(gpg_stripped_paragraph)
 
     def get_gpg_info(self):
         """Return a GpgInfo object with GPG signature information
@@ -778,9 +812,8 @@ class _gpg_multivalued(_multivalued):
     gpg can verify the signature.  Use it just like you would use the
     _multivalued class.
 
-    Currently, this class will only store the raw text if you pass it a string
-    as input.  Files and sequences of lines are not supported, until we can
-    figure out a sane way to do it without breaking Deb822.iter_paragraphs.
+    This class only stores raw text if it detects a gpg signature (see
+    Deb822.split_gpg_and_payload for details).
     """
 
     def __init__(self, *args, **kwargs):
@@ -792,30 +825,26 @@ class _gpg_multivalued(_multivalued):
         if sequence is not None:
             if isinstance(sequence, basestring):
                 self.raw_text = sequence
-            # XXX: to support sequences, we need to not gobble up more than one
-            # paragraph at a time, in case the sequence is really something
-            # like a Packages or Sources file.  But it's not as simple as
-            # stopping at blank lines, since GPG signatures have blank lines,
-            # too.  So we just don't support it for now.  Below is code that
-            # would work if we didn't care about that...
-            #elif hasattr(sequence, "items"):
-            #    # sequence is actually a dict(-like) object, so we don't have
-            #    # the raw text.
-            #    pass
-            #else:
-            #    # If this is a sequence of lines without trailing \n's, we'll
-            #    # need to add them back for raw_text
-            #    lines = [line for line in sequence]
-            #    if lines and lines[0].endswith("\n"):
-            #        self.raw_text = "".join(lines)
-            #    else:
-            #        self.raw_text = "\n".join(lines)
-            #
-            #    try:
-            #        args = list(args)
-            #        args[0] = lines
-            #    except IndexError:
-            #        kwargs["sequence"] = lines
+            elif hasattr(sequence, "items"):
+                # sequence is actually a dict(-like) object, so we don't have
+                # the raw text.
+                pass
+            else:
+                gpg_pre_lines, lines, gpg_post_lines = \
+                        self.split_gpg_and_payload(sequence)
+                if gpg_pre_lines and gpg_post_lines:
+                    raw_text = StringIO.StringIO()
+                    raw_text.write("\n".join(gpg_pre_lines))
+                    raw_text.write("\n\n")
+                    raw_text.write("\n".join(lines))
+                    raw_text.write("\n\n")
+                    raw_text.write("\n".join(gpg_post_lines))
+                    self.raw_text = raw_text.getvalue()
+                try:
+                    args = list(args)
+                    args[0] = lines
+                except IndexError:
+                    kwargs["sequence"] = lines
 
         _multivalued.__init__(self, *args, **kwargs)
 
diff --git a/tests/test_deb822.py b/tests/test_deb822.py
index a567bb7..c4ef1f6 100755
--- a/tests/test_deb822.py
+++ b/tests/test_deb822.py
@@ -345,17 +345,22 @@ class TestDeb822(unittest.TestCase):
             return
 
         unparsed_with_gpg = SIGNED_CHECKSUM_CHANGES_FILE % CHECKSUM_CHANGES_FILE
-        deb822_ = deb822.Dsc(unparsed_with_gpg)
+        deb822_from_str = deb822.Dsc(unparsed_with_gpg)
+        result_from_str = deb822_from_str.get_gpg_info()
+        deb822_from_file = deb822.Dsc(StringIO(unparsed_with_gpg))
+        result_from_file = deb822_from_file.get_gpg_info()
+        deb822_from_lines = deb822.Dsc(unparsed_with_gpg.splitlines())
+        result_from_lines = deb822_from_lines.get_gpg_info()
         valid = {'GOODSIG':  ['D14219877A786561', 'John Wright <john.wright at hp.com>'],
                  'VALIDSIG': ['8FEFE900783CF175827C2F65D14219877A786561', '2008-05-01',
                               '1209623566', '0', '3', '0', '17', '2', '01',
                               '8FEFE900783CF175827C2F65D14219877A786561'],
                  'SIG_ID':   ['mQFnUWWR1Gr6itMV7Bx5L4N60Wo', '2008-05-01', '1209623566']}
-        result = deb822_.get_gpg_info()
 
-        self.assertEqual(len(result.keys()), len(valid.keys()))
-        for k,v in valid.items():
-            self.assertEqual(''.join(v), ''.join(result[k]))
+        for result in result_from_str, result_from_file, result_from_lines:
+            self.assertEqual(len(result.keys()), len(valid.keys()))
+            for k,v in valid.items():
+                self.assertEqual(''.join(v), ''.join(result[k]))
 
     def test_iter_paragraphs_array(self):
         text = (UNPARSED_PACKAGE + '\n\n\n' + UNPARSED_PACKAGE).splitlines()
@@ -374,8 +379,42 @@ class TestDeb822(unittest.TestCase):
             string = string % UNPARSED_PACKAGE
             text = (string + '\n\n\n' + string).splitlines()
 
+            count = 0
             for d in deb822.Deb822.iter_paragraphs(text):
+                count += 1
                 self.assertWellParsed(d, PARSED_PACKAGE)
+            self.assertEqual(count, 2)
+
+    def test_iter_paragraphs_shared_storage(self):
+        """Ensure consistency with the three possible iter_paragraph options"""
+        
+        f = open("test_Packages")
+        packages_content = f.read()
+        f.close()
+
+        combinations = [
+            {"use_apt_pkg": True, "shared_storage": True},
+            {"use_apt_pkg": True, "shared_storage": False},
+            {"use_apt_pkg": False, "shared_storage": False},
+        ]
+
+        for kwargs in combinations:
+            s = StringIO()
+            l = []
+            for p in deb822.Packages.iter_paragraphs(open("test_Packages"),
+                                                     **kwargs):
+                p.dump(s)
+                s.write("\n")
+                l.append(p)
+            self.assertEqual(s.getvalue(), packages_content)
+            if kwargs["shared_storage"] is False:
+                # If shared_storage is False, data should be consistent across
+                # iterations -- i.e. we can use "old" objects
+                s = StringIO()
+                for p in l:
+                    p.dump(s)
+                    s.write("\n")
+                self.assertEqual(s.getvalue(), packages_content)
 
     def test_parser_empty_input(self):
         self.assertEqual({}, deb822.Deb822([]))
-- 
1.5.5.GIT





More information about the pkg-python-debian-commits mailing list