[python-debian/master] Better implementation of apt_pkg handling in iter_paragraphs
John Wright
jsw at debian.org
Tue Nov 4 21:23:33 UTC 2008
- New test case for all three shared_storage/use_apt_pkg possibilities.
- Order is now preserved when use_apt_pkg is True and shared_storage is
False.
- Re-enabled the use of files/sequences of lines with the
_gpg_multivalued class, by using some of the logic originally in
the Deb822.gpg_stripped_paragraph method (which has been refactored
into a more general method, split_gpg_and_payload).
---
debian_bundle/deb822.py | 109 +++++++++++++++++++++++++++++-----------------
tests/test_deb822.py | 49 +++++++++++++++++++--
2 files changed, 113 insertions(+), 45 deletions(-)
diff --git a/debian_bundle/deb822.py b/debian_bundle/deb822.py
index 08d89b8..4527ac7 100644
--- a/debian_bundle/deb822.py
+++ b/debian_bundle/deb822.py
@@ -4,7 +4,7 @@
# (.changes, .dsc, Packages, Sources, etc)
#
# Copyright (C) 2005-2006 dann frazier <dannf at dannf.org>
-# Copyright (C) 2006 John Wright <john at movingsucks.org>
+# Copyright (C) 2006-2008 John Wright <john at johnwright.org>
# Copyright (C) 2006 Adeodato Simó <dato at net.com.org.es>
# Copyright (C) 2008 Stefano Zacchiroli <zack at upsilon.cc>
#
@@ -190,9 +190,15 @@ class Deb822(Deb822Dict):
parser = apt_pkg.ParseTagFile(sequence)
while parser.Step() == 1:
if shared_storage:
- yield cls(fields=fields, _parsed=parser.Section)
+ parsed = parser.Section
else:
- yield cls(fields=fields, sequence=dict(parser.Section))
+ # Since parser.Section doesn't have an items method, we
+ # need to imitate that method here and make a Deb822Dict
+ # from the result in order to preserve order.
+ items = [(key, parser.Section[key])
+ for key in parser.Section.keys()]
+ parsed = Deb822Dict(items)
+ yield cls(fields=fields, _parsed=parsed)
else:
iterable = iter(sequence)
@@ -360,8 +366,16 @@ class Deb822(Deb822Dict):
return merged
###
- def gpg_stripped_paragraph(sequence):
+ def split_gpg_and_payload(sequence):
+ """Return a (gpg_pre, payload, gpg_post) tuple
+
+ Each element of the returned tuple is a list of lines (with trailing
+ whitespace stripped).
+ """
+
+ gpg_pre_lines = []
lines = []
+ gpg_post_lines = []
state = 'SAFE'
gpgre = re.compile(r'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
blank_line = re.compile('^$')
@@ -384,20 +398,40 @@ class Deb822(Deb822Dict):
if not blank_line.match(line):
lines.append(line)
else:
- break
- elif state == 'SIGNED MESSAGE' and blank_line.match(line):
- state = 'SAFE'
- elif m.group('action') == 'BEGIN':
- state = m.group('what')
- elif m.group('action') == 'END':
- state = 'SAFE'
+ if not gpg_pre_lines:
+ # There's no gpg signature, so we should stop at
+ # this blank line
+ break
+ elif state == 'SIGNED MESSAGE':
+ if blank_line.match(line):
+ state = 'SAFE'
+ else:
+ gpg_pre_lines.append(line)
+ elif state == 'SIGNATURE':
+ gpg_post_lines.append(line)
+ else:
+ if m.group('action') == 'BEGIN':
+ state = m.group('what')
+ elif m.group('action') == 'END':
+ gpg_post_lines.append(line)
+ break
+ if not blank_line.match(line):
+ if not lines:
+ gpg_pre_lines.append(line)
+ else:
+ gpg_post_lines.append(line)
if len(lines):
- return lines
+ return (gpg_pre_lines, lines, gpg_post_lines)
else:
raise EOFError('only blank lines found in input')
- gpg_stripped_paragraph = staticmethod(gpg_stripped_paragraph)
+ split_gpg_and_payload = staticmethod(split_gpg_and_payload)
+
+ def gpg_stripped_paragraph(cls, sequence):
+ return cls.split_gpg_and_payload(sequence)[1]
+
+ gpg_stripped_paragraph = classmethod(gpg_stripped_paragraph)
def get_gpg_info(self):
"""Return a GpgInfo object with GPG signature information
@@ -778,9 +812,8 @@ class _gpg_multivalued(_multivalued):
gpg can verify the signature. Use it just like you would use the
_multivalued class.
- Currently, this class will only store the raw text if you pass it a string
- as input. Files and sequences of lines are not supported, until we can
- figure out a sane way to do it without breaking Deb822.iter_paragraphs.
+ This class only stores raw text if it detects a gpg signature (see
+ Deb822.split_gpg_and_payload for details).
"""
def __init__(self, *args, **kwargs):
@@ -792,30 +825,26 @@ class _gpg_multivalued(_multivalued):
if sequence is not None:
if isinstance(sequence, basestring):
self.raw_text = sequence
- # XXX: to support sequences, we need to not gobble up more than one
- # paragraph at a time, in case the sequence is really something
- # like a Packages or Sources file. But it's not as simple as
- # stopping at blank lines, since GPG signatures have blank lines,
- # too. So we just don't support it for now. Below is code that
- # would work if we didn't care about that...
- #elif hasattr(sequence, "items"):
- # # sequence is actually a dict(-like) object, so we don't have
- # # the raw text.
- # pass
- #else:
- # # If this is a sequence of lines without trailing \n's, we'll
- # # need to add them back for raw_text
- # lines = [line for line in sequence]
- # if lines and lines[0].endswith("\n"):
- # self.raw_text = "".join(lines)
- # else:
- # self.raw_text = "\n".join(lines)
- #
- # try:
- # args = list(args)
- # args[0] = lines
- # except IndexError:
- # kwargs["sequence"] = lines
+ elif hasattr(sequence, "items"):
+ # sequence is actually a dict(-like) object, so we don't have
+ # the raw text.
+ pass
+ else:
+ gpg_pre_lines, lines, gpg_post_lines = \
+ self.split_gpg_and_payload(sequence)
+ if gpg_pre_lines and gpg_post_lines:
+ raw_text = StringIO.StringIO()
+ raw_text.write("\n".join(gpg_pre_lines))
+ raw_text.write("\n\n")
+ raw_text.write("\n".join(lines))
+ raw_text.write("\n\n")
+ raw_text.write("\n".join(gpg_post_lines))
+ self.raw_text = raw_text.getvalue()
+ try:
+ args = list(args)
+ args[0] = lines
+ except IndexError:
+ kwargs["sequence"] = lines
_multivalued.__init__(self, *args, **kwargs)
diff --git a/tests/test_deb822.py b/tests/test_deb822.py
index a567bb7..c4ef1f6 100755
--- a/tests/test_deb822.py
+++ b/tests/test_deb822.py
@@ -345,17 +345,22 @@ class TestDeb822(unittest.TestCase):
return
unparsed_with_gpg = SIGNED_CHECKSUM_CHANGES_FILE % CHECKSUM_CHANGES_FILE
- deb822_ = deb822.Dsc(unparsed_with_gpg)
+ deb822_from_str = deb822.Dsc(unparsed_with_gpg)
+ result_from_str = deb822_from_str.get_gpg_info()
+ deb822_from_file = deb822.Dsc(StringIO(unparsed_with_gpg))
+ result_from_file = deb822_from_file.get_gpg_info()
+ deb822_from_lines = deb822.Dsc(unparsed_with_gpg.splitlines())
+ result_from_lines = deb822_from_lines.get_gpg_info()
valid = {'GOODSIG': ['D14219877A786561', 'John Wright <john.wright at hp.com>'],
'VALIDSIG': ['8FEFE900783CF175827C2F65D14219877A786561', '2008-05-01',
'1209623566', '0', '3', '0', '17', '2', '01',
'8FEFE900783CF175827C2F65D14219877A786561'],
'SIG_ID': ['mQFnUWWR1Gr6itMV7Bx5L4N60Wo', '2008-05-01', '1209623566']}
- result = deb822_.get_gpg_info()
- self.assertEqual(len(result.keys()), len(valid.keys()))
- for k,v in valid.items():
- self.assertEqual(''.join(v), ''.join(result[k]))
+ for result in result_from_str, result_from_file, result_from_lines:
+ self.assertEqual(len(result.keys()), len(valid.keys()))
+ for k,v in valid.items():
+ self.assertEqual(''.join(v), ''.join(result[k]))
def test_iter_paragraphs_array(self):
text = (UNPARSED_PACKAGE + '\n\n\n' + UNPARSED_PACKAGE).splitlines()
@@ -374,8 +379,42 @@ class TestDeb822(unittest.TestCase):
string = string % UNPARSED_PACKAGE
text = (string + '\n\n\n' + string).splitlines()
+ count = 0
for d in deb822.Deb822.iter_paragraphs(text):
+ count += 1
self.assertWellParsed(d, PARSED_PACKAGE)
+ self.assertEqual(count, 2)
+
+ def test_iter_paragraphs_shared_storage(self):
+ """Ensure consistency with the three possible iter_paragraph options"""
+
+ f = open("test_Packages")
+ packages_content = f.read()
+ f.close()
+
+ combinations = [
+ {"use_apt_pkg": True, "shared_storage": True},
+ {"use_apt_pkg": True, "shared_storage": False},
+ {"use_apt_pkg": False, "shared_storage": False},
+ ]
+
+ for kwargs in combinations:
+ s = StringIO()
+ l = []
+ for p in deb822.Packages.iter_paragraphs(open("test_Packages"),
+ **kwargs):
+ p.dump(s)
+ s.write("\n")
+ l.append(p)
+ self.assertEqual(s.getvalue(), packages_content)
+ if kwargs["shared_storage"] is False:
+ # If shared_storage is False, data should be consistent across
+ # iterations -- i.e. we can use "old" objects
+ s = StringIO()
+ for p in l:
+ p.dump(s)
+ s.write("\n")
+ self.assertEqual(s.getvalue(), packages_content)
def test_parser_empty_input(self):
self.assertEqual({}, deb822.Deb822([]))
--
1.5.5.GIT
More information about the pkg-python-debian-commits
mailing list