[Python-apps-commits] r12283 - in packages/s3ql/trunk/debian/patches (2 files)

nikratio-guest at users.alioth.debian.org
Sun Aug 23 04:24:25 UTC 2015


    Date: Sunday, August 23, 2015 @ 04:24:23
  Author: nikratio-guest
Revision: 12283

Added a (still untested) patch to allow upgrading file systems
created with the S3QL version in jessie.
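
Since the patch is still untested, a rough smoke test is sketched below. It is
only an illustration: it drives the patched entry point in src/s3ql/adm.py the
same way the s3qladm script would, the storage URL is a placeholder, and the
usual "s3qladm [options] <action> <storage-url>" calling convention is assumed.

    # Hypothetical smoke test for the forward-ported 'upgrade' action
    # (roughly equivalent to "s3qladm upgrade local:///srv/s3ql-test-bucket").
    # The storage URL is a placeholder; the run may still prompt for
    # interactive confirmation before touching any data.
    import sys
    from s3ql import adm

    if __name__ == '__main__':
        sys.exit(adm.main(['upgrade', 'local:///srv/s3ql-test-bucket']))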

Added:
  packages/s3ql/trunk/debian/patches/support_jessie_upgrade.diff
Modified:
  packages/s3ql/trunk/debian/patches/series

Modified: packages/s3ql/trunk/debian/patches/series
===================================================================
--- packages/s3ql/trunk/debian/patches/series	2015-08-22 18:55:58 UTC (rev 12282)
+++ packages/s3ql/trunk/debian/patches/series	2015-08-23 04:24:23 UTC (rev 12283)
@@ -1,2 +1,3 @@
 proc_mount.diff
 clock-granularity.diff
+support_jessie_upgrade.diff

Added: packages/s3ql/trunk/debian/patches/support_jessie_upgrade.diff
===================================================================
--- packages/s3ql/trunk/debian/patches/support_jessie_upgrade.diff	                        (rev 0)
+++ packages/s3ql/trunk/debian/patches/support_jessie_upgrade.diff	2015-08-23 04:24:23 UTC (rev 12283)
@@ -0,0 +1,837 @@
+Description: Allow upgrade of file systems created with jessie's s3ql
+Origin: debian
+Forwarded: not-needed
+Last-Update: 2015-08-22
+Author: Nikolaus Rath <Nikolaus at rath.org>
+
+Upstream has dropped support for upgrading file systems created with
+the S3QL version in jessie. This patch forward-ports that capability;
+it is based on upstream's Mercurial commit 773931c43368.
+
+
+--- a/src/s3ql/adm.py
++++ b/src/s3ql/adm.py
+@@ -11,11 +11,15 @@
+ from .backends.comprenc import ComprencBackend
+ from .database import Connection
+ from .common import (get_backend_cachedir, get_seq_no, is_mounted, get_backend,
+-                     freeze_basic_mapping, load_params)
++                     freeze_basic_mapping, load_params, AsyncFn,
++                     get_backend_factory, pretty_print_size, split_by_n,
++                     handle_on_return)
+ from .metadata import dump_and_upload_metadata, download_metadata
+ from .parse_args import ArgumentParser
+ from datetime import datetime as Datetime
+ from getpass import getpass
++from base64 import b64encode
++from queue import Queue, Full as QueueFull
+ import os
+ import shutil
+ import sys
+@@ -83,12 +87,11 @@
+     if options.action == 'clear':
+         with get_backend(options, raw=True) as backend:
+             return clear(backend, options)
++    elif options.action == 'upgrade':
++        return upgrade(options)
+ 
+     with get_backend(options) as backend:
+-        if options.action == 'upgrade':
+-            return upgrade(backend, get_backend_cachedir(options.storage_url,
+-                                                          options.cachedir))
+-        elif options.action == 'passphrase':
++        if options.action == 'passphrase':
+             return change_passphrase(backend)
+ 
+         elif options.action == 'download-metadata':
+@@ -204,11 +207,24 @@
+         ''' % { 'version': REV_VER_MAP[rev],
+                 'prog': prog })
+ 
+-def upgrade(backend, cachepath):
++
++@handle_on_return
++def upgrade(options, on_return):
+     '''Upgrade file system to newest revision'''
+ 
+     log.info('Getting file system parameters..')
+ 
++    from . import backends
++    backends.local.SUPPORT_LEGACY_FORMAT = True
++    backends.s3c.SUPPORT_LEGACY_FORMAT = True
++    backends.comprenc.SUPPORT_LEGACY_FORMAT = True
++
++    cachepath = get_backend_cachedir(options.storage_url, options.cachedir)
++
++    backend_factory = get_backend_factory(options.storage_url, options.backend_options,
++                                          options.authfile)
++    backend = on_return.enter_context(backend_factory())
++
+     # Check for cached metadata
+     db = None
+     seq_no = get_seq_no(backend)
+@@ -247,10 +263,7 @@
+         raise QuietError()
+ 
+     # Check revision
+-    # Upgrade from 21 to 22 is only possible with release 2.13,
+-    # because we removed support for reading the old storage object
+-    # format after 2.13.
+-    if param['revision'] == 21 or param['revision'] < CURRENT_FS_REV-1:
++    if param['revision'] < CURRENT_FS_REV-1:
+         print(textwrap.dedent('''
+             File system revision too old to upgrade!
+ 
+@@ -292,7 +305,19 @@
+     param['last-modified'] = time.time()
+     param['seq_no'] += 1
+ 
+-    # Upgrade code goes here
++    # Ensure that there are backups of the master key
++    if backend.passphrase is not None:
++        data_pw = backend.passphrase
++        backend.passphrase = backend.fs_passphrase
++        for i in range(1,4):
++            obj_id = 's3ql_passphrase_bak%d' % i
++            if obj_id not in backend:
++                backend[obj_id] = data_pw
++        backend.passphrase = data_pw
++
++    # Upgrade all objects, so that we can remove legacy conversion
++    # routines in the next release.
++    update_obj_metadata(backend, backend_factory, db, options.threads)
+ 
+     dump_and_upload_metadata(backend, db, param)
+ 
+@@ -307,5 +332,153 @@
+ 
+     print('File system upgrade complete.')
+ 
++    if backend.passphrase is not None:
++        print('Please store the following master key in a safe location. It allows ',
++              'decryption of the S3QL file system in case the storage objects holding ',
++              'this information get corrupted:',
++              '---BEGIN MASTER KEY---',
++              ' '.join(split_by_n(b64encode(backend.passphrase).decode(), 4)),
++              '---END MASTER KEY---',
++              sep='\n')
++
++def update_obj_metadata(backend, backend_factory, db, thread_count):
++    '''Upgrade metadata of storage objects'''
++
++    plain_backend = backend.backend
++
++    # No need to update sequence number, since we are going to
++    # write out a new one after the upgrade.
++    if backend.passphrase is None:
++        extra_objects = { 's3ql_metadata' }
++    else:
++        extra_objects = { 's3ql_metadata',
++                          's3ql_passphrase', 's3ql_passphrase_bak1',
++                          's3ql_passphrase_bak2', 's3ql_passphrase_bak3' }
++
++    for i in range(30):
++        obj_id = 's3ql_metadata_bak_%d' % i
++        if obj_id in plain_backend:
++            extra_objects.add(obj_id)
++
++    def yield_objects():
++        for (id_,) in db.query('SELECT id FROM objects'):
++            yield 's3ql_data_%d' % id_
++        for obj_id in extra_objects:
++            yield obj_id
++    total = db.get_val('SELECT COUNT(id) FROM objects') + len(extra_objects)
++
++    queue = Queue(maxsize=thread_count)
++    threads = []
++    for _ in range(thread_count):
++        t = AsyncFn(upgrade_loop, queue, backend_factory)
++        # Don't wait for worker threads, gives deadlock if main thread
++        # terminates with exception
++        t.daemon = True
++        t.start()
++        threads.append(t)
++
++    # Updating this value is prone to race conditions. However,
++    # we don't care because this is for an approximate progress
++    # output only.
++    queue.rewrote_size = 0
++    stamp = 0
++    for (i, obj_id) in enumerate(yield_objects()):
++        stamp2 = time.time()
++        if stamp2 - stamp > 1:
++            sys.stdout.write('\r..processed %d/%d objects (%.1f%%, %s rewritten)..'
++                             % (i, total, i/total*100,
++                                pretty_print_size(queue.rewrote_size)))
++            sys.stdout.flush()
++            stamp = stamp2
++
++            # Terminate early if any thread failed with an exception
++            for t in threads:
++                if not t.is_alive():
++                    t.join_and_raise()
++
++        # Avoid blocking if all threads terminated
++        while True:
++            try:
++                queue.put(obj_id, timeout=1)
++            except QueueFull:
++                pass
++            else:
++                break
++            for t in threads:
++                if not t.is_alive():
++                    t.join_and_raise()
++
++    queue.maxsize += len(threads)
++    for t in threads:
++        queue.put(None)
++
++    for t in threads:
++        t.join_and_raise()
++
++    sys.stdout.write('\n')
++
++def upgrade_loop(queue, backend_factory):
++
++    with backend_factory() as backend:
++        plain_backend = backend.backend
++        while True:
++            obj_id = queue.get()
++            if obj_id is None:
++                break
++
++            meta = plain_backend.lookup(obj_id)
++            if meta.get('format_version', 0) == 2:
++                continue
++
++            # For important objects, we make a copy first (just to be safe)
++            if not obj_id.startswith('s3ql_data'):
++                plain_backend.copy(obj_id, 's3ql_pre2.13' + obj_id[4:])
++
++            # When reading passphrase objects, we have to use the
++            # "outer" password
++            if obj_id.startswith('s3ql_passphrase'):
++                data_pw = backend.passphrase
++                backend.passphrase = backend.fs_passphrase
++
++            meta = backend._convert_legacy_metadata(meta)
++            if meta['encryption'] == 'AES':
++                # Two statements to reduce likelihood of update races
++                size = rewrite_legacy_object(backend, obj_id)
++                queue.rewrote_size += size
++            else:
++                plain_backend.update_meta(obj_id, meta)
++
++            if obj_id.startswith('s3ql_passphrase'):
++                backend.passphrase = data_pw
++
++def rewrite_legacy_object(backend, obj_id):
++    with tempfile.TemporaryFile() as tmpfh:
++
++        # Read object
++        def do_read(fh):
++            tmpfh.seek(0)
++            tmpfh.truncate()
++            while True:
++                buf = fh.read(BUFSIZE)
++                if not buf:
++                    break
++                tmpfh.write(buf)
++            return fh.metadata
++
++        meta = backend.perform_read(do_read, obj_id)
++
++        # Write object
++        def do_write(fh):
++            tmpfh.seek(0)
++            while True:
++                buf = tmpfh.read(BUFSIZE)
++                if not buf:
++                    break
++                fh.write(buf)
++            return fh
++        out_fh = backend.perform_write(do_write, obj_id, meta)
++
++        return out_fh.get_obj_size()
++
+ if __name__ == '__main__':
+     main(sys.argv[1:])
+--- a/src/s3ql/backends/comprenc.py
++++ b/src/s3ql/backends/comprenc.py
+@@ -25,6 +25,11 @@
+ 
+ log = logging.getLogger(__name__)
+ 
++from ..upgrade_support import safe_unpickle, pickle
++from base64 import b64decode, b64encode
++import binascii
++SUPPORT_LEGACY_FORMAT=False
++
+ HMAC_SIZE = 32
+ 
+ def sha256(s):
+@@ -67,6 +72,8 @@
+     @copy_ancestor_docstring
+     def lookup(self, key):
+         meta_raw = self.backend.lookup(key)
++        if SUPPORT_LEGACY_FORMAT and meta_raw.get('format_version', 0) < 2:
++            meta_raw = self._convert_legacy_metadata(meta_raw)
+         return self._verify_meta(key, meta_raw)[1]
+ 
+     @prepend_ancestor_docstring
+@@ -147,43 +154,80 @@
+         """
+ 
+         fh = self.backend.open_read(key)
++        checksum_warning = False
+         try:
+-            meta_raw = fh.metadata
+-            (nonce, meta) = self._verify_meta(key, meta_raw)
+-            if nonce:
+-                data_key = sha256(self.passphrase + nonce)
+-
+-            # The `payload_offset` key only exists if the storage object was
+-            # created with on old S3QL version. In order to avoid having to
+-            # download and re-upload the entire object during the upgrade, the
+-            # upgrade procedure adds this header to tell us how many bytes at
+-            # the beginning of the object we have to skip to get to the payload.
+-            if 'payload_offset' in meta_raw:
+-                to_skip = meta_raw['payload_offset']
+-                while to_skip:
+-                    to_skip -= len(fh.read(to_skip))
+-
+-            encr_alg = meta_raw['encryption']
+-            if encr_alg == 'AES_v2':
+-                fh = DecryptFilter(fh, data_key)
+-            elif encr_alg != 'None':
+-                raise RuntimeError('Unsupported encryption: %s' % encr_alg)
+-
+-            compr_alg = meta_raw['compression']
+-            if compr_alg == 'BZIP2':
+-                fh = DecompressFilter(fh, bz2.BZ2Decompressor())
+-            elif compr_alg == 'LZMA':
+-                fh = DecompressFilter(fh, lzma.LZMADecompressor())
+-            elif compr_alg == 'ZLIB':
+-                fh = DecompressFilter(fh,zlib.decompressobj())
+-            elif compr_alg != 'None':
+-                raise RuntimeError('Unsupported compression: %s' % compr_alg)
++            if SUPPORT_LEGACY_FORMAT:
++                if fh.metadata.get('format_version', 0) < 2:
++                    meta_raw = self._convert_legacy_metadata(fh.metadata)
++                else:
++                    meta_raw = fh.metadata
++                (nonce, meta) = self._verify_meta(key, meta_raw)
++                if nonce:
++                    data_key = sha256(self.passphrase + nonce)
++                compr_alg = meta_raw['compression']
++                encr_alg = meta_raw['encryption']
++                if compr_alg == 'BZIP2':
++                    decompressor = bz2.BZ2Decompressor()
++                elif compr_alg == 'LZMA':
++                    decompressor = lzma.LZMADecompressor()
++                elif compr_alg == 'ZLIB':
++                    decompressor = zlib.decompressobj()
++                elif compr_alg == 'None':
++                    decompressor = None
++                else:
++                    raise RuntimeError('Unsupported compression: %s' % compr_alg)
++                if 'payload_offset' in meta_raw:
++                    to_skip = meta_raw['payload_offset']
++                    while to_skip:
++                        to_skip -= len(fh.read(to_skip))
++                checksum_warning = True
++                if encr_alg == 'AES':
++                    fh = LegacyDecryptDecompressFilter(fh, data_key, decompressor)
++                    decompressor = None
++                elif encr_alg == 'AES_v2':
++                    fh = DecryptFilter(fh, data_key)
++                elif encr_alg != 'None':
++                    raise RuntimeError('Unsupported encryption: %s' % encr_alg)
++                if decompressor:
++                    fh = DecompressFilter(fh, decompressor)
++            else:
++                meta_raw = fh.metadata
++
++                (nonce, meta) = self._verify_meta(key, meta_raw)
++                if nonce:
++                    data_key = sha256(self.passphrase + nonce)
++
++                # The `payload_offset` key only exists if the storage object was
++                # created with an old S3QL version. In order to avoid having to
++                # download and re-upload the entire object during the upgrade, the
++                # upgrade procedure adds this header to tell us how many bytes at
++                # the beginning of the object we have to skip to get to the payload.
++                if 'payload_offset' in meta_raw:
++                    to_skip = meta_raw['payload_offset']
++                    while to_skip:
++                        to_skip -= len(fh.read(to_skip))
++
++                encr_alg = meta_raw['encryption']
++                if encr_alg == 'AES_v2':
++                    fh = DecryptFilter(fh, data_key)
++                elif encr_alg != 'None':
++                    raise RuntimeError('Unsupported encryption: %s' % encr_alg)
++
++                compr_alg = meta_raw['compression']
++                if compr_alg == 'BZIP2':
++                    fh = DecompressFilter(fh, bz2.BZ2Decompressor())
++                elif compr_alg == 'LZMA':
++                    fh = DecompressFilter(fh, lzma.LZMADecompressor())
++                elif compr_alg == 'ZLIB':
++                    fh = DecompressFilter(fh,zlib.decompressobj())
++                elif compr_alg != 'None':
++                    raise RuntimeError('Unsupported compression: %s' % compr_alg)
+ 
+             fh.metadata = meta
+         except:
+             # Don't emit checksum warning, caller hasn't even
+             # started reading anything.
+-            fh.close(checksum_warning=False)
++            fh.close(checksum_warning=checksum_warning)
+             raise
+ 
+         return fh
+@@ -275,6 +319,8 @@
+ 
+     def _copy_or_rename(self, src, dest, rename, metadata=None):
+         meta_raw = self.backend.lookup(src)
++        if SUPPORT_LEGACY_FORMAT and meta_raw.get('format_version', 0) < 2:
++            meta_raw = self._convert_legacy_metadata(meta_raw)
+         (nonce, meta_old) = self._verify_meta(src, meta_raw)
+ 
+         if nonce:
+@@ -303,6 +349,165 @@
+     def close(self):
+         self.backend.close()
+ 
++    def _convert_legacy_metadata(self, meta):
++        '''Convert metadata to newest format
++
++        This method ensures that we can read objects written
++        by older S3QL versions.
++        '''
++
++        format_version = meta.get('format_version', 0)
++        assert format_version in (0,1)
++        if format_version == 0:
++            meta = self._convert_legacy_metadata0(meta)
++        return self._convert_legacy_metadata1(meta)
++
++    def _convert_legacy_metadata0(self, meta,
++                                 LEN_BYTES = struct.calcsize(b'<B'),
++                                 TIME_BYTES = struct.calcsize(b'<f')):
++        meta_new = dict(format_version=1)
++
++        if ('encryption' in meta and
++            'compression' in meta):
++            meta_new['encryption'] = meta['encryption']
++            meta_new['compression'] = meta['compression']
++
++        elif 'encrypted' in meta:
++            s = meta['encrypted']
++            if s == 'True':
++                meta_new['encryption'] = 'AES'
++                meta_new['compression'] = 'BZIP2'
++
++            elif s == 'False':
++                meta_new['encryption'] = 'None'
++                meta_new['compression'] = 'None'
++
++            elif s.startswith('AES/'):
++                meta_new['encryption'] = 'AES'
++                meta_new['compression'] = s[4:]
++
++            elif s.startswith('PLAIN/'):
++                meta_new['encryption'] = 'None'
++                meta_new['compression'] = s[6:]
++            else:
++                raise RuntimeError('Unsupported encryption')
++
++            if meta_new['compression'] == 'BZ2':
++                meta_new['compression'] = 'BZIP2'
++
++            if meta_new['compression'] == 'NONE':
++                meta_new['compression'] = 'None'
++        else:
++            meta_new['encryption'] = 'None'
++            meta_new['compression'] = 'None'
++
++        # Extract metadata (pre 2.x versions use multiple headers)
++        if any(k.startswith('meta') for k in meta):
++            parts = [ meta[k] for k in sorted(meta.keys())
++                      if k.startswith('meta') ]
++            meta_new['data'] = ''.join(parts)
++        else:
++            try:
++                meta_new['data'] = meta['data']
++            except KeyError:
++                raise CorruptedObjectError('meta key data is missing')
++
++        if not self.passphrase:
++            return meta_new
++
++        meta_buf = b64decode(meta_new['data'])
++        off = 0
++        def read(len_):
++            nonlocal off
++            tmp = meta_buf[off:off+len_]
++            off += len_
++            return tmp
++
++        len_ = struct.unpack(b'<B', read(LEN_BYTES))[0]
++        nonce = read(len_)
++        key = sha256(self.passphrase + nonce)
++        cipher = aes_cipher(key)
++        hmac_ = hmac.new(key, digestmod=hashlib.sha256)
++        hash_ = read(HMAC_SIZE)
++        meta_buf = meta_buf[off:]
++        meta_buf_plain = cipher.decrypt(meta_buf)
++        hmac_.update(meta_buf_plain)
++        hash_ = cipher.decrypt(hash_)
++
++        if not hmac.compare_digest(hash_, hmac_.digest()):
++            raise CorruptedObjectError('HMAC mismatch')
++
++        obj_id = nonce[TIME_BYTES:].decode('utf-8')
++        meta_key = sha256(self.passphrase + nonce + b'meta')
++        meta_new['nonce'] = b64encode(nonce)
++        meta_new['payload_offset'] = LEN_BYTES + len(nonce)
++        meta_new['data'] = b64encode(aes_cipher(meta_key).encrypt(meta_buf_plain))
++        meta_new['object_id'] = b64encode(obj_id.encode('utf-8'))
++        meta_new['signature'] = calc_legacy_meta_checksum(meta_new, meta_key)
++
++        return meta_new
++
++    def _convert_legacy_metadata1(self, metadata):
++        if not isinstance(metadata, dict):
++            raise CorruptedObjectError('metadata should be dict, not %s' % type(metadata))
++
++        for mkey in ('encryption', 'compression', 'data'):
++            if mkey not in metadata:
++                raise CorruptedObjectError('meta key %s is missing' % mkey)
++
++        encr_alg = metadata['encryption']
++        encrypted = (encr_alg != 'None')
++
++        if encrypted and self.passphrase is None:
++            raise CorruptedObjectError('Encrypted object and no passphrase supplied')
++
++        elif not encrypted and self.passphrase is not None:
++            raise ObjectNotEncrypted()
++
++        try:
++            meta_buf = b64decode(metadata['data'])
++        except binascii.Error:
++            raise CorruptedObjectError('Invalid metadata, b64decode failed')
++
++        if not encrypted:
++            try:
++                meta2 = safe_unpickle(meta_buf, encoding='latin1')
++            except pickle.UnpicklingError as exc:
++                raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
++            if meta2 is None:
++                meta2 = dict()
++            metadata['data'] = freeze_basic_mapping(meta2)
++            metadata['format_version'] = 2
++            return metadata
++
++        # Encrypted
++        for mkey in ('nonce', 'signature', 'object_id'):
++            if mkey not in metadata:
++                raise CorruptedObjectError('meta key %s is missing' % mkey)
++
++        nonce = b64decode(metadata['nonce'])
++        meta_key = sha256(self.passphrase + nonce + b'meta')
++        meta_sig = calc_legacy_meta_checksum(metadata, meta_key)
++        if not hmac.compare_digest(metadata['signature'], meta_sig):
++            raise CorruptedObjectError('HMAC mismatch')
++
++        buf = aes_cipher(meta_key).decrypt(meta_buf)
++        try:
++            meta2 = safe_unpickle(buf, encoding='latin1')
++        except pickle.UnpicklingError as exc:
++            raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
++        if meta2 is None:
++            meta2 = dict()
++
++        meta_buf = freeze_basic_mapping(meta2)
++        metadata['nonce'] = nonce
++        metadata['object_id'] = b64decode(metadata['object_id']).decode('utf-8')
++        metadata['data'] = aes_cipher(meta_key).encrypt(meta_buf)
++        metadata['format_version'] = 2
++        metadata['signature'] = checksum_basic_mapping(metadata, meta_key)
++
++        return metadata
++
+ class CompressFilter(object):
+     '''Compress data while writing'''
+ 
+@@ -675,3 +880,107 @@
+     '''
+ 
+     pass
++
++
++def calc_legacy_meta_checksum(metadata, key):
++    # This works most of the time, so we still try to validate the
++    # signature. But in general, the pickle output is not unique so this is
++    # not a good way to compute a checksum.
++    chk = hmac.new(key, digestmod=hashlib.sha256)
++    for mkey in sorted(metadata.keys()):
++        assert isinstance(mkey, str)
++        if mkey == 'signature':
++            continue
++        val = metadata[mkey]
++        if isinstance(val, str):
++            val = val.encode('utf-8')
++        elif not isinstance(val, (bytes, bytearray)):
++            val = pickle.dumps(val, 2)
++        chk.update(mkey.encode('utf-8') + val)
++    return b64encode(chk.digest())
++
++class LegacyDecryptDecompressFilter(io.RawIOBase):
++    '''Decrypt and Decompress data while reading
++
++    Reader has to read the entire stream in order for HMAC
++    checking to work.
++    '''
++
++    def __init__(self, fh, key, decomp):
++        '''Initialize
++
++        *fh* should be a file-like object and may be unbuffered.
++        '''
++        super().__init__()
++
++        self.fh = fh
++        self.decomp = decomp
++        self.hmac_checked = False
++        self.cipher = aes_cipher(key)
++        self.hmac = hmac.new(key, digestmod=hashlib.sha256)
++        self.hash = fh.read(HMAC_SIZE)
++
++    def discard_input(self):
++        while True:
++            buf = self.fh.read(BUFSIZE)
++            if not buf:
++                break
++
++    def _decrypt(self, buf):
++        # Work around https://bugs.launchpad.net/pycrypto/+bug/1256172
++        # cipher.decrypt refuses to work with anything but bytes
++        if not isinstance(buf, bytes):
++            buf = bytes(buf)
++
++        len_ = len(buf)
++        buf = self.cipher.decrypt(buf)
++        assert len(buf) == len_
++        return buf
++
++    def read(self, size=-1):
++        '''Read up to *size* bytes
++
++        This method is currently buggy and may also return *more*
++        than *size* bytes. Callers should be prepared to handle
++        that. This is because some of the used (de)compression modules
++        don't support output limiting.
++        '''
++
++        if size == -1:
++            return self.readall()
++        elif size == 0:
++            return b''
++
++        buf = None
++        while not buf:
++            buf = self.fh.read(size)
++            if not buf and not self.hmac_checked:
++                if not hmac.compare_digest(self._decrypt(self.hash),
++                                           self.hmac.digest()):
++                    raise CorruptedObjectError('HMAC mismatch')
++                elif self.decomp and self.decomp.unused_data:
++                    raise CorruptedObjectError('Data after end of compressed stream')
++                else:
++                    self.hmac_checked = True
++                    return b''
++            elif not buf:
++                return b''
++
++            buf = self._decrypt(buf)
++            if not self.decomp:
++                break
++
++            buf = decompress(self.decomp, buf)
++
++        self.hmac.update(buf)
++        return buf
++
++    def close(self, *a, **kw):
++        self.fh.close(*a, **kw)
++
++    def __enter__(self):
++        return self
++
++    def __exit__(self, *a):
++        self.close()
++        return False
+--- a/src/s3ql/backends/local.py
++++ b/src/s3ql/backends/local.py
+@@ -18,6 +18,9 @@
+ import os
+ import shutil
+ 
++from ..upgrade_support import safe_unpickle_fh, pickle
++SUPPORT_LEGACY_FORMAT=False
++
+ log = logging.getLogger(__name__)
+ 
+ class Backend(AbstractBackend, metaclass=ABCDocstMeta):
+@@ -241,7 +244,14 @@
+ def _read_meta(fh):
+     buf = fh.read(9)
+     if not buf.startswith(b's3ql_1\n'):
+-        raise CorruptedObjectError('Invalid object header: %r' % buf)
++        if SUPPORT_LEGACY_FORMAT:
++            fh.seek(0)
++            try:
++                return safe_unpickle_fh(fh, encoding='latin1')
++            except pickle.UnpicklingError as exc:
++                raise CorruptedObjectError('Invalid metadata, pickle says: %s' % exc)
++        else:
++            raise CorruptedObjectError('Invalid object header: %r' % buf)
+ 
+     len_ = struct.unpack('<H', buf[-2:])[0]
+     try:
+--- a/src/s3ql/backends/s3c.py
++++ b/src/s3ql/backends/s3c.py
+@@ -33,6 +33,9 @@
+ import time
+ import urllib.parse
+ 
++from ..upgrade_support import safe_unpickle, pickle
++SUPPORT_LEGACY_FORMAT=False
++
+ C_DAY_NAMES = [ 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun' ]
+ C_MONTH_NAMES = [ 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec' ]
+ 
+@@ -383,7 +386,7 @@
+         if extra_headers is not None:
+             headers.update(extra_headers)
+         headers[self.hdr_prefix + 'copy-source'] = \
+-            '/%s/%s%s' % (self.bucket_name, self.prefix, src)
++            urllib.parse.quote('/%s/%s%s' % (self.bucket_name, self.prefix, src))
+ 
+         if metadata is None:
+             headers[self.hdr_prefix + 'metadata-directive'] = 'COPY'
+@@ -712,7 +715,31 @@
+ 
+         format_ = resp.headers.get('%smeta-format' % self.hdr_prefix, 'raw')
+         if format_ != 'raw2': # Current
+-            raise CorruptedObjectError('Invalid metadata format: %s' % format_)
++            if SUPPORT_LEGACY_FORMAT:
++                meta = CaseInsensitiveDict()
++                pattern = re.compile(r'^%smeta-(.+)$' % re.escape(self.hdr_prefix),
++                                     re.IGNORECASE)
++                for fname in resp.headers:
++                    hit = pattern.search(fname)
++                    if hit:
++                        meta[hit.group(1)] = resp.headers[fname]
++
++                if format_ == 'raw':
++                    return meta
++
++                # format_ == pickle
++                buf = ''.join(meta[x] for x in sorted(meta) if x.lower().startswith('data-'))
++                if 'md5' in meta and md5sum_b64(buf.encode('us-ascii')) != meta['md5']:
++                    log.warning('MD5 mismatch in metadata for %s', obj_key)
++                    raise BadDigestError('BadDigest', 'Meta MD5 for %s does not match' % obj_key)
++                try:
++                    return safe_unpickle(b64decode(buf), encoding='latin1')
++                except binascii.Error:
++                    raise CorruptedObjectError('Corrupted metadata, b64decode failed')
++                except pickle.UnpicklingError as exc:
++                    raise CorruptedObjectError('Corrupted metadata, pickle says: %s' % exc)
++            else:
++                raise CorruptedObjectError('Invalid metadata format: %s' % format_)
+ 
+         parts = []
+         for i in count():
+--- /dev/null
++++ b/src/s3ql/upgrade_support.py
+@@ -0,0 +1,75 @@
++'''
++Routines for reading old metadata to allow upgrade.
++Forward-ported from Mercurial commit 773931c43368.
++'''
++
++from .logging import logging # Ensure use of custom logger class
++import pickletools
++import pickle
++import codecs
++import io
++
++log = logging.getLogger(__name__)
++
++SAFE_UNPICKLE_OPCODES = {'BININT', 'BININT1', 'BININT2', 'LONG1', 'LONG4',
++                         'BINSTRING', 'SHORT_BINSTRING', 'GLOBAL',
++                         'NONE', 'NEWTRUE', 'NEWFALSE', 'BINUNICODE',
++                         'BINFLOAT', 'EMPTY_LIST', 'APPEND', 'APPENDS',
++                         'LIST', 'EMPTY_TUPLE', 'TUPLE', 'TUPLE1', 'TUPLE2',
++                         'TUPLE3', 'EMPTY_DICT', 'DICT', 'SETITEM',
++                         'SETITEMS', 'POP', 'DUP', 'MARK', 'POP_MARK',
++                         'BINGET', 'LONG_BINGET', 'BINPUT', 'LONG_BINPUT',
++                         'PROTO', 'STOP', 'REDUCE'}
++
++SAFE_UNPICKLE_GLOBAL_NAMES = { ('__builtin__', 'bytearray'),
++                               ('__builtin__', 'set'),
++                               ('__builtin__', 'frozenset'),
++                               ('_codecs', 'encode') }
++SAFE_UNPICKLE_GLOBAL_OBJS = { bytearray, set, frozenset, codecs.encode }
++
++class SafeUnpickler(pickle.Unpickler):
++    def find_class(self, module, name):
++        if (module, name) not in SAFE_UNPICKLE_GLOBAL_NAMES:
++            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
++                                         (module, name))
++        ret = super().find_class(module, name)
++        if ret not in SAFE_UNPICKLE_GLOBAL_OBJS:
++            raise pickle.UnpicklingError("global '%s.%s' is unsafe" %
++                                         (module, name))
++        return ret
++
++
++def safe_unpickle_fh(fh, fix_imports=True, encoding="ASCII",
++                  errors="strict"):
++    '''Safely unpickle untrusted data from *fh*
++
++    *fh* must be seekable.
++    '''
++
++    if not fh.seekable():
++        raise TypeError('*fh* must be seekable')
++    pos = fh.tell()
++
++    # First make sure that we know all used opcodes
++    try:
++        for (opcode, arg, _) in pickletools.genops(fh):
++            if opcode.proto > 2 or opcode.name not in SAFE_UNPICKLE_OPCODES:
++                raise pickle.UnpicklingError('opcode %s is unsafe' % opcode.name)
++    except (ValueError, EOFError):
++        raise pickle.UnpicklingError('corrupted data')
++
++    fh.seek(pos)
++
++    # Then use a custom Unpickler to ensure that we only give access to
++    # specific, whitelisted globals. Note that with the above opcodes, there is
++    # no way to trigger attribute access, so "brachiating" from a whitelisted
++    # object to __builtins__ is not possible.
++    return SafeUnpickler(fh, fix_imports=fix_imports,
++                         encoding=encoding, errors=errors).load()
++
++def safe_unpickle(buf, fix_imports=True, encoding="ASCII",
++                  errors="strict"):
++    '''Safely unpickle untrusted data in *buf*'''
++
++    return safe_unpickle_fh(io.BytesIO(buf), fix_imports=fix_imports,
++                            encoding=encoding, errors=errors)
