[python-debian/master 30/36] Fix up the rest of debian.deb822 for Python 3 string handling.

Colin Watson cjwatson at canonical.com
Mon Oct 8 07:41:26 UTC 2012


This gets messy because there are some contexts where either bytes or
Unicode strings might be required:

 * GpgInfo requires byte strings so that it can check signatures without
   being broken by recoding.
 * In order to support mixed-encoding files, we have to ask
   apt_pkg.TagFile to just give us bytes so that we can do our own
   encoding detection without interference (requires a python-apt patch;
   http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=656288).
 * However, file objects given to us might have been opened either in
   binary or text mode, so we might get either bytes or str when reading
   from them.

With this patch, the tests pass with the minimum amount of invasive work
I could manage.
---
 lib/debian/deb822.py |  162 +++++++++++++++++++++++++++++++-------------------
 tests/test_deb822.py |    9 ++-
 2 files changed, 105 insertions(+), 66 deletions(-)

diff --git a/lib/debian/deb822.py b/lib/debian/deb822.py
index ffcce12..04e94c3 100644
--- a/lib/debian/deb822.py
+++ b/lib/debian/deb822.py
@@ -43,8 +43,9 @@ import warnings
 
 try:
     from StringIO import StringIO
+    BytesIO = StringIO
 except ImportError:
-    from io import StringIO
+    from io import BytesIO, StringIO
 try:
     from collections import Mapping, MutableMapping
     _mapping_mixin = Mapping
@@ -102,11 +103,11 @@ class TagSectionWrapper(_mapping_mixin, object):
 
         # Get just the stuff after the first ':'
         # Could use s.partition if we only supported python >= 2.5
-        data = s[s.find(':')+1:]
+        data = s[s.find(b':')+1:]
 
         # Get rid of spaces and tabs after the ':', but not newlines, and strip
         # off any newline at the end of the data.
-        return data.lstrip(' \t').rstrip('\n')
+        return data.lstrip(b' \t').rstrip(b'\n')
 
 
 class OrderedSet(object):
@@ -205,7 +206,31 @@ class Deb822Dict(_mutable_mapping_mixin, object):
                 self.__keys.extend([ _strI(k) for k in self.__parsed ])
             else:
                 self.__keys.extend([ _strI(f) for f in _fields if f in self.__parsed ])
-        
+
+    def _detect_encoding(self, value):
+        """If value is not already Unicode, decode it intelligently."""
+        if isinstance(value, bytes):
+            try:
+                return value.decode(self.encoding)
+            except UnicodeDecodeError as e:
+                # Evidently, the value wasn't encoded with the encoding the
+                # user specified.  Try detecting it.
+                warnings.warn('decoding from %s failed; attempting to detect '
+                              'the true encoding' % self.encoding,
+                              UnicodeWarning)
+                result = chardet.detect(value)
+                try:
+                    return value.decode(result['encoding'])
+                except UnicodeDecodeError:
+                    raise e
+                else:
+                    # Assume the rest of the paragraph is in this encoding as
+                    # well (there's no sense in repeating this exercise for
+                    # every field).
+                    self.encoding = result['encoding']
+        else:
+            return value
+
     ### BEGIN _mutable_mapping_mixin methods
 
     def __iter__(self):
@@ -230,28 +255,7 @@ class Deb822Dict(_mutable_mapping_mixin, object):
             else:
                 raise
 
-        if isinstance(value, bytes):
-            # Always return unicode objects instead of strings
-            try:
-                value = value.decode(self.encoding)
-            except UnicodeDecodeError as e:
-                # Evidently, the value wasn't encoded with the encoding the
-                # user specified.  Try detecting it.
-                warnings.warn('decoding from %s failed; attempting to detect '
-                              'the true encoding' % self.encoding,
-                              UnicodeWarning)
-                result = chardet.detect(value)
-                try:
-                    value = value.decode(result['encoding'])
-                except UnicodeDecodeError:
-                    raise e
-                else:
-                    # Assume the rest of the paragraph is in this encoding as
-                    # well (there's no sense in repeating this exercise for
-                    # every field).
-                    self.encoding = result['encoding']
-
-        return value
+        return self._detect_encoding(value)
 
     def __delitem__(self, key):
         key = _strI(key)
@@ -352,7 +356,15 @@ class Deb822(Deb822Dict):
         """
 
         if _have_apt_pkg and use_apt_pkg and _is_real_file(sequence):
-            parser = apt_pkg.TagFile(sequence)
+            kwargs = {}
+            if sys.version >= '3':
+                # bytes=True is supported for both Python 2 and 3, but we
+                # only actually need it for Python 3, so this saves us from
+                # having to require a newer version of python-apt for Python
+                # 2 as well.  This allows us to apply our own encoding
+                # handling, which is more tolerant of mixed-encoding files.
+                kwargs['bytes'] = True
+            parser = apt_pkg.TagFile(sequence, **kwargs)
             for section in parser:
                 paragraph = cls(fields=fields,
                                 _parsed=TagSectionWrapper(section),
@@ -379,11 +391,24 @@ class Deb822(Deb822Dict):
         """
         at_beginning = True
         for line in sequence:
