[apt-proxy-devel] r585 - people/halls/rework/apt_proxy

Chris Halls halls at costa.debian.org
Thu Sep 22 12:59:05 UTC 2005


Author: halls
Date: Thu Sep 22 12:59:04 2005
New Revision: 585

Modified:
   people/halls/rework/apt_proxy/apt_proxy.py
   people/halls/rework/apt_proxy/cache.py
   people/halls/rework/apt_proxy/fetchers.py
   people/halls/rework/apt_proxy/misc.py
Log:
More work on the new Fetcher


Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py	(original)
+++ people/halls/rework/apt_proxy/apt_proxy.py	Thu Sep 22 12:59:04 2005
@@ -97,7 +97,7 @@
             return e
     def entry_done(self, entry):
         "A cache entry is finished and clients are disconnected"
-        
+
     def get_packages_db(self):
         "Return packages parser object for the backend, creating one if necessary"
         if self.packages == None:
@@ -107,12 +107,16 @@
     def get_path(self, path):
         """
         'path' is the original uri of the request.
-        
+
         We return the path to be appended to the backend path to
         request the file from the backend server
         """
         return path[len(self.base)+2:]
-        
+
+    def file_served(self, entry):
+        "A cache entry has served a file in this backend"
+        self.get_packages_db().packages_file(entry.file_path)
+
 class BackendServer:
     """
     A repository server.  A BackendServer is created for each URI defined in 'backends'
@@ -259,6 +263,8 @@
             if hasattr(self.transport, 'loseConnection'):
                 self.transport.loseConnection()
 
+        self.cacheEntry.remove_request(self)
+        self.cacheEntry = None
 
     def connectionLost(self, reason=None):
         """

Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py	(original)
+++ people/halls/rework/apt_proxy/cache.py	Thu Sep 22 12:59:04 2005
@@ -49,8 +49,6 @@
 
     bytesDownloaded = 0
 
-    streamFile = None # TempFile File which receives data in transit
-
     def __init__(self, backend, path):
         """
         Create a new cache entry
@@ -66,12 +64,17 @@
         # File in cache '/var/cache/apt-proxy/debian/dists/stable/Release.gpg'
         self.file_path = self.factory.config.cache_dir + path
 
+        # Directory of cache file '/var/cache/apt-proxy/debian/dists/stable'
+        self.filedir = os.dirname(self.file_path)
+
         self.filetype = findFileType(path)
         self.filename = os.path.basename(path) # 'Release.gpg'
 
         # filebase='Release' fileext='gpg'
         (self.filebase, self.fileext) = os.path.splitext(self.filename)
 
+        self.create_directory()
+
     def add_reqest(self, request):
         """
         A new request has been received for this file
@@ -123,7 +126,7 @@
         Update current version of file in cache
         """
         if self.state == STATE_NEW:
-            if os.path.exists(self.local_file):
+            if os.path.exists(self.file_path):
                 if self.check_age():
                     self.verify()
                     return
@@ -147,10 +150,10 @@
         """
         stat_tuple = os.stat(self.path)
 
-        self.local_mtime = stat_tuple[stat.ST_MTIME]
-        self.local_size = stat_tuple[stat.ST_SIZE]
+        self.file_mtime = stat_tuple[stat.ST_MTIME]
+        self.file_size = stat_tuple[stat.ST_SIZE]
         log.debug("Modification time:" + 
-                  time.asctime(time.localtime(self.local_mtime)), 
+                  time.asctime(time.localtime(self.file_mtime)), 
                   "file_ok")
         update_times = self.factory.update_times
 
@@ -160,21 +163,21 @@
                       time.asctime(time.localtime(last_access)), 
                       "file_ok")
         else:
-            last_access = self.local_mtime
+            last_access = self.file_mtime
 
 
         cur_time = time.time()
         min_time = cur_time - self.factory.config.min_refresh_delay
 
         if not self.filetype.mutable:
-            log.debug("file is immutable: "+self.local_file, 'CacheEntry')
+            log.debug("file is immutable: "+self.file_path, 'CacheEntry')
             deferred.callback(None)
         elif last_access < min_time:
