[apt-proxy-devel] r585 - people/halls/rework/apt_proxy
Chris Halls
halls at costa.debian.org
Thu Sep 22 12:59:05 UTC 2005
Author: halls
Date: Thu Sep 22 12:59:04 2005
New Revision: 585
Modified:
people/halls/rework/apt_proxy/apt_proxy.py
people/halls/rework/apt_proxy/cache.py
people/halls/rework/apt_proxy/fetchers.py
people/halls/rework/apt_proxy/misc.py
Log:
More work on the new Fetcher
Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py (original)
+++ people/halls/rework/apt_proxy/apt_proxy.py Thu Sep 22 12:59:04 2005
@@ -97,7 +97,7 @@
return e
def entry_done(self, entry):
"A cache entry is finished and clients are disconnected"
-
+
def get_packages_db(self):
"Return packages parser object for the backend, creating one if necessary"
if self.packages == None:
@@ -107,12 +107,16 @@
def get_path(self, path):
"""
'path' is the original uri of the request.
-
+
We return the path to be appended to the backend path to
request the file from the backend server
"""
return path[len(self.base)+2:]
-
+
+ def file_served(self, entry):
+ "A cache entry has served a file in this backend"
+ self.get_packages_db().packages_file(entry.file_path)
+
class BackendServer:
"""
A repository server. A BackendServer is created for each URI defined in 'backends'
@@ -259,6 +263,8 @@
if hasattr(self.transport, 'loseConnection'):
self.transport.loseConnection()
+ self.cacheEntry.remove_request(self)
+ self.cacheEntry = None
def connectionLost(self, reason=None):
"""
Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py (original)
+++ people/halls/rework/apt_proxy/cache.py Thu Sep 22 12:59:04 2005
@@ -49,8 +49,6 @@
bytesDownloaded = 0
- streamFile = None # TempFile File which receives data in transit
-
def __init__(self, backend, path):
"""
Create a new cache entry
@@ -66,12 +64,17 @@
# File in cache '/var/cache/apt-proxy/debian/dists/stable/Release.gpg'
self.file_path = self.factory.config.cache_dir + path
+ # Directory of cache file '/var/cache/apt-proxy/debian/dists/stable'
+ self.filedir = os.dirname(self.file_path)
+
self.filetype = findFileType(path)
self.filename = os.path.basename(path) # 'Release.gpg'
# filebase='Release' fileext='gpg'
(self.filebase, self.fileext) = os.path.splitext(self.filename)
+ self.create_directory()
+
def add_reqest(self, request):
"""
A new request has been received for this file
@@ -123,7 +126,7 @@
Update current version of file in cache
"""
if self.state == STATE_NEW:
- if os.path.exists(self.local_file):
+ if os.path.exists(self.file_path):
if self.check_age():
self.verify()
return
@@ -147,10 +150,10 @@
"""
stat_tuple = os.stat(self.path)
- self.local_mtime = stat_tuple[stat.ST_MTIME]
- self.local_size = stat_tuple[stat.ST_SIZE]
+ self.file_mtime = stat_tuple[stat.ST_MTIME]
+ self.file_size = stat_tuple[stat.ST_SIZE]
log.debug("Modification time:" +
- time.asctime(time.localtime(self.local_mtime)),
+ time.asctime(time.localtime(self.file_mtime)),
"file_ok")
update_times = self.factory.update_times
@@ -160,21 +163,21 @@
time.asctime(time.localtime(last_access)),
"file_ok")
else:
- last_access = self.local_mtime
+ last_access = self.file_mtime
cur_time = time.time()
min_time = cur_time - self.factory.config.min_refresh_delay
if not self.filetype.mutable:
- log.debug("file is immutable: "+self.local_file, 'CacheEntry')
+ log.debug("file is immutable: "+self.file_path, 'CacheEntry')
deferred.callback(None)
elif last_access < min_time:
- log.debug("file is too old: "+self.local_file, 'CacheEntry')
+ log.debug("file is too old: "+self.file_path, 'CacheEntry')
update_times[self.uri] = cur_time
deferred.errback()
else:
- log.debug("file is ok: "+self.local_file, 'CacheEntry')
+ log.debug("file is ok: "+self.file_path, 'CacheEntry')
deferred.callback(None)
def send_cached_file(self):
@@ -189,6 +192,12 @@
"""
pass
+ def create_directory(self):
+ """
+ Create directory for cache entry's file
+ """
+ if(not os.path.exists(self.filedir)):
+ os.makedirs(self.filedir)
def download_started(self, fetcher):
"""
@@ -227,15 +236,30 @@
for req in self.requests:
req.write(data)
+ def rename_tempfile(self, filename):
+ """
+ When a Fetcher has streamed to a temporary file, rename this file to
+ the final name
+ """
+ os.rename(filename, self.file_path)
+
def download_data_end(self):
"""
Callback from Fetcher
File streaming is complete
"""
- pass
-
+ self.state = STATE_SENT
+ self.backend.file_served(self)
+ self.factory.file_served(self.request.uri)
+ for req in self.requests:
+ req.finish()
+ def download_failure(self, reason):
+ """
+ Download is not possible
+ """
+        self.state = STATE_FAILED  # NOTE(review): this line is truncated in the archive; verify the intended state constant against the repository
class FileType:
"""
This is just a way to distinguish between different filetypes.
Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py (original)
+++ people/halls/rework/apt_proxy/fetchers.py Thu Sep 22 12:59:04 2005
@@ -35,13 +35,15 @@
fetcher = None # connection-specific fetcher
def init_tempfile(self):
- self.transfered = TempFile()
+ self.streamFilename = self.cacheEntry.file_path + ".apDownload"
+ self.transfered = StreamFile(self.streamFilename)
def start(self, cacheEntry):
- self.init_tempfile()
self.cacheEntry = cacheEntry
self.backend = cacheEntry.backend
self.len_received = 0
+ self.deferred = Deferred()
+ return deferred
def activateNextBackendServer(self, fetcher):
"""
@@ -53,11 +55,11 @@
log.err("No backend server found for backend " + self.backend.name, "fetcher")
return False
else:
- # Look for the next possible BackendServer and transfer requests to that
- # The attempt to retrieve a file from the BackendServer failed.
+ # Look for the next possible BackendServer
self.backendServer = self.backend.get_next_server(self.backendServer)
if(self.backendServer == None):
+ # The attempt to retrieve a file from the BackendServer failed.
log.debug("no more Backends", "fetcher")
return False
@@ -85,8 +87,11 @@
Download was successful
"""
self.cacheEntry.download_data_end()
+ self.deferred.callback()
+
def download_failed(self):
self.cacheEntry.download_data_end()
+ self.deferred.errback()
def cancel_download(self):
if self.fetcher and self.fetcher.transport:
@@ -97,6 +102,7 @@
except KeyError:
# Rsync fetcher already loses connection for us
pass
+ self.download_failed()
def data_received(self, data):
"""
@@ -105,10 +111,30 @@
if self.len_received == 0:
self.cacheEntry.download_started(self)
self.len_received = self.len_received + len(data)
+
if self.transfered:
+            # save to tempfile (if it is in use)
self.transfered.append(data)
self.cacheEntry.download_data_received(data)
+ def transfer_complete(self):
+ """
+ All data has been transferred
+ """
+ log.debug("Finished receiving data: " + self.cacheEntry.filename, 'Fetcher');
+ if self.transferred:
+ self.transferred.close()
+ self.transferred = None
+
+ if self.fetcher.server_mtime != None:
+ os.utime(self.local_file, (time.time(), self.fetcher.server_mtime))
+ else:
+ log.debug("no local time: "+self.local_file,'Fetcher')
+ os.utime(self.local_file, (time.time(), 0))
+ self.cacheEntry.rename_file(self.streamFilename)
+
+ self.cacheEntry.download_data_end()
+
class FetcherOld:
"""
This is the base class for all Fetcher*, it tries to hold as much
@@ -157,13 +183,6 @@
data.seek(0, SEEK_SET)
shutil.copyfileobj(data, f)
f.close()
- if self.local_mtime != None:
- os.utime(self.local_file, (time.time(), self.local_mtime))
- else:
- log.debug("no local time: "+self.local_file,'Fetcher')
- os.utime(self.local_file, (time.time(), 0))
-
- self.factory.file_served(self.request.uri)
#self.request.backend.get_packages_db().packages_file(self.request.uri)
@@ -270,7 +289,7 @@
#Make sure that next time nothing will happen
#FIXME: This hack is probably not anymore pertinent.
self.connectionFailed = lambda : log.debug('connectionFailed(2)',
- 'Fetcher','9')
+ 'Fetcher','9')
class FileFetcher(Fetcher):
@@ -301,7 +320,7 @@
request.setHeader("Content-Length", self.local_size)
#request.setHeader("Last-modified",
- # http.datetimeToString(request.local_mtime))
+ # http.datetimeToString(request.server_mtime))
basic.FileSender().beginFileTransfer(file, request) \
.addBoth(self.file_transfer_complete, request) \
.addBoth(lambda r: file.close())
@@ -366,7 +385,7 @@
self.sendHeader('host', self.request.backendServer.host)
- if self.local_mtime != None:
+ if self.XXXXXXXXXXXXX_mtime != None:
datetime = http.datetimeToString(self.local_mtime)
self.sendHeader('if-modified-since', datetime)
@@ -388,7 +407,7 @@
key = string.lower(key)
if key == 'last-modified':
- self.local_mtime = http.stringToDatetime(value)
+ self.server_mtime = http.stringToDatetime(value)
if key in self.forward_headers:
self.setResponseHeader(key, value)
@@ -526,12 +545,12 @@
time_tuple = time_tuple[:8] + (-1,)
#correct the result to GMT
mtime = time.mktime(time_tuple) - time.altzone
- if (fetcher.local_mtime and mtime
- and fetcher.local_mtime >= mtime):
- fetcher.ftpFinishCached()
- else:
- fetcher.local_mtime = mtime
- fetcher.ftpFetchSize()
+ fetcher.server_mtime = mtime
+ if (fetcher.XXXXXXXXXXXXXX_mtime
+ and fetcher.XXXXXXXXlocal_mtime >= mtime):
+ fetcher.ftpFinishCached()
+ return
+ fetcher.ftpFetchSize()
d = self.ftpclient.queueStringCommand('MDTM ' + self.remote_file)
d.addCallbacks(apFtpMtimeFinish, apFtpMtimeFinish,
@@ -840,7 +859,7 @@
if exitcode == 0:
# File received. Send to clients.
- self.local_mtime = os.stat(self.local_file)[stat.ST_MTIME]
+ self.server_mtime = os.stat(self.local_file)[stat.ST_MTIME]
reactor.callLater(0, self.sendData)
else:
if exitcode == 10:
Modified: people/halls/rework/apt_proxy/misc.py
==============================================================================
--- people/halls/rework/apt_proxy/misc.py (original)
+++ people/halls/rework/apt_proxy/misc.py Thu Sep 22 12:59:04 2005
@@ -157,15 +157,16 @@
if not self.pending:
log.msg("Pruning empty directory: "+path,'recycle')
os.removedirs(path)
- else:
- if os.path.isfile(path):
+ elif os.path.isfile(path):
+ ext = os.path.splitext(path)[1]
+ if not ext == 'apDownload':
#print "PATH:", path
#print "URI: ", uri
if not self.factory.access_times.has_key(uri):
log.msg("Adopting new file: "+ uri,'recycle')
self.factory.access_times[uri] = os.path.getatime(path)
- else:
- log.msg("UNKNOWN:"+path,'recycle')
+ else:
+ log.msg("UNKNOWN:"+path,'recycle')
if not self.pending:
self.pop()
More information about the apt-proxy-devel
mailing list