[pkg-eucalyptus-commits] [SCM] managing cloud instances for Eucalyptus branch, master, updated. 3.0.0-alpha3-257-g1da8e3a

Garrett Holmstrom gholms at fedoraproject.org
Sun Jun 16 02:31:03 UTC 2013


The following commit has been merged in the master branch:
commit 3b718ff00f35f448acab54679ebe3c0b85aa7d0f
Author: Garrett Holmstrom <gholms at fedoraproject.org>
Date:   Tue Apr 16 14:37:56 2013 -0700

    Switch PutObject over from redirection hack to generic retry logic

diff --git a/euca2ools/commands/walrus/putobject.py b/euca2ools/commands/walrus/putobject.py
index dc908e0..8cc8a95 100644
--- a/euca2ools/commands/walrus/putobject.py
+++ b/euca2ools/commands/walrus/putobject.py
@@ -41,7 +41,10 @@ import time
 
 
 class PutObject(WalrusRequest):
-    DESCRIPTION = 'Upload objects to the server'
+    DESCRIPTION = ('Upload objects to the server\n\nNote that uploading a '
+                   'large file to a region other than the one the bucket is '
+                   'may result in "Broken pipe" errors or other connection '
+                   'problems that this program cannot detect.')
     ARGS = [Arg('sources', metavar='FILE', nargs='+', route_to=None,
                 help='file(s) to upload'),
             Arg('dest', metavar='BUCKET/PREFIX', route_to=None,
@@ -53,22 +56,34 @@ class PutObject(WalrusRequest):
             Arg('--guess-mime-type', action='store_true', route_to=None,
                 help='''automatically select MIME types for the files being
                 uploaded'''),
+            Arg('--retry', dest='retries', action='store_const', const=5,
+                default=1, route_to=None,
+                help='retry interrupted uploads up to 5 times'),
             Arg('--progress', action='store_true', route_to=None,
                 help='show upload progress')]
     METHOD = 'PUT'
 
+    def __init__(self, **kwargs):
+        WalrusRequest.__init__(self, **kwargs)
+        self.last_upload_error = None
+        self._lock = threading.Lock()
+
     def configure(self):
         WalrusRequest.configure(self)
-        if self.args['literal_dest'] and len(self.args['sources']) != 1:
+        if (self.args.get('literal_dest', False) and
+            len(self.args['sources']) != 1):
+            # Can't explicitly specify dest file names when we're uploading
+            # more than one thing
             raise ArgumentError('argument -T: only allowed with one file')
         if self.args['dest'].startswith('/'):
             raise ArgumentError('destination must begin with a bucket name')
 
     def main(self):
+        sources = list(self.args['sources'])
         max_source_len = max(len(os.path.basename(source)) for source
-                             in self.args['sources'])
-        for source_filename in self.args['sources']:
-            if self.args['literal_dest']:
+                             in sources)
+        for source_filename in sources:
+            if self.args.get('literal_dest', False):
                 (bucket, __, keyname) = self.args['dest'].partition('/')
                 if not keyname:
                     raise ArgumentError('destination must contain a key name')
@@ -78,15 +93,18 @@ class PutObject(WalrusRequest):
             self.path = bucket + '/' + keyname
             self.headers['Content-Length'] = os.path.getsize(source_filename)
             self.headers.pop('Content-Type', None)
-            if self.args['guess_mime_type']:
+            if self.args.get('guess_mime_type', False):
                 mtype = mimetypes.guess_type(source_filename)
                 if mtype:
                     self.headers['Content-Type'] = mtype
 
             self.log.info('uploading %s to %s', source_filename, self.path)
+            with self._lock:
+                self.last_upload_error = None
             with open(source_filename) as source:
                 upload_thread = threading.Thread(target=self.try_send,
-                                                 args=(source,))
+                    args=(source,),
+                    kwargs={'retries_left': self.args['retries']})
                 # The upload thread is daemonic so ^C will kill the program
                 # more cleanly.
                 upload_thread.daemon = True
@@ -94,7 +112,7 @@ class PutObject(WalrusRequest):
                 if self.args['progress']:
                     import progressbar
                     filename_padding = ' ' * (max_source_len -
-                                              len(source_filename))
+                        len(os.path.basename(source_filename)))
                     widgets = [os.path.basename(source_filename),
                                filename_padding, ' ', progressbar.Percentage(),
                                ' ', progressbar.Bar(marker='='), ' ',
@@ -116,45 +134,24 @@ class PutObject(WalrusRequest):
                     while upload_thread.is_alive():
                         time.sleep(0.01)
                 upload_thread.join()
+            with self._lock:
+                if self.last_upload_error is not None:
+                    raise self.last_upload_error
 
-    def try_send(self, source):
+    def try_send(self, source, retries_left=0):
         self.body = source
         try:
             self.send()
         except ClientError as err:
-            # When you start uploading something to the wrong region S3 will
-            # try to send an error response, but requests doesn't listen for
-            # that until it's done uploading the whole file.  If the file takes
-            # more than a couple seconds to upload then S3 will kill the
-            # connection before then, leaving us with a broken pipe and a
-            # ConnectionError from requests, but no error response.
-            #
-            # We should be using Expect: 100-continue so we can deal with that
-            # before we send anything, but requests doesn't support that and
-            # it isn't trivial to implement from outside.
-            # (See https://github.com/kennethreitz/requests/issues/713)
-            #
-            # Plan C is to retry the bad request with something small emough to
-            # give us the contents of the error and then retry with the URL
-            # that that error contains.
             if len(err.args) > 0 and isinstance(err.args[0], socket.error):
-                self.log.warn('upload interrupted by socket error')
-                self.log.debug('retrieving error message from server')
-                self.headers['Content-Length'] = 0
-                self.body = ''
-                self.saved_body = source
-                self.send()
-            else:
-                raise
-
-    def handle_server_error(self, err):
-        if (300 <= err.status_code < 400 and 'Endpoint' in err.elements and
-            hasattr(self, 'saved_body')):
-            # This was a redirect-probing request; restore the original body to
-            # make the usual redirect handling work
-            print >> sys.stderr, 'retrying upload with endpoint', \
-                err.elements['Endpoint']
-            self.body = self.saved_body
-            del self.saved_body
-            self.headers['Content-Length'] = os.path.getsize(self.body.name)
-        return WalrusRequest.handle_server_error(self, err)
+                self.log.warn('socket error')
+                if retries_left > 0:
+                    self.log.info('retrying upload (%i retries remaining)',
+                                  retries_left)
+                    self.log.debug('re-seeking body to beginning of file')
+                    source.seek(0)
+                    return self.try_send(source, retries_left - 1)
+                else:
+                    with self._lock:
+                        self.last_upload_error = err
+                    raise

-- 
managing cloud instances for Eucalyptus



More information about the pkg-eucalyptus-commits mailing list