Thu Aug 16 09:59:54 UTC 2007

Author: camrdale-guest
Date: Thu Aug 16 09:59:54 2007
New Revision: 257

URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=257
Update the HTTP cache to not thread too many requests at a time.


Modified: debtorrent/branches/http1.1/DebTorrent/HTTPCache.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/HTTPCache.py?rev=257&op=diff
--- debtorrent/branches/http1.1/DebTorrent/HTTPCache.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/HTTPCache.py Thu Aug 16 09:59:54 2007
@@ -13,12 +13,16 @@
 @var VERSION: the UserAgent identifier sent to all sites
 @type alas: C{string}
 @var alas: the message to send when the data is not found
+@type TIMEOUT: C{float}
+@var TIMEOUT: the number of seconds after which an idle connection is closed
-from httplib import HTTPConnection
+from httplib import HTTPConnection, BadStatusLine
+from socket import gaierror
 from threading import Thread
 from DebTorrent.__init__ import product_name,version_short
+from clock import clock
 from os.path import join, split, getmtime, getsize, exists
 from os import utime, makedirs, listdir
 from time import strftime, strptime, gmtime
@@ -35,41 +39,24 @@
 time_format = '%a, %d %b %Y %H:%M:%S'
 VERSION = product_name+'/'+version_short
 alas = 'your file may exist elsewhere in the universe\nbut alas, not here\n'
+TIMEOUT = 60.0
 class CacheRequest:
-    """Download a file needed for the HTTP download cache.
-    @type handler: L{HTTPCache}
-    @ivar handler: the cache manager for the download
+    """A new request to send to the server for the cache.
     @type path: C{list} of C{string}
     @ivar path: the server and path to download
-    @type server: C{string}
-    @ivar server: the webserver address and port to connect to 
-    @type url: C{string}
-    @ivar url: the URL to request from the site
     @type func: C{method}
     @ivar func: the method to call when the download completes