-            if line.startswith('#'):
-                continue
-            if at_beginning:
-                if not line.rstrip('\r\n'):
+            # The bytes/str polymorphism required here to support Python 3
+            # is unpleasant, but fortunately limited.  We need this because
+            # at this point we might have been given either bytes or
+            # Unicode, and we haven't yet got to the point where we can try
+            # to decode a whole paragraph and detect its encoding.
+            if isinstance(line, bytes):
+                if line.startswith(b'#'):
                     continue
+            else:
+                if line.startswith('#'):
+                    continue
+            if at_beginning:
+                if isinstance(line, bytes):
+                    if not line.rstrip(b'\r\n'):
+                        continue
+                else:
+                    if not line.rstrip('\r\n'):
+                        continue
                 at_beginning = False
             yield line
 
@@ -396,7 +421,7 @@ class Deb822(Deb822Dict):
 
         wanted_field = lambda f: fields is None or f in fields
 
-        if isinstance(sequence, six.string_types):
+        if isinstance(sequence, (six.string_types, bytes)):
             sequence = sequence.splitlines()
 
         curkey = None
@@ -404,6 +429,8 @@ class Deb822(Deb822Dict):
 
         for line in self.gpg_stripped_paragraph(
                 self._skip_useless_lines(sequence)):
+            line = self._detect_encoding(line)
+
             m = single.match(line)
             if m:
                 if curkey:
@@ -593,13 +620,20 @@ class Deb822(Deb822Dict):
         gpg_pre_lines = []
         lines = []
         gpg_post_lines = []
