[apt-proxy-devel] r601 - in people/halls/rework: apt_proxy debian

Chris Halls halls at costa.debian.org
Sat Apr 8 15:42:34 UTC 2006


Author: halls
Date: Sat Apr  8 15:42:30 2006
New Revision: 601

Modified:
   people/halls/rework/apt_proxy/apt_proxy.py
   people/halls/rework/apt_proxy/cache.py
   people/halls/rework/apt_proxy/fetchers.py
   people/halls/rework/debian/changelog
Log:
Port rsync backend to new fetcher API (not yet debugged)


Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py	(original)
+++ people/halls/rework/apt_proxy/apt_proxy.py	Sat Apr  8 15:42:30 2006
@@ -228,7 +228,8 @@
             return
 
         backendName = self.uri[1:].split('/')[0]
-        log.debug("Request: %s %s backend=%s uri=%s"%(self.method, self.uri, backendName, self.uri),'Request')
+        log.debug("Request: %s %s backend=%s uri=%s"
+                    % (self.method, self.uri, backendName, self.uri),'Request')
 
         if self.factory.config.disable_pipelining:
             self.setHeader('Connection','close')

Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py	(original)
+++ people/halls/rework/apt_proxy/cache.py	Sat Apr  8 15:42:30 2006
@@ -353,6 +353,14 @@
         for request in self.requests:
             request.finishCode(http_code, reason)
 
+        # Remove directory if file was not created
+        if not os.path.exists(self.file_path):
+            try:
+                os.removedirs(self.filedir)
+            except:
+                pass
+
+
     def file_sent(self):
         """
         File has been sent successfully to at least one client

Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py	(original)
+++ people/halls/rework/apt_proxy/fetchers.py	Sat Apr  8 15:42:30 2006
@@ -21,7 +21,7 @@
 network backends
 """
 
-import re, os, string, time
+import re, os, string, time, glob, signal
 from twisted.web import static, http
 from twisted.internet import protocol, reactor, defer
 from twisted.python import failure
@@ -127,14 +127,10 @@
         self.deferred.callback((False, reason_msg))
 
     def cancel_download(self):
-        if self.fetcher and self.fetcher.transport:
+        if self.fetcher:
             log.debug(
-                "telling the transport to loseConnection",'Fetcher')
-            try:
-                self.fetcher.transport.loseConnection()
-            except KeyError:
-                # Rsync fetcher already loses conneciton for us
-                pass
+                "telling fetchers to disconnect",'Fetcher')
+            self.fetcher.disconnect()
         self.download_failed(None, "Download canceled")
 
     def data_received(self, data):
@@ -210,6 +206,10 @@
         # TODO - failover?
         self.download_failed(http.NOT_FOUND, "file not found on backend")
 
+    def fetcher_internal_error(self, reason):
+        log.msg("(%s) internal error: %s" % (self.backendServer.path, reason), 'fetcher')
+        self.download_failed(http.INTERNAL_SERVER_ERROR, reason)
+
     def send_complete_file(self, filename):
         """
         Send a complete file (used by FileFetcher)
@@ -366,7 +366,7 @@
                     else:
                         log.err("File transfer overrun! Expected size:%s Received size:%s" % 
                                 (self.server_size, self.fetcher.len_received), 'http_client')
-                        self.parent.download_failure(http.INTERNAL_SERVER_ERROR, "Data overrun")
+                        self.parent.fetcher_internal_error("Data overrun")
 
 #     def handleResponse(self, buffer):
 #         if self.length == 0:
@@ -463,7 +463,7 @@
             self.connection.transport.loseConnection()
             self.isConnected = False
 
-class FtpFetcher(Fetcher, protocol.Protocol):
+class FtpFetcher(protocol.Protocol):
     """
     This is the secuence here:
 
@@ -609,7 +609,7 @@
 
     def ftpListFailed(self, msgs):
         log.debug("ftp list failed: %s" % (msgs), 'ftp_client')
-        self.parent.download_failed(http.INTERNAL_SERVER_ERROR, "Could not list directory")
+        self.parent.fetcher_internal_error("Could not list directory")
 
     def ftpFetchFile(self):
         "And finally, we ask for the file."
@@ -756,56 +756,67 @@
         self.apDataReceived("")
         self.apDataEnd(self.transfered)
 
-class RsyncFetcher(Fetcher, protocol.ProcessProtocol):
+class RsyncFetcher(protocol.ProcessProtocol):
     """
-    I frequently am not called directly, Request.fetch makes the
-    arrangement for FetcherGzip to use us and gzip the result if needed.
+    Fetch a file using the rsync protocol
+    rsync is run as an external process
     """