-            log.debug("file is too old: "+self.local_file, 'CacheEntry')
+            log.debug("file is too old: "+self.file_path, 'CacheEntry')
             update_times[self.uri] = cur_time
             deferred.errback()
         else:
-            log.debug("file is ok: "+self.local_file, 'CacheEntry')
+            log.debug("file is ok: "+self.file_path, 'CacheEntry')
             deferred.callback(None)
 
     def send_cached_file(self):
@@ -189,6 +192,12 @@
         """
         pass
 
+    def create_directory(self):
+        """
+        Create directory for cache entry's file
+        """
+        if(not os.path.exists(self.filedir)):
+            os.makedirs(self.filedir)
 
     def download_started(self, fetcher):
         """
@@ -227,15 +236,30 @@
         for req in self.requests:
             req.write(data)
 
+    def rename_tempfile(self, filename):
+        """
+        When a Fetcher has streamed to a temporary file, rename this file to
+        the final name
+        """
+        os.rename(filename, self.file_path)
+
     def download_data_end(self):
         """
         Callback from Fetcher
         File streaming is complete
         """
-        pass
-
+        self.state = STATE_SENT
+        self.backend.file_served(self)
+        self.factory.file_served(self.request.uri)
 
+        for req in self.requests:
+            req.finish()
 
+    def download_failure(self, reason):
+        """
+        Download is not possible
+        """
+        self.state = 
 class FileType:
     """
     This is just a way to distinguish between different filetypes.

Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py	(original)
+++ people/halls/rework/apt_proxy/fetchers.py	Thu Sep 22 12:59:04 2005
@@ -35,13 +35,15 @@
     fetcher = None   # connection-specific fetcher
 
     def init_tempfile(self):
-        self.transfered = TempFile()
+        self.streamFilename = self.cacheEntry.file_path + ".apDownload"
+        self.transfered = StreamFile(self.streamFilename)
 
     def start(self, cacheEntry):
-        self.init_tempfile()
         self.cacheEntry = cacheEntry
         self.backend = cacheEntry.backend
         self.len_received = 0
+        self.deferred = Deferred()
+        return deferred
 
     def activateNextBackendServer(self, fetcher):
         """
@@ -53,11 +55,11 @@
                 log.err("No backend server found for backend " + self.backend.name, "fetcher")
                 return False
         else:
-            # Look for the next possible BackendServer and transfer requests to that
-            # The attempt to retrieve a file from the BackendServer failed.
+            # Look for the next possible BackendServer
             self.backendServer = self.backend.get_next_server(self.backendServer)
 
             if(self.backendServer == None):
+                # The attempt to retrieve a file from the BackendServer failed.
                 log.debug("no more Backends", "fetcher")
                 return False
 
@@ -85,8 +87,11 @@
         Download was successful
         """
         self.cacheEntry.download_data_end()
+        self.deferred.callback()
+
     def download_failed(self):
         self.cacheEntry.download_data_end()
+        self.deferred.errback()
 
     def cancel_download(self):
         if self.fetcher and self.fetcher.transport:
@@ -97,6 +102,7 @@
             except KeyError:
                 # Rsync fetcher already loses conneciton for us
                 pass
+        self.download_failed()
 
     def data_received(self, data):
         """
@@ -105,10 +111,30 @@
         if self.len_received == 0:
             self.cacheEntry.download_started(self)
         self.len_received = self.len_received + len(data)
+
         if self.transfered:
+            # save to tempfile (if it in use)
             self.transfered.append(data)
         self.cacheEntry.download_data_received(data)
 
