r257 - in /debtorrent/branches/http1.1: DebTorrent/HTTPCache.py DebTorrent/HTTPHandler.py DebTorrent/SocketHandler.py test.py
camrdale-guest at users.alioth.debian.org
camrdale-guest at users.alioth.debian.org
Thu Aug 16 09:59:54 UTC 2007
Author: camrdale-guest
Date: Thu Aug 16 09:59:54 2007
New Revision: 257
URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=257
Log:
Update the HTTP cache to not thread too many requests at a time.
Modified:
debtorrent/branches/http1.1/DebTorrent/HTTPCache.py
debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py
debtorrent/branches/http1.1/DebTorrent/SocketHandler.py
debtorrent/branches/http1.1/test.py
Modified: debtorrent/branches/http1.1/DebTorrent/HTTPCache.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/HTTPCache.py?rev=257&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/HTTPCache.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/HTTPCache.py Thu Aug 16 09:59:54 2007
@@ -13,12 +13,16 @@
@var VERSION: the UserAgent identifier sent to all sites
@type alas: C{string}
@var alas: the message to send when the data is not found
+@type TIMEOUT: C{float}
+@var TIMEOUT: the number of seconds after which an idle connection is closed
"""
-from httplib import HTTPConnection
+from httplib import HTTPConnection, BadStatusLine
+from socket import gaierror
from threading import Thread
from DebTorrent.__init__ import product_name,version_short
+from clock import clock
from os.path import join, split, getmtime, getsize, exists
from os import utime, makedirs, listdir
from time import strftime, strptime, gmtime
@@ -35,41 +39,24 @@
time_format = '%a, %d %b %Y %H:%M:%S'
VERSION = product_name+'/'+version_short
alas = 'your file may exist elsewhere in the universe\nbut alas, not here\n'
+TIMEOUT = 60.0
class CacheRequest:
- """Download a file needed for the HTTP download cache.
-
- @type handler: L{HTTPCache}
- @ivar handler: the cache manager for the download
+ """A new request to send to the server for the cache.
+
@type path: C{list} of C{string}
@ivar path: the server and path to download
- @type server: C{string}
- @ivar server: the webserver address and port to connect to
- @type url: C{string}
- @ivar url: the URL to request from the site
@type func: C{method}
@ivar func: the method to call when the download completes
- @type connection: C{HTTPConnection}
- @ivar connection: the connection to the HTTP server
- @type headers: C{dictionary}
- @ivar headres: the HTTP headers to send in the request, and the headers
- returned by the response
- @type active: C{boolean}
- @ivar active: whether there is a download underway
- @type received_data: C{string}
- @ivar received_data: the data returned from the server
- @type connection_status: C{int}
- @ivar connection_status: the status code returned by the server
- @type connection_response: C{string}
- @ivar connection_response: the status message returned by the server
+ @type response: (C{int}, C{string}, C{dictionary}, C{string})
+ @ivar response: the HTTP status code, status message, headers, and
+ downloaded data
"""
- def __init__(self, handler, path, func):
+ def __init__(self, path, func):
"""Initialize the instance.
- @type handler: L{HTTPCache}
- @param handler: the cache manager for the download
@type path: C{list} of C{string}
@param path: the server and path to download
@type func: C{method}
@@ -77,24 +64,124 @@
"""
+ self.path = path
+ self.func = func
+ self.response = None
+
+ def save_response(self, r):
+ """Save a returned response from the server.
+
+ @type r: C{httplib.HTTPResponse}
+ @param r: the response from the server
+
+ """
+
+ self.response = (r.status, r.reason, dict(r.getheaders()), r.read())
+
+ def error(self, error_msg):
+ """Save an error response.
+
+ @type error_msg: C{string}
+ @param error_msg: the error that occurred
+
+ """
+
+ self.response = (502, 'Bad Gateway', {},
+ 'error accessing http server: '+error_msg)
+
+class CacheConnection:
+ """Download files needed for the HTTP download cache from a single server.
+
+ @type handler: L{HTTPCache}
+ @ivar handler: the cache manager for the download
+ @type server: C{string}
+ @ivar server: the webserver address and port to connect to
+ @type request: L{CacheRequest}
+ @ivar request: the request currently in progress
+ @type request_queue: C{list} of L{CacheRequest}
+ @ivar request_queue: the waiting requests
+ @type connection: C{HTTPConnection}
+ @ivar connection: the connection to the HTTP server
+ @type url: C{string}
+ @ivar url: the URL to request from the site
+ @type headers: C{dictionary}
+ @ivar headers: the HTTP headers to send in the request
+ @type active: C{boolean}
+ @ivar active: whether there is a download underway
+ @type closed: C{boolean}
+ @ivar closed: whether the connection has been closed
+ @type last_action: C{float}
+ @ivar last_action: the last time an action occurred
+
+ """
+
+ def __init__(self, handler, server):
+ """Initialize the instance.
+
+ @type handler: L{HTTPCache}
+ @param handler: the cache manager for the download
+ @type server: C{string}
+ @param server: the server name to send the requests to
+
+ """
+
self.handler = handler
- self.path = path
- self.server = path[0]
- self.url = '/' + '/'.join(path[1:])
- self.func = func
+ self.server = server
+ self.request = None
+ self.request_queue = []
+ self.headers = {'User-Agent': VERSION}
+ self.active = False
+ self.closed = False
+ self.last_action = clock()
+
try:
self.connection = HTTPConnection(self.server)
except:
- logger.exception('cannot connect to http seed: '+self.server)
+ logger.exception('cannot connect to http server: '+self.server)
+ self.close()
+
+ def queue(self, path, func):
+ """Queue a download for later starting.
+
+ @type path: C{list} of C{string}
+ @param path: the server and path to download
+ @type func: C{method}
+ @param func: the method to call when the download completes
+ @rtype: C{boolean}
+ @return: whether the download was successfully queued
+
+ """
+
+ assert path[0] == self.server
+ if self.closed:
+ return False
+
+ logger.debug('queueing request for '+'/'.join(path))
+ self.request_queue.append(CacheRequest(path, func))
+ self._run_queue()
+
+ return True
+
+ def _run_queue(self):
+ """Start the next element in the queue downloading."""
+
+ # Check if one is already running
+ if self.active or self.closed:
return
- self.headers = {'User-Agent': VERSION}
- self.active = False
+ # If the queue is empty, then we are done
+ if not self.request_queue:
+ self.handler.rawserver.add_task(self.auto_close, int(TIMEOUT)+1)
+ return
+
+ self.active = True
+ self.last_action = clock()
+ self.request = self.request_queue.pop(0)
+ self.url = '/' + '/'.join(self.request.path[1:])
logger.debug('starting thread to download '+self.url)
- rq = Thread(target = self._request, name = 'HTTPCache.CacheRequest._request')
+ rq = Thread(target = self._request, name = 'CacheRequest('+self.server+')')
rq.setDaemon(False)
rq.start()
- self.active = True
def _request(self):
"""Do the request."""
@@ -104,30 +191,77 @@
try:
logger.debug('sending request GET '+self.url+', '+str(self.headers))
- self.connection.request('GET',self.url, None, self.headers)
-
- r = self.connection.getresponse()
+ self.connection.request('GET', self.url, None, self.headers)
+
+ # Check for closed persistent connection due to server timeout
+ try:
+ r = self.connection.getresponse()
+ except BadStatusLine:
+ # Reopen the connection to get a new socket
+ logger.debug('persistent connection closed, attempting to reopen')
+ self.connection.close()
+ self.connection.connect()
+ logger.debug('sending request GET '+self.url+', '+str(self.headers))
+ self.connection.request('GET',self.url, None, self.headers)
+ r = self.connection.getresponse()
logger.debug('got response '+str(r.status)+', '+r.reason+', '+str(r.getheaders()))
- self.connection_status = r.status
- self.connection_response = r.reason
- self.headers = dict(r.getheaders())
- self.received_data = r.read()
+ self.request.save_response(r)
+ except gaierror, e:
+ logger.warning('could not contact http server '+self.server+': '+str(e))
+ self.request.error('could not contact http server '+self.server+': '+str(e))
except Exception, e:
logger.exception('error accessing http server')
- self.connection_status = 500
- self.connection_response = 'Internal Server Error'
- self.headers = {}
- self.received_data = 'error accessing http server: '+str(e)
+ self.request.error(str(e))
+ self.last_action = clock()
self.handler.rawserver.add_task(self.request_finished)
def request_finished(self):
"""Process the completed request."""
+
+ # Save the result
+ request = self.request
+ self.request = None
+
+ # Start the next queued item running
+ self.active = False
+ self._run_queue()
+
+ # Return the result
+ self.handler.download_complete(request.path, request.func,
+ request.response)
+
+ def auto_close(self):
+ """Close the connection if it has been idle."""
+ if (not self.active and not self.closed and not self.request and
+ not self.request_queue and (clock() - self.last_action) >= TIMEOUT):
+ self.close()
+
+ def close(self):
+ """Close the connection."""
+ logger.info('Closing the connection to: '+self.server)
+ self.closed = True
self.connection.close()
- self.active = False
- self.handler.download_complete(self, self.path, self.func,
- (self.connection_status, self.connection_response,
- self.headers, self.received_data))
+
+ # Process the current request
+ if self.request:
+ if not self.request.response:
+ self.request.error('connection closed prematurely')
+ self.handler.download_complete(self.request.path,
+ self.request.func,
+ self.request.response)
+ self.request = None
+
+ # Process any waiting requests
+ for request in self.request_queue:
+ if not request.response:
+ request.error('connection closed prematurely')
+ self.handler.download_complete(request.path, request.func,
+ request.response)
+ del self.request_queue[:]
+
+ # Remove the connection to the server
+ self.handler.remove(self, self.server)
class HTTPCache:
@@ -135,8 +269,9 @@
@type rawserver: L{Debtorrent.RawServer.RawServer}
@ivar rawserver: the server
- @type downloads: C{list} of L{CacheRequest}
- @ivar downloads: the list of all current downloads for the cache
+ @type downloads: C{dictionary}
+ @ivar downloads: the current downloads, keys are the server names, values
+ are the L{CacheConnection} objects used to download from the server
@type cachedir: C{string}
@ivar cachedir: the directory to save cache files in
@@ -153,7 +288,7 @@
"""
self.rawserver = rawserver
- self.downloads = []
+ self.downloads = {}
self.cachedir = cachedir
def download_get(self, path, func):
@@ -166,18 +301,35 @@
"""
- logger.info('Starting a downloader for: http://'+'/'.join(path))
- self.downloads.append(CacheRequest(self, path, func))
-
- def download_complete(self, d, path, func, r):
- """Remove a completed download from the list and process the data.
-
- Once a download has been completed, remove the downloader from the
- list and save the downloaded file in the file system. Then return the
- data to the callback function.
-
- @type d: L{CacheRequest}
- @param d: the cache request that is completed
+ if path[0] not in self.downloads:
+ logger.info('Opening a connection to server: '+path[0])
+ self.downloads[path[0]] = CacheConnection(self, path[0])
+
+ if not self.downloads[path[0]].queue(path, func):
+ func(path, (500, 'Internal Server Error',
+ {'Server': VERSION,
+ 'Content-Type': 'text/html; charset=iso-8859-1'},
+ 'Server could not be contacted'))
+
+ def remove(self, d, server):
+ """Remove a completed download connection.
+
+ @type d: L{CacheConnection}
+ @param d: the server connection that is no longer needed
+ @type server: C{string}
+ @param server: the server the connection was to
+
+ """
+
+ assert self.downloads[server] == d
+ del self.downloads[server]
+
+ def download_complete(self, path, func, r):
+ """Process the returned data from a request.
+
+ Once a download has been completed, save the downloaded file in the
+ file system. Then return the data to the callback function.
+
@type path: C{list} of C{string}
@param path: the server and path that was downloaded
@type func: C{method}
@@ -188,7 +340,6 @@
"""
logger.info('download completed for: http://'+'/'.join(path))
- self.downloads.remove(d)
file = self.get_filename(path)
headers = {'Server': VERSION}
Modified: debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py?rev=257&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py Thu Aug 16 09:59:54 2007
@@ -385,7 +385,12 @@
return
if not self.requests:
- logger.error('Got an answer when there was no request')
+ if httpreq is None:
+ # There's only one request allowed, so send the answer
+ self.send_answer(r, self.header, self.command, self.path,
+ self.encoding, self.headers)
+ else:
+ logger.error('got answer for unknown request')
return
if httpreq:
@@ -573,6 +578,7 @@
"""
+ logger.debug('new external connection')
self.connections[connection] = HTTPConnection(self, connection)
def connection_flushed(self, connection):
@@ -583,7 +589,9 @@
"""
+ logger.debug('connection flushed')
if self.connections[connection].done:
+ logger.debug('connection shutdown')
connection.shutdown(1)
def connection_lost(self, connection):
@@ -594,6 +602,7 @@
"""
+ logger.debug('connection lost')
ec = self.connections[connection]
ec.close()
del self.connections[connection]
@@ -610,6 +619,7 @@
c = self.connections[connection]
if not c.data_came_in(data) and not c.closed:
+ logger.debug('closing connection')
c.connection.shutdown(1)
def write_log(self, ip, ident, username, header,
Modified: debtorrent/branches/http1.1/DebTorrent/SocketHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/SocketHandler.py?rev=257&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/SocketHandler.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/SocketHandler.py Thu Aug 16 09:59:54 2007
@@ -92,6 +92,7 @@
self.ip = 'unknown'
else:
self.ip = ip
+ logger.debug('new socket: ' + self.ip)
def get_ip(self, real=False):
"""Get the IP address of the socket.
@@ -115,6 +116,7 @@
def close(self):
"""Close the socket."""
assert self.socket
+ logger.debug('close socket')
self.connected = False
sock = self.socket
self.socket = None
@@ -132,6 +134,7 @@
"""
+ logger.debug('socket shutdown:'+str(val))
self.socket.shutdown(val)
def is_flushed(self):
Modified: debtorrent/branches/http1.1/test.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/test.py?rev=257&op=diff
==============================================================================
--- debtorrent/branches/http1.1/test.py (original)
+++ debtorrent/branches/http1.1/test.py Thu Aug 16 09:59:54 2007
@@ -546,7 +546,7 @@
# Create apt's config files
f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w')
- f.write('deb http://localhost:' + str(num_down) + '988/' + mirror + '/ stable ' + suites + '\n')
+ f.write('deb debtorrent://localhost:' + str(num_down) + '988/' + mirror + '/ stable ' + suites + '\n')
f.close()
f = open(join([downloader_dir, 'etc', 'apt', 'apt.conf']), 'w')
More information about the Debtorrent-commits
mailing list