[apt-proxy-devel] r586 - in people/halls/rework: apt_proxy debian
Chris Halls
halls at costa.debian.org
Tue Oct 11 15:38:57 UTC 2005
Author: halls
Date: Tue Oct 11 15:38:56 2005
New Revision: 586
Modified:
people/halls/rework/apt_proxy/apt_proxy.py
people/halls/rework/apt_proxy/apt_proxy_conf.py
people/halls/rework/apt_proxy/cache.py
people/halls/rework/apt_proxy/fetchers.py
people/halls/rework/debian/changelog
Log:
* Add support for per backend http_proxy auth
* More work on Fetcher rewrite - implementation of Http Fetcher
Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py (original)
+++ people/halls/rework/apt_proxy/apt_proxy.py Tue Oct 11 15:38:56 2005
@@ -245,19 +245,29 @@
self.cacheEntry.add_request(self)
-` def startStreaming(self, size, mtime):
- self.setResponseCode(http.OK, 'Streaming file')
- self.setHeader('last-modified', http.datetimeToString(mtime))
- self.setHeader('content-length', size)
-
+` def start_streaming(self, size, mtime):
+ """
+ Prepare client to stream file
+ Return false if streaming is not necessary (i.e. cache hit)
+ """
+ if request.local_mtime <= if_modified_since:
+ self.setResponseCode(http.OK, 'Streaming file')
+ self.setHeader('last-modified', http.datetimeToString(mtime))
+ self.setHeader('content-length', size)
+ return True
+ else
+ self.setHeader("content-length", 0)
+ self.finishCode(http.NOT_MODIFIED, 'File is up to date')
+ return False
def finishCode(self, responseCode, message=None):
- "Finish the request with an status code"
+ "Finish the request with a status code and no streamed data"
self.setResponseCode(responseCode, message)
self.write("")
self.finish()
def finish(self):
+ "Finish request after streaming"
http.Request.finish(self)
if self.factory.config.disable_pipelining:
if hasattr(self.transport, 'loseConnection'):
Modified: people/halls/rework/apt_proxy/apt_proxy_conf.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy_conf.py (original)
+++ people/halls/rework/apt_proxy/apt_proxy_conf.py Tue Oct 11 15:38:56 2005
@@ -58,6 +58,13 @@
return self.get(section,option)
def getstringlist(self, section, option):
return self.get(section,option).split()
+ def getproxyspec(self, section, option):
+ "Get http proxy info from string"
+ p = ProxyConfig(self.get(section,option))
+ if p.host is not None:
+ return p
+ else
+ return None
class apConfig:
"""
@@ -90,7 +97,7 @@
['disable_pipelining', '1', 'boolean'],
['passive_ftp', 'on', 'boolean'],
['dynamic_backends', 'on', 'boolean'],
- ['http_proxy', '' , 'string'],
+ ['http_proxy', '' , 'proxyspec'],
['username', 'aptproxy', 'string']
]
@@ -105,6 +112,7 @@
['timeout', None, 'time'],
['passive_ftp', None, 'boolean'],
['backends', '', 'stringlist']
+ ['http_proxy', None , 'proxyspec'],
]
DEFAULT_CONFIG_FILE = ['/etc/apt-proxy/apt-proxy-v2.conf',
@@ -261,3 +269,23 @@
name = "UNKNOWN"
def __init__(self, name):
self.name = name
+
+class ProxyConfig:
+ """
+ Configuration information for backend server proxies
+ """
+ host = None
+ port = None
+ user = None
+ password = None
+
+ def __init__(self, proxyspec):
+ if proxyspec=='':
+ return
+ m = re.match('^((?P<user>.*):(?P<password>.*)@)?(?P<host>[a-zA-Z0-9_.+=-]+):(?P<port>[0-9]+)',
+ proxyspec)
+ if m:
+ self.host = m.group('host')
+ self.port = m.group('port')
+ self.user = m.group('user')
+ self.password = m.group('password')
Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py (original)
+++ people/halls/rework/apt_proxy/cache.py Tue Oct 11 15:38:56 2005
@@ -111,6 +111,11 @@
# Cancel download in progress
self.fetcher.cancel_download()
+ if self.streamfile is not None:
+ # File was streamed to clients
+ self.streamfile.close()
+ self.streamfile = None
+
def start_request_stream(self, request):
"""
Prepare a request for streaming
@@ -182,16 +187,49 @@
def send_cached_file(self):
"""
- Send complete file from cache to clients
+ File is up to date - send complete file from cache to clients
"""
- log.debug("Sending cached file: " + self.path, 'CacheEntry')
- self.state = STATE_SENDFILE
+ log.msg("sending file from cache:" + self.file_path, "CachEntry")
+ self.transfer_file(self.path)
+
def end_send_cached(self):
"""
Processing continues here when the file has been sent from the cache
"""
pass
+ def transfer_file(self, filename):
+ """
+ Send given file to clients
+ """
+ stat_tuple = os.stat(filename)
+ self.file_mtime = stat_tuple[stat.ST_MTIME]
+ self.file_size = stat_tuple[stat.ST_SIZE]
+ size = os.stat(filename)[stat.ST_SIZE]
+
+ self.state = STATE_SENDFILE
+ if size > 0:
+ log.debug("Sending file to clients:%s size:%s" % (filename, size), 'CacheEntry')
+ self.streamfile = open(self.local_file,'rb')
+ fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
+
+ for request in self.requests:
+ if request.start_streaming(self.file_size, self.file_mtime):
+ basic.FileSender().beginFileTransfer(self.streamfile, request) \
+ .addBoth(self.file_transfer_complete, request, filename)
+ else:
+ log.debug("Sending empty file to clients:%s" % (filename), 'CacheEntry')
+ for request in self.requests:
+ if request.start_streaming(self.file_size, self.file_mtime):
+ request.finish()
+
+ def file_transfer_complete(self, result, request, filename):
+ log.debug("transfer complete: " + filename, 'CacheEntry')
+ request.finish()
+ if len(self.requests)==0:
+ # Last file was sent
+ self.file_sent()
+
def create_directory(self):
"""
Create directory for cache entry's file
@@ -248,9 +286,7 @@
Callback from Fetcher
File streaming is complete
"""
- self.state = STATE_SENT
- self.backend.file_served(self)
- self.factory.file_served(self.request.uri)
+ self.file_sent()
for req in self.requests:
req.finish()
@@ -260,6 +296,16 @@
Download is not possible
"""
self.state =
+
+ def file_sent(self):
+ """
+ File has been sent successfully to at least one client
+ Update databases with statistics for this file
+ """
+ self.state = STATE_SENT
+ self.backend.file_served(self)
+ self.factory.file_served(self.request.uri)
+
class FileType:
"""
This is just a way to distinguish between different filetypes.
Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py (original)
+++ people/halls/rework/apt_proxy/fetchers.py Tue Oct 11 15:38:56 2005
@@ -63,9 +63,12 @@
log.debug("no more Backends", "fetcher")
return False
- fetcher_class = self.backendServer.fetcher
log.debug('Trying next backendServer', 'fetcher')
- fetcher.apEndTransfer(fetcher_class)
+ self.fetcher = self.backendServer.fetcher(self.backendServer)
+ deferred = fetcher.connect()
+ deferred.callback(self.connected)
+ deferred.errback(self.connection_failed)
+ #fetcher.apEndTransfer(fetcher_class)
return True
@@ -79,7 +82,7 @@
to gzip/gunzip file before and after download.
"""
log.debug("Downloading: " + self.path, 'CacheEntry')
- init_tempfile()
+ #init_tempfile()
self.activateNextBackendServer(self.fetcher)
def download_complete(self):
@@ -89,9 +92,17 @@
self.cacheEntry.download_data_end()
self.deferred.callback()
- def download_failed(self):
- self.cacheEntry.download_data_end()
- self.deferred.errback()
+ def fail_over(self, reason_code, reason_msg):
+ """
+ A non-fatal download error has occurred. Attempt download from next
+ backend
+ """
+ if not self.activateNextBackendServer(self.fetcher):
+ self.download_failed()
+
+ def download_failed(self, reason_code, reason_msg):
+ #self.cacheEntry.download_data_end()
+ self.deferred.errback(reason_code, reason_msg)
def cancel_download(self):
if self.fetcher and self.fetcher.transport:
@@ -102,7 +113,7 @@
except KeyError:
# Rsync fetcher already loses conneciton for us
pass
- self.download_failed()
+ self.download_failed(None, "Download canceled")
def data_received(self, data):
"""
@@ -135,258 +146,105 @@
self.cacheEntry.download_data_end()
-class FetcherOld:
- """
- This is the base class for all Fetcher*, it tries to hold as much
- common code as posible.
-
- Subclasses of this class are the ones responsible for contacting
- the backend servers and fetching the actual data.
- """
- gzip_convert = re.compile(r"/Packages$")
- post_convert = re.compile(r"/Packages.gz$")
- status_code = http.OK
- status_message = None
- length = None
- transport = None
-
-
- def setResponseCode(self, code, message=None):
- "Set response code for all requests"
- #log.debug('Response code: %d - %s' % (code, message),'Fetcher')
- self.status_code = code
- self.status_message = message
- # TODO: notify cacheEntry?
-
- def apDataEnd(self, data, saveData=True):
- """
- Called by subclasses when the data transfer is over.
-
- -caches the received data if everyting went well (if saveData=True)
- -takes care of mtime and atime
- -finishes connection with server and the requests
-
- """
- import shutil
- log.debug("Finished receiving data, status:%d saveData:%d" %(self.status_code, saveData), 'Fetcher');
- if (self.status_code == http.OK):
- if saveData:
- dir = dirname(self.local_file)
- if(not os.path.exists(dir)):
- os.makedirs(dir)
- f = open(self.local_file, "w")
- fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
- f.truncate(0)
- if type(data) is StringType:
- f.write(data)
- else:
- data.seek(0, SEEK_SET)
- shutil.copyfileobj(data, f)
- f.close()
-
- #self.request.backend.get_packages_db().packages_file(self.request.uri)
-
- if self.transport:
- try:
- self.transport.loseConnection()
- except exceptions.KeyError:
- # Couldn't close connection - already closed?
- log.debug("transport.loseConnection() - "
- "connection already closed", 'Fetcher')
- pass
-
- for req in self.requests:
- req.finish()
-
- self.transfered.close()
- self.apEnd()
-
- def apEnd(self):
- """
- Called by subclasses when apDataEnd does too many things.
-
- Let's everyone know that we are not the active Fetcher for our uri.
- """
- try:
- del self.factory.runningFetchers[self.request.uri]
- except exceptions.KeyError:
- log.debug("We are not on runningFetchers!!!",'Fetcher')
- log.debug("Class is not in runningFetchers: "+str(self.__class__),
- 'Fetcher')
- if self.request:
- log.debug(' URI:' + self.request.uri, 'Fetcher')
- log.debug('Running fetchers: '
- +str(self.factory.runningFetchers),'Fetcher')
- #raise exceptions.KeyError
- for req in self.requests[:]:
- self.remove_request(req)
-
- import gc
- #Cleanup circular references
- reactor.callLater(5, gc.collect)
-
- def apEndCached(self):
- """
- A backend has indicated that this file has not changed,
- so serve the file from the disk cache
- """
- self.setResponseCode(http.OK)
- self.apEndTransfer(FetcherCachedFile)
-
- def apEndTransfer(self, fetcher_class):
- """
- Remove this Fetcher and transfer all it's requests to a new instance of
- 'fetcher_class'.
- """
- #Consider something like this:
- #req = dummyFetcher.fix_ref_request()
- #fetcher = fetcher_class()
- #dummyFetcher.transfer_requests(fetcher)
- #dummyFetcher.apEnd()
- #fetcher.activate(req)
-
- #self.setResponseCode(http.OK)
- requests = self.requests[:]
- self.apEnd() # Remove requests from this fetcher
- fetcher = None
- for req in requests:
- if (fetcher_class != FetcherCachedFile or req.serve_if_cached):
- running = req.factory.runningFetchers
- if (running.has_key(req.uri)):
- #If we have an active Fetcher just use that
- log.debug("have active Fetcher",'Fetcher')
- running[req.uri].insert_request(req)
- fetcher = running[req.uri]
- else:
- fetcher = fetcher_class(req)
- else:
- req.finish()
- return fetcher
-
- def connectionFailed(self, reason=None):
+ def conection_failed(self, reason = None):
"""
- Tell our requests that the connection with the server failed.
+ A fetcher has failed to connect to the backend server
"""
msg = '[%s] Connection Failed: %s/%s'%(
- self.request.backend.name,
- self.request.backendServer.path, self.request.backend_uri)
+ self.backend.name,
+ self.backendServer.path, self.request.backend_uri)
if reason:
msg = '%s (%s)'%(msg, reason.getErrorMessage())
log.debug("Connection Failed: "+str(reason), 'Fetcher')
log.err(msg)
+ self.fail_over(http.SERVICE_UNAVAILABLE, reason)
- # Look for alternative fetchers
- if not self.request.activateNextBackendServer(self):
- # No more backends, send error response back to client
- if reason.check(error.ConnectError):
- self.setResponseCode(http.SERVICE_UNAVAILABLE, "Connect Error")
- else:
- self.setResponseCode(http.SERVICE_UNAVAILABLE)
- self.apDataReceived("")
- self.apDataEnd(self.transfered, False)
- #Because of a bug in tcp.Client we may be called twice,
- #Make sure that next time nothing will happen
- #FIXME: This hack is probably not anymore pertinent.
- self.connectionFailed = lambda : log.debug('connectionFailed(2)',
- 'Fetcher','9')
+ def connected()
+ log.debug("Connected to "+ self.request.backend_uri, 'Fetcher')
+ log.debug('downloading:%s mtime:%s' % (uri, mtime), 'Fetcher')
+ self.fetcher.download(self.cacheEntry.path, self.cacheEntry.file_mtime)
-class FileFetcher(Fetcher):
+ def file_not_found()
+ log.msg("[%s] file not found: %s" % (self.backendServer.path, self.request.backend_uri))
+ # TODO - failover?
+ self.download_failed(reason)
- def activate(self, request):
- Fetcher.activate(self, request)
- log.debug("FetcherFile.activate(): uri='%s' server='%s'" % (request.uri, request.backendServer.uri))
- if not request.apFetcher:
- log.debug("no request.apFetcher")
- return
-
- self.factory.file_served(request.uri)
-
- # start the transfer
- self.local_file = request.backendServer.uri[len("file:"):]+ request.uri
- if not os.path.exists(self.local_file):
- log.debug("not found: %s" % self.local_file)
- request.setResponseCode(http.NOT_FOUND)
- request.write("")
- request.finish()
- self.remove_request(request)
- Fetcher.apEnd(self)
- return
- self.local_size = os.stat(self.local_file)[stat.ST_SIZE]
-
- log.debug("Serving local file: " + self.local_file + " size:" + str(self.local_size), 'FetcherCachedFile')
- file = open(self.local_file,'rb')
- fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
-
- request.setHeader("Content-Length", self.local_size)
- #request.setHeader("Last-modified",
- # http.datetimeToString(request.server_mtime))
- basic.FileSender().beginFileTransfer(file, request) \
- .addBoth(self.file_transfer_complete, request) \
- .addBoth(lambda r: file.close())
-
- # A file transfer has completed
- def file_transfer_complete(self, result, request):
- log.debug("transfer complete", 'FetcherCachedFile')
- request.finish()
- # Remove this client from request list
- self.remove_request(request)
- if len(self.requests) == 0:
- Fetcher.apEnd(self)
-
-class HttpFetcher(Fetcher, http.HTTPClient):
+ def send_complete_file(filename):
+ """
+ Send a complete file (used by FileFetcher)
+ """
+ self.cacheEntry.transfer_file(filename)
- forward_headers = [
- 'last-modified',
- 'content-length'
- ]
- log_headers = None
+ def up_to_date():
+ self.cacheEntry.send_cached_file()
- proxy_host = None
- proxy_port = None
+class FileFetcher:
+ """
+ A Fetcher that simply copies files from disk
+ """
+ def __init__(self, backendServer):
+ self.backendServer = backendServer
+ self.isConnected = True # Always connected
- def activate(self, request):
- Fetcher.activate(self, request)
+ def connect(self):
+ # We always connect
+ d = deferred()
+ d.runcallbacks()
+ return d
- if not self.factory.config.http_proxy is '':
- (self.proxy_host, self.proxy_port) = request.factory.config.http_proxy.split(':')
+ def download(self, fetcher, uri, mtime):
+ """
+ Request download
+ %param fetcher: Fetcher class to receive callbacks
+ %param uri: URI of file to be downloaded within backend
+ %param mtime: Modification time of current file in cache
+ """
+ self.parent = fetcher
+ self.cache_mtime = mtime
+ self.request_uri = uri
- if not request.apFetcher:
+ self.local_file = self.backendServer.uri[len("file:"):] + request_uri
+ if not os.path.exists(self.local_file):
+ self.parent.file_not_found()
return
- class ClientFactory(protocol.ClientFactory):
- "Dummy ClientFactory to comply with current twisted API"
- #FIXME: Double check this, haggai thinks it is to blame for the
- #hangs.
- def __init__(self, instance):
- self.instance = instance
- def buildProtocol(self, addr):
- return self.instance
- def clientConnectionFailed(self, connector, reason):
- self.instance.connectionFailed(reason)
- def clientConnectionLost(self, connector, reason):
- log.debug("XXX clientConnectionLost", "http-client")
-
- if not self.proxy_host:
- reactor.connectTCP(request.backendServer.host, request.backendServer.port,
- ClientFactory(self), request.backend.config.timeout)
- else:
- reactor.connectTCP(self.proxy_host, int(self.proxy_port),
- ClientFactory(self), request.backend.config.timeout)
+ # start the transfer
+ self.parent.send_complete_file(self.local_file)
+
+class FetcherHttpClient(http.HTTPClient):
+ """
+ This class represents an Http connection to a backend
+ server
+ """
+ def __init__(self, parent):
+ self.parent = parent # HttpFetcher
+ self.proxy = parent.backendServer.config.http_proxy
def connectionMade(self):
- if not self.proxy_host:
- self.sendCommand(self.request.method, self.request.backendServer.path
- + "/" + self.request.backend_uri)
- else:
- self.sendCommand(self.request.method, "http://"
- + self.request.backendServer.host + ":" + str(self.request.backendServer.port)
- + "/" + self.request.backendServer.path
- + "/" + self.request.backend_uri)
-
- self.sendHeader('host', self.request.backendServer.host)
+ self.parent.connected(self)
+
+ def download(self, fetcher, uri, mtime):
+ # Request file from backend
+ self.fetcher = fetcher
+ backendServer = self.parent.backendServer
+ if not self.proxy.host:
+ serverpath = backendServer.path
+ else:
+ serverpath = "http://" + backendServer.host
+ if backendServer.port != 80:
+ serverpath = serverpath + ":" + str(backendServer.port)
+ serverpath = serverpath + "/" + backendServer.path
+
+ #self.sendCommand(self.request.method,
+ self.sendCommand("GET" serverpath + "/" + uri)
+
+ self.sendHeader('host', backendServer.host)
+ if self.proxy.user:
+ self.sendHeader('Proxy-Authorization', "Basic " +
+ encodestring(self.proxy.user + ":" + self.proxy.password))
- if self.XXXXXXXXXXXXX_mtime != None:
- datetime = http.datetimeToString(self.local_mtime)
+ if mtime is not None:
+ datetime = http.datetimeToString(mtime)
self.sendHeader('if-modified-since', datetime)
self.endHeaders()
@@ -403,19 +261,18 @@
def handleHeader(self, key, value):
- log.debug("Received: " + key + " " + str(value))
+ log.debug("Received: " + key + " " + str(value), 'http_client')
key = string.lower(key)
if key == 'last-modified':
self.server_mtime = http.stringToDatetime(value)
-
- if key in self.forward_headers:
- self.setResponseHeader(key, value)
+ elsif key == 'content-length':
+ self.server_size = http.stringToDatetime(value)
def handleEndHeaders(self):
if self.http_status == http.NOT_MODIFIED:
log.debug("NOT_MODIFIED " + str(self.status_code),'http_client')
- self.apEndCached()
+ self.parent.up_to_date()
def rawDataReceived(self, data):
self.apDataReceived(data)
@@ -431,21 +288,13 @@
def lineReceived(self, line):
"""
- log the line and handle it to the appropriate the base classe.
-
- The location header gave me trouble at some point, so I filter it just
- in case.
-
- Note: when running a class method directly and not from an object you
- have to give the 'self' parameter manualy.
+ log each line as received from server
"""
- #log.debug(line,'http_client')
- if self.log_headers == None:
+ if self.log_headers is None:
self.log_headers = line
else:
self.log_headers += ", " + line;
- if not re.search('^Location:', line):
- http.HTTPClient.lineReceived(self, line)
+ http.HTTPClient.lineReceived(self, line)
def sendCommand(self, command, path):
"log the line and handle it to the base class."
@@ -464,6 +313,48 @@
log.debug(name + ":" + value,'http_client')
http.HTTPClient.sendHeader(self, name, value)
+class HttpFetcher(protocol.ClientFactory):
+ """
+ A Fetcher that retrieves files via HTTP
+ """
+ def __init__(self, backendServer):
+ self.backendServer = backendServer
+ self.isConnected = False
+
+ def connect(self):
+ self.connectCallback = deferred()
+ if not self.proxy.host:
+ host = request.backendServer.host
+ port = request.backendServer.port
+ else
+ host = self.proxy.host
+ port = self.proxy.port
+ reactor.connectTCP(host, port, self, request.backend.config.timeout)
+
+ def buildProtocol(self, addr):
+ return FetcherHttpClient(self)
+
+ def connected(self, connection):
+ "Connection was made to HTTP backend (callback from HTTP client)"
+ self.connection = connection
+ self.isConnected = True
+ self.connectCallback.runcallbacks()
+
+ def clientConnectionFailed(self, connector, reason):
+ self.instance.connectionFailed(reason)
+ def clientConnectionLost(self, connector, reason):
+ log.debug("XXX clientConnectionLost", "http-client")
+
+ def download(self, fetcher, uri, mtime):
+ """
+ Request download
+ %param fetcher: Fetcher class to receive callbacks
+ %param uri: URI of file to be downloaded within backend
+ %param mtime: Modification time of current file in cache
+ """
+ self.connection.download(fetcher, uri, mtime)
+
+
class FtpFetcher(Fetcher, protocol.Protocol):
"""
This is the secuence here:
Modified: people/halls/rework/debian/changelog
==============================================================================
--- people/halls/rework/debian/changelog (original)
+++ people/halls/rework/debian/changelog Tue Oct 11 15:38:56 2005
@@ -2,10 +2,13 @@
(New changelog because these changes are for post 1.9.32
[ Chris Halls ]
- * Move fetchers and cache management into seperate files
+ * http_proxy can now be set in each [backend] section
+ * Add support for username and password in http_proxy parameter.
+ Thanks to Thomas Champagne for the patch (Closes: #323147)
+ * Move fetchers and cache management into separate files
* Add more unit tests
- -- Chris Halls <chris.halls at credativ.co.uk> Wed, 10 Aug 2005 09:18:30 +0100
+ -- Chris Halls <halls at debian.org> Tue, 11 Oct 2005 16:25:06 +0100
apt-proxy (1.9.31+svn) unstable; urgency=low
More information about the apt-proxy-devel
mailing list