+    def transfer_complete(self):
+        """
+        All data has been transferred
+        """
+        log.debug("Finished receiving data: " + self.cacheEntry.filename, 'Fetcher');
+        if self.transferred:
+            self.transferred.close()
+            self.transferred = None
+
+            if self.fetcher.server_mtime != None:
+                os.utime(self.local_file, (time.time(), self.fetcher.server_mtime))
+            else:
+                log.debug("no local time: "+self.local_file,'Fetcher')
+                os.utime(self.local_file, (time.time(), 0))
+            self.cacheEntry.rename_file(self.streamFilename)
+
+        self.cacheEntry.download_data_end()
+
 class FetcherOld:
     """
     This is the base class for all Fetcher*, it tries to hold as much
@@ -157,13 +183,6 @@
                     data.seek(0, SEEK_SET)
                     shutil.copyfileobj(data, f)
                 f.close()
-                if self.local_mtime != None:
-                    os.utime(self.local_file, (time.time(), self.local_mtime))
-                else:
-                    log.debug("no local time: "+self.local_file,'Fetcher')
-                    os.utime(self.local_file, (time.time(), 0))
-
-            self.factory.file_served(self.request.uri)
 
             #self.request.backend.get_packages_db().packages_file(self.request.uri)
         
@@ -270,7 +289,7 @@
             #Make sure that next time nothing will happen
             #FIXME: This hack is probably not anymore pertinent.
             self.connectionFailed = lambda : log.debug('connectionFailed(2)',
-                                                    'Fetcher','9')
+                                                     'Fetcher','9')
 
 class FileFetcher(Fetcher):
 
@@ -301,7 +320,7 @@
             
         request.setHeader("Content-Length", self.local_size)
         #request.setHeader("Last-modified",
-        #                  http.datetimeToString(request.local_mtime))
+        #                  http.datetimeToString(request.server_mtime))
         basic.FileSender().beginFileTransfer(file, request) \
                           .addBoth(self.file_transfer_complete, request) \
                           .addBoth(lambda r: file.close())
@@ -366,7 +385,7 @@
             
         self.sendHeader('host', self.request.backendServer.host)
 
-        if self.local_mtime != None:
+        if self.XXXXXXXXXXXXX_mtime != None:
             datetime = http.datetimeToString(self.local_mtime)
             self.sendHeader('if-modified-since', datetime)
 
@@ -388,7 +407,7 @@
         key = string.lower(key)
 
         if key == 'last-modified':
-            self.local_mtime = http.stringToDatetime(value)
+            self.server_mtime = http.stringToDatetime(value)
 
         if key in self.forward_headers:
             self.setResponseHeader(key, value)
@@ -526,12 +545,12 @@
                 time_tuple = time_tuple[:8] + (-1,)
                 #correct the result to GMT
                 mtime = time.mktime(time_tuple) - time.altzone
-            if (fetcher.local_mtime and mtime
-                and fetcher.local_mtime >= mtime):
-                fetcher.ftpFinishCached()
-            else:
-                fetcher.local_mtime = mtime
-                fetcher.ftpFetchSize()
+                fetcher.server_mtime = mtime
+                if (fetcher.XXXXXXXXXXXXXX_mtime
+                      and fetcher.XXXXXXXXlocal_mtime >= mtime):
+                    fetcher.ftpFinishCached()
+                    return
+            fetcher.ftpFetchSize()
 
         d = self.ftpclient.queueStringCommand('MDTM ' + self.remote_file)
         d.addCallbacks(apFtpMtimeFinish, apFtpMtimeFinish,
@@ -840,7 +859,7 @@
         
         if exitcode == 0:
             # File received.  Send to clients.
-            self.local_mtime = os.stat(self.local_file)[stat.ST_MTIME]
+            self.server_mtime = os.stat(self.local_file)[stat.ST_MTIME]
             reactor.callLater(0, self.sendData)
         else:
             if exitcode == 10:

Modified: people/halls/rework/apt_proxy/misc.py
==============================================================================
--- people/halls/rework/apt_proxy/misc.py	(original)
+++ people/halls/rework/apt_proxy/misc.py	Thu Sep 22 12:59:04 2005
@@ -157,15 +157,16 @@
             if not self.pending:
                 log.msg("Pruning empty directory: "+path,'recycle')
                 os.removedirs(path)
-        else:
-            if os.path.isfile(path):
+        elif os.path.isfile(path):
+            ext = os.path.splitext(path)[1]
+            if not ext == 'apDownload':
                 #print "PATH:", path
                 #print "URI: ", uri
                 if not self.factory.access_times.has_key(uri):
                     log.msg("Adopting new file: "+ uri,'recycle')
                     self.factory.access_times[uri] = os.path.getatime(path)
-            else:
-                log.msg("UNKNOWN:"+path,'recycle')
+        else:
+            log.msg("UNKNOWN:"+path,'recycle')
 
         if not self.pending:
             self.pop()



More information about the apt-proxy-devel mailing list