-    post_convert = re.compile(r"^Should not match anything$")
-    gzip_convert = re.compile(r"/Packages.gz$")
-    
-    "Temporary filename that rsync streams to"
-    rsyncTempFile = None
-    
-    "Number of bytes sent to client already"
-    bytes_sent = 0
+    rsyncCommand = '/usr/bin/rsync'
 
-    def activate (self, request):
-        Fetcher.activate(self, request)
-        if not request.apFetcher:
-            return
+    def __init__(self, backendServer):
+        self.backendServer = backendServer
+        self.rsyncProcess = None
+
+    def connect(self):
+        # We can't connect separately so just return true
+        return defer.succeed(True)
+
+    def download(self, fetcher, uri, mtime):
+        """
+        Request download
+        @param fetcher: Fetcher class to receive callbacks
+        @param uri: URI of file to be downloaded within backend
+        @param mtime: Modification time of current file in cache
+        """
+        self.rsyncTempFile = None # Temporary filename that rsync streams to
+        self.bytes_sent = 0 #Number of bytes sent to client already
+        self.parent = fetcher
+        self.cache_mtime = mtime
+        self.request_uri = uri
+        self.cache_path = fetcher.CacheEntry.cache_path
+        self.cache_dir = fetcher.CacheEntry.filedir
+        self.remote_file = (self.backendServer.path + '/' 
+                            + uri)
 
         # Change /path/to/FILE -> /path/to/.FILE.* to match rsync tempfile
-        self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.local_file)
-        
+        self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.cache_path)
+
         for file in glob.glob(self.globpattern):
-          log.msg('Deleting stale tempfile:' + file)
+          log.msg('Deleting stale tempfile:' + file, 'rsyncFetcher')
           unlink(file)
-                
-        uri = 'rsync://'+request.backendServer.host\
-              +request.backendServer.path+'/'+request.backend_uri
-        self.local_dir=re.sub(r"/[^/]*$", "", self.local_file)+'/'
 
-        exe = '/usr/bin/rsync'
+        uri = 'rsync://'+ self.backendServer.host \
+              +self.backendServer.path+'/' + self.request_uri
+
         if(log.isEnabled('rsync',9)):