-    @type connection: C{HTTPConnection}
-    @ivar connection: the connection to the HTTP server
-    @type headers: C{dictionary}
-    @ivar headres: the HTTP headers to send in the request, and the headers
-        returned by the response
-    @type active: C{boolean}
-    @ivar active: whether there is a download underway
-    @type received_data: C{string}
-    @ivar received_data: the data returned from the server
-    @type connection_status: C{int}
-    @ivar connection_status: the status code returned by the server
-    @type connection_response: C{string}
-    @ivar connection_response: the status message returned by the server
+    @type response: (C{int}, C{string}, C{dictionary}, C{string})
+    @ivar response: the HTTP status code, status message, headers, and
+        downloaded data
-    def __init__(self, handler, path, func):
+    def __init__(self, path, func):
         """Initialize the instance.
-        @type handler: L{HTTPCache}
-        @param handler: the cache manager for the download
         @type path: C{list} of C{string}
         @param path: the server and path to download
         @type func: C{method}
@@ -77,24 +64,124 @@
+        self.path = path
+        self.func = func
+        self.response = None
+    def save_response(self, r):
+        """Save a returned response from the server.
+        @type r: C{httplib.HTTPResponse}
+        @param r: the response from the server
+        """
+        self.response = (r.status, r.reason, dict(r.getheaders()), r.read())
+    def error(self, error_msg):
+        """Save an error response.
+        @type error_msg: C{string}
+        @param error_msg: the error that occurred
+        """
+        self.response = (502, 'Bad Gateway', {},
+                         'error accessing http server: '+error_msg)
+class CacheConnection:
+    """Download files needed for the HTTP download cache from a single server.
+    @type handler: L{HTTPCache}
+    @ivar handler: the cache manager for the download
+    @type server: C{string}
+    @ivar server: the webserver address and port to connect to 
+    @type request: L{CacheRequest}
+    @ivar request: the request currently in progress
+    @type request_queue: C{list} of L{CacheRequest}
+    @ivar request_queue: the waiting requests
+    @type connection: C{HTTPConnection}
+    @ivar connection: the connection to the HTTP server
+    @type url: C{string}
+    @ivar url: the URL to request from the site
+    @type headers: C{dictionary}
+    @ivar headers: the HTTP headers to send in the request
+    @type active: C{boolean}
+    @ivar active: whether there is a download underway
+    @type closed: C{boolean}
+    @ivar closed: whether the connection has been closed
+    @type last_action: C{float}
+    @ivar last_action: the last time an action occurred
+    """
+    def __init__(self, handler, server):
+        """Initialize the instance.
+        @type handler: L{HTTPCache}
+        @param handler: the cache manager for the download
+        @type server: C{string}
+        @param server: the server name to send the requests to
+        """
         self.handler = handler
-        self.path = path
-        self.server = path[0]
-        self.url = '/' + '/'.join(path[1:])
-        self.func = func
+        self.server = server
+        self.request = None
+        self.request_queue = []
+        self.headers = {'User-Agent': VERSION}
+        self.active = False
+        self.closed = False
+        self.last_action = clock()
             self.connection = HTTPConnection(self.server)
-            logger.exception('cannot connect to http seed: '+self.server)
+            logger.exception('cannot connect to http server: '+self.server)
+            self.close()
+    def queue(self, path, func):
+        """Queue a download for later starting.
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type func: C{method}
+        @param func: the method to call when the download completes
+        @rtype: C{boolean}
+        @return: whether the download was successfully queued
+        """
+        assert path[0] == self.server
+        if self.closed:
+            return False
+        logger.debug('queueing request for '+'/'.join(path))
+        self.request_queue.append(CacheRequest(path, func))
+        self._run_queue()
+        return True
+    def _run_queue(self):
+        """Start the next element in the queue downloading."""
+        # Check if one is already running
+        if self.active or self.closed:
-        self.headers = {'User-Agent': VERSION}
-        self.active = False
+        # If the queue is empty, then we are done
+        if not self.request_queue:
+            self.handler.rawserver.add_task(self.auto_close, int(TIMEOUT)+1)
+            return
+        self.active = True
+        self.last_action = clock()
+        self.request = self.request_queue.pop(0)
+        self.url = '/' + '/'.join(self.request.path[1:])
         logger.debug('starting thread to download '+self.url)
-        rq = Thread(target = self._request, name = 'HTTPCache.CacheRequest._request')
+        rq = Thread(target = self._request, name = 'CacheRequest('+self.server+')')
-        self.active = True
     def _request(self):
         """Do the request."""
@@ -104,30 +191,77 @@
             logger.debug('sending request GET '+self.url+', '+str(self.headers))
-            self.connection.request('GET',self.url, None, self.headers)
-            r = self.connection.getresponse()
+            self.connection.request('GET', self.url, None, self.headers)
+            # Check for closed persistent connection due to server timeout
+            try:
+                r = self.connection.getresponse()
+            except BadStatusLine:
+                # Reopen the connection to get a new socket
+                logger.debug('persistent connection closed, attempting to reopen')
+                self.connection.close()
+                self.connection.connect()
+                logger.debug('sending request GET '+self.url+', '+str(self.headers))
+                self.connection.request('GET',self.url, None, self.headers)
+                r = self.connection.getresponse()
             logger.debug('got response '+str(r.status)+', '+r.reason+', '+str(r.getheaders()))
-            self.connection_status = r.status
-            self.connection_response = r.reason
-            self.headers = dict(r.getheaders())
-            self.received_data = r.read()
+            self.request.save_response(r)
+        except gaierror, e:
+            logger.warning('could not contact http server '+self.server+': '+str(e))
+            self.request.error('could not contact http server '+self.server+': '+str(e))
         except Exception, e:
             logger.exception('error accessing http server')
-            self.connection_status = 500
-            self.connection_response = 'Internal Server Error'
-            self.headers = {}
-            self.received_data = 'error accessing http server: '+str(e)
+            self.request.error(str(e))
+        self.last_action = clock()
     def request_finished(self):
         """Process the completed request."""
+        # Save the result
+        request = self.request
+        self.request = None
+        # Start the next queued item running
+        self.active = False
+        self._run_queue()
+        # Return the result
+        self.handler.download_complete(request.path, request.func,
+                                       request.response)
+    def auto_close(self):
+        """Close the connection if it has been idle."""
+        if (not self.active and not self.closed and not self.request and 
+            not self.request_queue and (clock() - self.last_action) >= TIMEOUT):
+            self.close()
+    def close(self):
+        """Close the connection."""
+        logger.info('Closing the connection to: '+self.server)
+        self.closed = True
-        self.active = False
-        self.handler.download_complete(self, self.path, self.func, 
-                   (self.connection_status, self.connection_response, 
-                    self.headers, self.received_data))
+        # Process the current request
+        if self.request:
+            if not self.request.response:
+                self.request.error('connection closed prematurely')
+            self.handler.download_complete(self.request.path,
+                                           self.request.func,
+                                           self.request.response)
+            self.request = None
+        # Process any waiting requests
+        for request in self.request_queue:
+            if not request.response:
+                request.error('connection closed prematurely')
+            self.handler.download_complete(request.path, request.func,
+                                           request.response)
+        del self.request_queue[:]
+        # Remove the connection to the server
+        self.handler.remove(self, self.server)
 class HTTPCache:
@@ -135,8 +269,9 @@
     @type rawserver: L{Debtorrent.RawServer.RawServer}
     @ivar rawserver: the server
-    @type downloads: C{list} of L{CacheRequest}
-    @ivar downloads: the list of all current downloads for the cache
+    @type downloads: C{dictionary}
+    @ivar downloads: the current downloads, keys are the server names, values
+        are the L{CacheConnection} objects used to download from the server
     @type cachedir: C{string}
     @ivar cachedir: the directory to save cache files in
@@ -153,7 +288,7 @@
         self.rawserver = rawserver
-        self.downloads = []
+        self.downloads = {}
         self.cachedir = cachedir
     def download_get(self, path, func):
@@ -166,18 +301,35 @@
-        logger.info('Starting a downloader for: http://'+'/'.join(path))
-        self.downloads.append(CacheRequest(self, path, func))
-    def download_complete(self, d, path, func, r):
-        """Remove a completed download from the list and process the data.
-        Once a download has been completed, remove the downloader from the 
-        list and save the downloaded file in the file system. Then return the
-        data to the callback function. 
-        @type d: L{CacheRequest}
-        @param d: the cache request that is completed
+        if path[0] not in self.downloads:
+            logger.info('Opening a connection to server: '+path[0])
+            self.downloads[path[0]] = CacheConnection(self, path[0])
+        if not self.downloads[path[0]].queue(path, func):
+            func(path, (500, 'Internal Server Error', 
+                        {'Server': VERSION, 
+                         'Content-Type': 'text/html; charset=iso-8859-1'},
+                        'Server could not be contacted'))
+    def remove(self, d, server):
+        """Remove a completed download connection.
+        @type d: L{CacheConnection}
+        @param d: the server connection that is no longer needed
+        @type server: C{string}
+        @param server: the server the connection was to
+        """
+        assert self.downloads[server] == d
+        del self.downloads[server]
+    def download_complete(self, path, func, r):
+        """Process the returned data from a request.
+        Once a download has been completed, save the downloaded file in the
+        file system. Then return the data to the callback function.
         @type path: C{list} of C{string}
         @param path: the server and path that was downloaded
         @type func: C{method}
@@ -188,7 +340,6 @@
         logger.info('download completed for: http://'+'/'.join(path))
-        self.downloads.remove(d)
         file = self.get_filename(path)
         headers = {'Server': VERSION}

Modified: debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py?rev=257&op=diff
--- debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py Thu Aug 16 09:59:54 2007
@@ -385,7 +385,12 @@
         if not self.requests:
-            logger.error('Got an answer when there was no request')
+            if httpreq is None:
+                # There's only one request allowed, so send the answer
+                self.send_answer(r, self.header, self.command, self.path,
+                                 self.encoding, self.headers)
+            else:
+                logger.error('got answer for unknown request')
         if httpreq:
@@ -573,6 +578,7 @@
+        logger.debug('new external connection')
         self.connections[connection] = HTTPConnection(self, connection)
     def connection_flushed(self, connection):
@@ -583,7 +589,9 @@
+        logger.debug('connection flushed')
         if self.connections[connection].done:
+            logger.debug('connection shutdown')
     def connection_lost(self, connection):
@@ -594,6 +602,7 @@
+        logger.debug('connection lost')
         ec = self.connections[connection]
         del self.connections[connection]
@@ -610,6 +619,7 @@
         c = self.connections[connection]
         if not c.data_came_in(data) and not c.closed:
+            logger.debug('closing connection')
     def write_log(self, ip, ident, username, header,

Modified: debtorrent/branches/http1.1/DebTorrent/SocketHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/SocketHandler.py?rev=257&op=diff
--- debtorrent/branches/http1.1/DebTorrent/SocketHandler.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/SocketHandler.py Thu Aug 16 09:59:54 2007
@@ -92,6 +92,7 @@
                 self.ip = 'unknown'
                 self.ip = ip
+        logger.debug('new socket: ' + self.ip)
     def get_ip(self, real=False):
         """Get the IP address of the socket.
@@ -115,6 +116,7 @@
     def close(self):
         """Close the socket."""
         assert self.socket
+        logger.debug('close socket')
         self.connected = False
         sock = self.socket
         self.socket = None
@@ -132,6 +134,7 @@
+        logger.debug('socket shutdown:'+str(val))
     def is_flushed(self):

Modified: debtorrent/branches/http1.1/test.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/test.py?rev=257&op=diff
--- debtorrent/branches/http1.1/test.py (original)
+++ debtorrent/branches/http1.1/test.py Thu Aug 16 09:59:54 2007
@@ -546,7 +546,7 @@
         # Create apt's config files
         f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w')
-        f.write('deb http://localhost:' + str(num_down) + '988/' + mirror + '/ stable ' + suites + '\n')
+        f.write('deb debtorrent://localhost:' + str(num_down) + '988/' + mirror + '/ stable ' + suites + '\n')
         f = open(join([downloader_dir, 'etc', 'apt', 'apt.conf']), 'w')