-        state = 'SAFE'
-        gpgre = re.compile(r'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
-        blank_line = re.compile('^$')
+        state = b'SAFE'
+        gpgre = re.compile(br'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
+        blank_line = re.compile(b'^$')
         first_line = True
 
         for line in sequence:
-            line = line.strip('\r\n')
+            # Some consumers of this method require bytes (encoding
+            # detection and signature checking).  However, we might have
+            # been given a file opened in text mode, in which case it's
+            # simplest to encode to bytes.
+            if sys.version >= '3' and isinstance(line, str):
+                line = line.encode()
+
+            line = line.strip(b'\r\n')
 
             # skip initial blank lines, if any
             if first_line:
@@ -611,7 +645,7 @@ class Deb822(Deb822Dict):
             m = gpgre.match(line)
 
             if not m:
-                if state == 'SAFE':
+                if state == b'SAFE':
                     if not blank_line.match(line):
                         lines.append(line)
                     else:
@@ -619,17 +653,17 @@ class Deb822(Deb822Dict):
                             # There's no gpg signature, so we should stop at
                             # this blank line
                             break
-                elif state == 'SIGNED MESSAGE':
+                elif state == b'SIGNED MESSAGE':
                     if blank_line.match(line):
-                        state = 'SAFE'
+                        state = b'SAFE'
                     else:
                         gpg_pre_lines.append(line)
-                elif state == 'SIGNATURE':
+                elif state == b'SIGNATURE':
                     gpg_post_lines.append(line)
             else:
-                if m.group('action') == 'BEGIN':
+                if m.group('action') == b'BEGIN':
                     state = m.group('what')
-                elif m.group('action') == 'END':
+                elif m.group('action') == b'END':
                     gpg_post_lines.append(line)
                     break
                 if not blank_line.match(line):
@@ -760,7 +794,7 @@ class GpgInfo(dict):
     def from_sequence(cls, sequence, keyrings=None, executable=None):
         """Create a new GpgInfo object from the given sequence.
 
-        :param sequence: sequence of lines or a string
+        :param sequence: sequence of lines of bytes or a single byte string
 
         :param keyrings: list of keyrings to use (default:
             ['/usr/share/keyrings/debian-keyring.gpg'])
@@ -787,32 +821,30 @@ class GpgInfo(dict):
                              universal_newlines=True)
         # XXX what to do with exit code?
 
-        if isinstance(sequence, six.string_types):
+        if isinstance(sequence, bytes):
             inp = sequence
         else:
-            inp = cls._get_full_string(sequence)
-        if sys.version >= '3':
-            inp = inp.encode('UTF-8')
+            inp = cls._get_full_bytes(sequence)
         out, err = p.communicate(inp)
 
         return cls.from_output(out, err)
 
     @staticmethod
-    def _get_full_string(sequence):
-        """Return a string from a sequence of lines.
+    def _get_full_bytes(sequence):
+        """Return a byte string from a sequence of lines of bytes.
 
         This method detects if the sequence's lines are newline-terminated, and
-        constructs the string appropriately.
+        constructs the byte string appropriately.
         """
         # Peek at the first line to see if it's newline-terminated.
         sequence_iter = iter(sequence)
         try:
             first_line = next(sequence_iter)
         except StopIteration:
-            return ""
-        join_str = '\n'
-        if first_line.endswith('\n'):
-            join_str = ''
+            return b""
+        join_str = b'\n'
+        if first_line.endswith(b'\n'):
+            join_str = b''
         return first_line + join_str + join_str.join(sequence_iter)
 
     @classmethod
@@ -821,7 +853,7 @@ class GpgInfo(dict):
 
         See GpgInfo.from_sequence.
         """
-        with open(target) as target_file:
+        with open(target, 'rb') as target_file:
             return cls.from_sequence(target_file, *args, **kwargs)
 
 
@@ -1095,8 +1127,14 @@ class _gpg_multivalued(_multivalued):
             sequence = kwargs.get("sequence", None)
 
         if sequence is not None:
-            if isinstance(sequence, six.string_types):
+            if isinstance(sequence, bytes):
                 self.raw_text = sequence
+            elif isinstance(sequence, six.string_types):
+                # If the file is really in some other encoding, then this
+                # probably won't verify correctly, but this is the best we
+                # can reasonably manage.  For accurate verification, the
+                # file should be opened in binary mode.
+                self.raw_text = sequence.encode('utf-8')
             elif hasattr(sequence, "items"):
                 # sequence is actually a dict(-like) object, so we don't have
                 # the raw text.
@@ -1109,12 +1147,12 @@ class _gpg_multivalued(_multivalued):
                     # Empty input
                     gpg_pre_lines = lines = gpg_post_lines = []
                 if gpg_pre_lines and gpg_post_lines:
-                    raw_text = StringIO()
-                    raw_text.write("\n".join(gpg_pre_lines))
-                    raw_text.write("\n\n")
-                    raw_text.write("\n".join(lines))
-                    raw_text.write("\n\n")
-                    raw_text.write("\n".join(gpg_post_lines))
+                    raw_text = BytesIO()
+                    raw_text.write(b"\n".join(gpg_pre_lines))
+                    raw_text.write(b"\n\n")
+                    raw_text.write(b"\n".join(lines))
+                    raw_text.write(b"\n\n")
+                    raw_text.write(b"\n".join(gpg_post_lines))
                     self.raw_text = raw_text.getvalue()
                 try:
                     args = list(args)
diff --git a/tests/test_deb822.py b/tests/test_deb822.py
index fecd4b2..f665264 100755
--- a/tests/test_deb822.py
+++ b/tests/test_deb822.py
@@ -709,7 +709,7 @@ Description: python modules to work with Debian-related data formats
             objects.extend(deb822.Packages.iter_paragraphs(f))
         with open_utf8('test_Sources') as f:
             objects.extend(deb822.Deb822.iter_paragraphs(f))
-        with open('test_Sources.iso8859-1') as f:
+        with open('test_Sources.iso8859-1', 'rb') as f:
             objects.extend(deb822.Deb822.iter_paragraphs(
                 f, encoding="iso8859-1"))
         for d in objects:
@@ -732,7 +732,7 @@ Description: python modules to work with Debian-related data formats
     def test_encoding_integrity(self):
         with open_utf8('test_Sources') as f:
             utf8 = list(deb822.Deb822.iter_paragraphs(f))
-        with open('test_Sources.iso8859-1') as f:
+        with open('test_Sources.iso8859-1', 'rb') as f:
             latin1 = list(deb822.Deb822.iter_paragraphs(
                 f, encoding='iso8859-1'))
 
@@ -976,6 +976,7 @@ class TestGpgInfo(unittest.TestCase):
             os.path.exists('/usr/share/keyrings/debian-keyring.gpg'))
 
         self.data = SIGNED_CHECKSUM_CHANGES_FILE % CHECKSUM_CHANGES_FILE
+        self.data = self.data.encode()
         self.valid = {
             'GOODSIG':
                 ['D14219877A786561', 'John Wright <john.wright at hp.com>'],
@@ -1008,7 +1009,7 @@ class TestGpgInfo(unittest.TestCase):
         if not self.should_run:
             return
 
-        sequence = StringIO(self.data)
+        sequence = BytesIO(self.data)
         gpg_info = deb822.GpgInfo.from_sequence(sequence)
         self._validate_gpg_info(gpg_info)
 
@@ -1025,7 +1026,7 @@ class TestGpgInfo(unittest.TestCase):
             return
 
         fd, filename = tempfile.mkstemp()
-        fp = os.fdopen(fd, 'w')
+        fp = os.fdopen(fd, 'wb')
         fp.write(self.data)
         fp.close()
 
-- 
1.7.2.5





More information about the pkg-python-debian-commits mailing list