-            args = (exe, '--partial', '--progress', '--verbose', '--times',
-                    '--timeout', "%d"%(request.backend.config.timeout),
+            args = (self.rsyncCommand, '--partial', '--progress', '--verbose', '--times',
+                    '--timeout', "%d"%(self.backendServer.backend.config.timeout),
                     uri, '.',)
         else:
-            args = (exe, '--quiet', '--times', uri, '.',
-                    '--timeout',  "%d"%(request.backend.config.timeout),
+            args = (self.rsyncCommand, '--quiet', '--times', uri, '.',
+                    '--timeout',  "%d"%(self.backendServer.backend.config.timeout),
                     )
-        if(not os.path.exists(self.local_dir)):
-            os.makedirs(self.local_dir)
-        self.process = reactor.spawnProcess(self, exe, args, None,
-                                            self.local_dir)
+        #if(not os.path.exists(self.cache_dir)):
+        #    os.makedirs(self.cache_dir)
+        self.rsyncProcess = reactor.spawnProcess(self, self.rsyncCommand, args, None,
+                                            self.cache_dir)
 
     def findRsyncTempFile(self):
         """
         Look for temporary file created by rsync during streaming
         """
         files = glob.glob(self.globpattern)
-        
+
         if len(files)==1:
             self.rsyncTempFile = files[0]
             log.debug('tempfile: ' + self.rsyncTempFile, 'rsync_client')
@@ -814,13 +825,13 @@
             pass
         else:
             log.err('found more than one tempfile, abort rsync')
-            self.transport.loseConnection()
-             
-    def connectionMade(self):
-        pass
+            self.parent.fetcher_internal_error("Found more than one rsync temporary file")
+
+    #def connectionMade(self):
+    #    pass
 
-    "Data received from rsync process to stdout"
     def outReceived(self, data):
+        "Data received from rsync process to stdout"
         for s in string.split(data, '\n'):
             if len(s):
                 log.debug('rsync: ' + s, 'rsync_client')
@@ -828,14 +839,13 @@
         if not self.rsyncTempFile:
             self.findRsyncTempFile()
             # Got tempfile?
-            if self.rsyncTempFile:
-                self.setResponseCode(http.OK)
+            #if self.rsyncTempFile:
+            #    self.setResponseCode(http.OK)
         if self.rsyncTempFile:
             self.sendData()
 
-
-    "Data received from rsync process to stderr"
     def errReceived(self, data):
+        "Data received from rsync process to stderr"
         for s in string.split(data, '\n'):
             if len(s):
                 log.err('rsync error: ' + s, 'rsync_client')
@@ -850,64 +860,58 @@
         else:
             # Tempfile has gone, stream main file
             #log.debug("sendData open dest " + str(self.bytes_sent))
-            f = open(self.local_file, 'rb')
-            
+            f = open(self.cache_path, 'rb')
+
         if f:
             f.seek(self.bytes_sent)
             data = f.read(abstract.FileDescriptor.bufferSize)
             #log.debug("sendData got " + str(len(data)))
             f.close()
             if data:
-                self.apDataReceived(data)
+                self.parent.data_received(data)
                 self.bytes_sent = self.bytes_sent + len(data)
                 reactor.callLater(0, self.sendData)
             elif not self.rsyncTempFile:
                 # Finished reading final file
-                #self.transport = None
                 log.debug("sendData complete")
                 # Tell clients, but data is already saved by rsync so don't
                 # write file again
-                self.apDataEnd(self.transfered, False)
-                
-        
+                self.parent.download_complete()
+
     def processEnded(self, status_object):
         __pychecker__ = 'unusednames=reason'
-        log.debug("Status: %d" %(status_object.value.exitCode)
-                  ,'rsync_client')
         self.rsyncTempFile = None
-        
-        # Success?
-        exitcode = status_object.value.exitCode
-        
-        if exitcode == 0:
-            # File received.  Send to clients.
-            self.server_mtime = os.stat(self.local_file)[stat.ST_MTIME]
-            reactor.callLater(0, self.sendData)
-        else:
-            if exitcode == 10:
-                # Host not found
-                self.setResponseCode(http.INTERNAL_SERVER_ERROR)
+        self.rsyncProcess = None
+
+        if isinstance(status_object, failure.Failure):
+            log.debug("rsync failure: %s" %(status_object)
+                  ,'rsync_client')
+            self.parent.fetcher_internal_error("Error in rsync")
+        else:
+            log.debug("Status: %d" %(status_object.value.exitCode)
+                      ,'rsync_client')
+    
+            # Success?
+            exitcode = status_object.value.exitCode
+    
+            if exitcode == 0:
+                # File received.  Send to clients.
+                self.parent.server_mtime(os.stat(self.cache_path)[stat.ST_MTIME])
+                reactor.callLater(0, self.sendData)
             else:
-                self.setResponseCode(http.NOT_FOUND)
-                
-            if not os.path.exists(self.local_file):
-                try:
-                    os.removedirs(self.local_dir)
-                except:
-                    pass
-            self.apDataReceived("")
-            self.apDataEnd(self.transfered)
+                if exitcode == 10:
+                    # Host not found
+                    self.parent.connection_failed('rsync connection to %s failed'
+                                                   % (self.backendServer.host))
+                else:
+                    self.parent.file_not_found()
 
-    def loseConnection(self):
+    def disconnect(self):
         "Kill rsync process"
-        if self.transport:
-            if self.transport.pid:
-                log.debug("killing rsync child" + 
-                          str(self.transport.pid), 'rsync_client')
-                os.kill(self.transport.pid, signal.SIGTERM)
-            #self.transport.loseConnection()
-        
-        
+        if self.rsyncProcess and self.rsyncProcess.pid:
+            log.debug("disconnect: killing rsync child pid " + 
+                      str(self.rsyncProcess.pid), 'rsync_client')
+            os.kill(self.rsyncProcess.pid, signal.SIGTERM)
 
 class FetcherCachedFile(Fetcher):
     """

Modified: people/halls/rework/debian/changelog
==============================================================================
--- people/halls/rework/debian/changelog	(original)
+++ people/halls/rework/debian/changelog	Sat Apr  8 15:42:30 2006
@@ -6,9 +6,10 @@
   * Add support for username and password in http_proxy parameter.
     Thanks to Thomas Champagne for the patch (Closes: #323147)
   * Move fetchers and cache management into separate files
+  * Add bandwidth limiting for http backends (Part of #306095)
   * Add more unit tests
 
- -- Chris Halls <halls at debian.org>  Tue, 11 Oct 2005 16:25:06 +0100
+ -- Chris Halls <chris.halls at credativ.co.uk>  Wed, 29 Mar 2006 11:04:41 +0100
 
 apt-proxy (1.9.33) unstable; urgency=low
 



More information about the apt-proxy-devel mailing list