r203 - in /debtorrent/branches/http1.1: DebTorrent/BT1/AptListener.py DebTorrent/BT1/track.py DebTorrent/HTTPHandler.py DebTorrent/launchmanycore.py TODO test.py

camrdale-guest at users.alioth.debian.org camrdale-guest at users.alioth.debian.org
Mon Aug 6 23:17:58 UTC 2007


Author: camrdale-guest
Date: Mon Aug  6 23:17:57 2007
New Revision: 203

URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=203
Log:
Upgrade the HTTP server to support HTTP/1.1 connections, including persistent connections and pipelining.

Modified:
    debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
    debtorrent/branches/http1.1/DebTorrent/BT1/track.py
    debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py
    debtorrent/branches/http1.1/DebTorrent/launchmanycore.py
    debtorrent/branches/http1.1/TODO
    debtorrent/branches/http1.1/test.py

Modified: debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py Mon Aug  6 23:17:57 2007
@@ -96,9 +96,10 @@
     @ivar request_queue: the pending HTTP package requests that are waiting for download.
         Keys are the file names (including mirror) requested, values are dictionaries
         with keys of L{DebTorrent.HTTPHandler.HTTPConnection} objects and values of
-        (L{DebTorrent.download_bt1.BT1Download}, C{int}, C{list} of C{int}, C{float})
-        which are the torrent downloader, file index, list of pieces needed, and 
-        the time of the original request.
+        (L{DebTorrent.download_bt1.BT1Download}, C{int},
+        L{DebTorrent.HTTPHandler.HTTPRequest}, C{list} of C{int}, C{float})
+        which are the torrent downloader, file index, HTTP request object to answer, 
+        list of pieces needed, and the time of the original request.
     
     """
 
@@ -149,7 +150,7 @@
         self.request_queue = {}
         rawserver.add_task(self.process_queue, 1)
         
-    def enqueue_request(self, connection, file, downloader, file_num, pieces_needed):
+    def enqueue_request(self, connection, file, downloader, file_num, httpreq, pieces_needed):
         """Add a new download request to the queue of those waiting for pieces.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
@@ -160,6 +161,8 @@
         @param downloader: the torrent download that has the file
         @type file_num: C{int}
         @param file_num: the index of the file in the torrent
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: the HTTP request object to answer (for queueing)
         @type pieces_needed: C{list} of C{int}
         @param pieces_needed: the list of pieces in the torrent that still 
             need to download
@@ -174,7 +177,7 @@
 
         logger.info('queueing request as file '+file+' needs pieces: '+str(pieces_needed))
 
-        queue[connection] = (downloader, file_num, pieces_needed, clock())
+        queue[connection] = (downloader, file_num, httpreq, pieces_needed, clock())
         
     def process_queue(self):
         """Process the queue of waiting requests."""
@@ -192,16 +195,16 @@
                     continue
                     
                 # Remove the downloaded pieces from the list of needed ones
-                for piece in list(v[2]):
+                for piece in list(v[3]):
                     if v[0].storagewrapper.do_I_have(piece):
                         logger.debug('queued request for file '+file+' got piece '+str(piece))
-                        v[2].remove(piece)
+                        v[3].remove(piece)
                         
                 # If no more pieces are needed, return the answer and remove the request
-                if not v[2]:
+                if not v[3]:
                     logger.info('queued request for file '+file+' is complete')
                     closed_conns.append((file, c))
-                    self.answer_package(c, file, v[0], v[1])
+                    self.answer_package(c, file, v[0], v[1], v[2])
 
         # Remove closed/finished connections from the queue
         for (file, c) in closed_conns:
@@ -316,7 +319,7 @@
         return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/html; charset=iso-8859-1'}, """<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n<html><head><title>Meow</title>\n</head>\n<body style="color: rgb(255, 255, 255); background-color: rgb(0, 0, 0);">\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">I&nbsp;IZ&nbsp;TAKIN&nbsp;BRAKE</span></big></big></big><br></div>\n<pre><b><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .-o=o-.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ,&nbsp; /=o=o=o=\ .--.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _|\|=o=O=o=O=|&nbsp;&nbsp;&nbsp; \<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; __.'&nbsp; a`\=o=o=o=(`\&nbsp;&nbsp; /<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; '.&nbsp;&nbsp; a 4/`|.-""'`\ \ ;'`)&nbsp;&nbsp; .---.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; \&nbsp;&nbsp; .'&nbsp; /&nbsp;&nbsp; .--'&nbsp; |_.'&nbsp;&nbsp; / .-._)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `)&nbsp; _.'&nbsp;&nbsp; /&nbsp;&nbsp;&nbsp;&nbsp; /`-.__.' /<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `'-.____;&nbsp;&nbsp;&nbsp;&nbsp; /'-.___.-'<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `\"""`</tt></b></pre>\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">FRM&nbsp;GETIN&nbsp;UR&nbsp;PACKAGES</span></big></big></big><br></div>\n</body>\n</html>""")
 
 
-    def get_cached(self, connection, path, headers):
+    def get_cached(self, connection, path, headers, httpreq):
         """Proxy the (possibly cached) download of a file from a mirror.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
@@ -325,6 +328,8 @@
         @param path: the path of the file to download, starting with the mirror name
         @type headers: C{dictionary}
         @param headers: the headers from the request
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: the HTTP request object to answer (for queueing)
         @rtype: (C{int}, C{string}, C{dictionary}, C{string})
         @return: the HTTP status code, status message, headers, and downloaded file
             (or None if the file is being downloaded)
@@ -344,10 +349,10 @@
             if r[0] not in (200, 304):
                 # Get Debs from the debtorrent download, others are straight download
                 if path[-1][-4:] == '.deb':
-                    return self.get_package(connection, path)
+                    return self.get_package(connection, path, httpreq)
                 else:
                     # Save the connection info and start downloading the file
-                    self.cache_waiting.setdefault('/'.join(path), []).append(connection)
+                    self.cache_waiting.setdefault('/'.join(path), []).append((connection, httpreq))
                     self.Cache.download_get(path, self.get_cached_callback)
                     return None
             
@@ -392,21 +397,23 @@
         if r[0] == 200 and path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
             self.got_Packages(path, r[3])
 
-        for connection in connections:
+        for (connection, httpreq) in connections:
             # Check to make sure the requester is still waiting
             if connection.closed:
                 logger.warning('Retrieved the file, but the requester is gone: '+'/'.join(path))
                 continue
             
-            connection.answer(r)
-            
-    def get_package(self, connection, path):
+            connection.answer(r, httpreq)
+            
+    def get_package(self, connection, path, httpreq):
         """Download a package file from a torrent.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
         @param connection: the conection the request came in on
         @type path: C{list} of C{string}
         @param path: the path of the file to download, starting with the mirror name
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: the HTTP request object to answer (for queueing)
         @rtype: (C{int}, C{string}, C{dictionary}, C{string})
         @return: the HTTP status code, status message, headers, and package data
             (or None if the package is to be downloaded)
@@ -447,12 +454,12 @@
         d.fileselector.set_priority(f, 1)
         
         # Add the connection to the list of those needing responses
-        self.enqueue_request(connection, '/'.join(path), d, f, pieces_needed)
+        self.enqueue_request(connection, '/'.join(path), d, f, httpreq, pieces_needed)
         
         return None
         
     
-    def answer_package(self, connection, file, d, f):
+    def answer_package(self, connection, file, d, f, httpreq):
         """Send the newly downloaded package file to the requester.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
@@ -463,6 +470,8 @@
         @param d: the torrent download that has the file
         @type f: C{int}
         @param f: the index of the file in the torrent
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: the HTTP request object to answer (for queueing)
         
         """
 
@@ -483,12 +492,12 @@
                 piecebuf.release()
         
         if not pieces_needed:
-            connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data))
+            connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data), httpreq)
             return
 
         # Something strange has happened, requeue it
         logger.warning('requeuing request for file '+str(f)+' as it still needs pieces: '+str(pieces_needed))
-        self.enqueue_request(connection, file, d, f, pieces_needed)
+        self.enqueue_request(connection, file, d, f, httpreq, pieces_needed)
         
     
     def got_Packages(self, path, data):
@@ -611,7 +620,7 @@
                 response)
 
 
-    def get(self, connection, path, headers):
+    def get(self, connection, path, headers, httpreq):
         """Respond to a GET request.
         
         Process a GET request from APT/browser/other. Process the request,
@@ -624,6 +633,8 @@
         @param path: the URL being requested
         @type headers: C{dictionary}
         @param headers: the headers from the request
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: the HTTP request object to answer (for queueing)
         @rtype: (C{int}, C{string}, C{dictionary}, C{string})
         @return: the HTTP status code, status message, headers, and message body
         
@@ -695,7 +706,7 @@
             if 'Packages.diff' in path:
                 return (404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
             
-            return self.get_cached(connection, path, headers)
+            return self.get_cached(connection, path, headers, httpreq)
             
         except ValueError, e:
             logger.exception('Bad request from: '+ip)

Modified: debtorrent/branches/http1.1/DebTorrent/BT1/track.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/BT1/track.py?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/BT1/track.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/BT1/track.py Mon Aug  6 23:17:57 2007
@@ -1125,7 +1125,7 @@
         return data
 
 
-    def get(self, connection, path, headers):
+    def get(self, connection, path, headers, httpreq):
         """Respond to a GET request to the tracker.
         
         Process a GET request from a peer/tracker/browser. Process the request,
@@ -1138,6 +1138,8 @@
         @param path: the URL being requested
         @type headers: C{dictionary}
         @param headers: the headers from the request
+        @type httpreq: L{DebTorrent.HTTPHandler.HTTPRequest}
+        @param httpreq: not used since HTTP 1.1 is not used by the tracker
         @rtype: (C{int}, C{string}, C{dictionary}, C{string})
         @return: the HTTP status code, status message, headers, and message body
         

Modified: debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/HTTPHandler.py Mon Aug  6 23:17:57 2007
@@ -49,6 +49,94 @@
         secs = time.time()
     return time.strftime('%Y-%m-%d %H:%M UTC', time.gmtime(secs))
 
+class HTTPRequest:
+    """A single request on an HTTP connection.
+    
+    Handles one of possibly many HTTP GET or HEAD requests from a client using
+    HTTP/1.1.
+    
+    @type header: C{string}
+    @ivar header: the first header line received from the request
+    @type command: C{string}
+    @ivar command: the requested command ('GET' or 'HEAD')
+    @type path: C{string}
+    @ivar path: the requested path to get
+    @type encoding: C{string}
+    @ivar encoding: the encoding to use when sending the response
+    @type headers: C{dictionary}
+    @ivar headers: the headers received with the request
+    @type pre1: C{boolean}
+    @ivar pre1: whether the request is from a pre version 1.0 client
+    @type answer: (C{int}, C{string}, C{dictionary}, C{string})
+    @ivar answer: the HTTP status code, status message, headers, and package
+        data, or None if the answer is not yet available
+    
+    """
+    
+    def __init__(self, header, command, path, encoding, headers, pre1):
+        """Initialize the instance.
+        
+        @type header: C{string}
+        @param header: the first header line received from the request
+        @type command: C{string}
+        @param command: the requested command ('GET' or 'HEAD')
+        @type path: C{string}
+        @param path: the requested path to get
+        @type encoding: C{string}
+        @param encoding: the encoding to use when sending the response
+        @type headers: C{dictionary}
+        @param headers: the headers received with the request
+        @type pre1: C{boolean}
+        @param pre1: whether the request is from a pre version 1.0 client
+        
+        """
+        
+        self.header = header
+        self.command = command
+        self.path = path
+        self.encoding = encoding
+        self.headers = headers
+        self.pre1 = pre1
+        self.answer = None
+        
+    def save_answer(self, r):
+        """Save an answer, replacing the old one if it's better.
+        
+        @type r: (C{int}, C{string}, C{dictionary}, C{string})
+        @param r: the HTTP status code, status message, headers, and package data
+        
+        """
+        
+        # Queue the answer
+        if self.answer:
+            logger.error('An answer already exists for this request, keeping the better one')
+            # Better means lower code, or newer response if codes are the same
+            if r[0] <= self.answer[0]:
+                self.answer = r
+        else:
+            self.answer = r
+        
+    def has_answer(self):
+        """Determine whether an answer is available for the request.
+        
+        @rtype: C{boolean}
+        @return: whether the answer is available yet
+        
+        """
+        
+        return not not self.answer
+
+    def get_answer(self):
+        """Get the saved answer.
+        
+        @rtype: (C{int}, C{string}, C{dictionary}, C{string})
+        @return: the HTTP status code, status message, headers, and package
+            data, or None if the answer is not yet available
+        
+        """
+        
+        return self.answer
+
 class HTTPConnection:
     """A single connection from an HTTP client.
     
@@ -60,6 +148,13 @@
     @ivar connection: the new connection that was created
     @type buf: C{string}
     @ivar buf: the buffered data received on the connection
+    @type requests: C{list} of L{HTTPRequest}
+    @ivar requests: the outstanding requests for paths
+    @type version: C{string}
+    @ivar version: the HTTP protocol version of the request
+    @type close_connection: C{boolean}
+    @ivar close_connection: whether the connection will be closed after this
+        request
     @type closed: C{boolean}
     @ivar closed: whether the connection has been closed
     @type done: C{boolean}
@@ -72,6 +167,8 @@
     @ivar header: the first header line received from the request
     @type command: C{string}
     @ivar command: the requested command ('GET' or 'HEAD')
+    @type path: C{string}
+    @ivar path: the requested path to get
     @type pre1: C{boolean}
     @ivar pre1: whether the request is from a pre version 1.0 client
     @type headers: C{dictionary}
@@ -94,9 +191,13 @@
         self.handler = handler
         self.connection = connection
         self.buf = ''
+        self.requests = []
+        self.version = None
+        self.close_connection = True
         self.closed = False
         self.done = False
         self.donereading = False
+        self.req_count = 0
         self.next_func = self.read_type
 
     def get_ip(self):
@@ -148,12 +249,35 @@
         
         """
         
+        self.req_count += 1
         self.header = data.strip()
         words = data.split()
         if len(words) == 3:
-            self.command, self.path, garbage = words
+            # Must be HTTP 1.0 or greater
+            self.command, self.path, self.version = words
             self.pre1 = False
+            
+            if self.handler.protocol >= "HTTP/1.1":
+                # Extract the version number from the request
+                if self.version[:5] != 'HTTP/':
+                    logger.error("Bad request version (%r)", self.version)
+                    return None
+                try:
+                    base_version_number = self.version.split('/', 1)[1]
+                    version_number = base_version_number.split(".")
+                    if len(version_number) != 2:
+                        logger.error("Bad request version (%r)", self.version)
+                        return None
+                    version_number = int(version_number[0]), int(version_number[1])
+                except (ValueError, IndexError):
+                    logger.error("Bad request version (%r)", self.version)
+                    return None
+                
+                # Use persistent connections for HTTP 1.1
+                if version_number >= (1, 1):
+                    self.close_connection = False
         elif len(words) == 2:
+            # Old HTTP 0.9 connections don't include the version and only support GET
             self.command, self.path = words
             self.pre1 = True
             if self.command != 'GET':
@@ -162,9 +286,12 @@
         else:
             logger.warning('connection closed, corrupt header line: '+data)
             return None
+        
         if self.command not in ('HEAD', 'GET'):
             logger.warning('connection closed, improper command: '+self.command)
             return None
+        
+        logger.info(str(self.req_count)+': '+self.header)
         self.headers = {}
         return self.read_header
 
@@ -179,16 +306,51 @@
         """
         
         data = data.strip()
+        
+        # A blank line indicates the headers are done
         if data == '':
-            self.donereading = True
+            # Get the encoding to use for the answer
             if self.headers.get('accept-encoding','').find('gzip') > -1:
                 self.encoding = 'gzip'
             else:
                 self.encoding = 'identity'
-            r = self.handler.getfunc(self, self.path, self.headers)
+                
+            # Check for persistent connection headers
+            conntype = self.headers.get('Connection', "").lower()
+            if conntype == 'close':
+                self.close_connection = True
+            elif conntype == 'keep-alive' and self.handler.protocol >= "HTTP/1.1":
+                self.close_connection = False
+
+            # If this is not the last request
+            newrequest = None
+            if not self.close_connection or self.requests:
+                newrequest = HTTPRequest(self.header, self.command, self.path,
+                                         self.encoding, self.headers, self.pre1)
+                self.requests.append(newrequest)
+
+            # Call the function to process the request
+            r = self.handler.getfunc(self, self.path, self.headers, newrequest)
+
+            # Send the answer if available
             if r is not None:
-                self.answer(r)
-            return None
+                if newrequest:
+                    # Multiple requests, so queue it for possible sending
+                    self.answer(r, newrequest)
+                else:
+                    # It's the only request, so just send it
+                    self.send_answer(r, self.header, self.command, self.path,
+                                     self.encoding, self.headers, self.pre1)
+                
+            # Request complete, close or wait for more
+            if self.close_connection:
+                self.donereading = True
+                return None
+            else:
+                self.close_connection = True
+                return self.read_type
+            
+        # Process the header line
         try:
             i = data.index(':')
         except ValueError:
@@ -198,8 +360,40 @@
         logger.debug(data[:i].strip() + ": " + data[i+1:].strip())
         return self.read_header
 
-    def answer(self, (responsecode, responsestring, headers, data)):
-        """Send a response to the client on the connection and close it.
+    def answer(self, r, httpreq):
+        """Add a response to the queued responses and check if any are ready to send.
+        
+        @type r: (C{int}, C{string}, C{dictionary}, C{string})
+        @param r: the HTTP status code, status message, headers, and package data
+        @type httpreq: L{HTTPRequest}
+        @param httpreq: the request the answer is for
+        
+        """
+        
+        if self.closed:
+            logger.warning('connection closed before anwswer, dropping data')
+            return
+        
+        if not self.requests:
+            logger.error('Got an answer when there was no request')
+            return
+
+        if httpreq:
+            if httpreq not in self.requests:
+                logger.error('Got an answer for an unknown request')
+            else:
+                httpreq.save_answer(r)
+
+        # Answer all possible requests
+        while self.requests and self.requests[0].has_answer():
+            httpreq = self.requests.pop(0)
+            r = httpreq.get_answer()
+            self.send_answer(r, httpreq.header, httpreq.command, httpreq.path,
+                             httpreq.encoding, httpreq.headers, httpreq.pre1)
+
+    def send_answer(self, (responsecode, responsestring, headers, data),
+                    header, command, path, encoding, req_headers, pre1):
+        """Send out the complete request.
         
         @type responsecode: C{int}
         @param responsecode: the response code to send
@@ -209,51 +403,71 @@
         @param headers: the headers to send with the response
         @type data: C{string}
         @param data: the data to send with the response
-        
-        """
-        
-        if self.closed:
-            logger.warning('connection closed before anwswer, dropping data')
-            return
-        if self.encoding == 'gzip':
+        @type header: C{string}
+        @param header: the first header line received from the request
+        @type command: C{string}
+        @param command: the requested command ('GET' or 'HEAD')
+        @type path: C{string}
+        @param path: the requested path to get
+        @type encoding: C{string}
+        @param encoding: the encoding to use when sending the response
+        @type req_headers: C{dictionary}
+        @param req_headers: the headers received with the request
+        @type pre1: C{boolean}
+        @param pre1: whether the request is from a pre version 1.0 client
+        
+        """
+
+        # Encode the response data
+        if encoding == 'gzip':
             compressed = StringIO()
             gz = GzipFile(fileobj = compressed, mode = 'wb', compresslevel = 9)
             gz.write(data)
             gz.close()
             cdata = compressed.getvalue()
             if len(cdata) >= len(data):
-                self.encoding = 'identity'
+                encoding = 'identity'
             else:
                 logger.debug('Compressed: '+str(len(cdata))+'  Uncompressed: '+str(len(data)))
                 data = cdata
                 headers['Content-Encoding'] = 'gzip'
 
         # i'm abusing the identd field here, but this should be ok
-        if self.encoding == 'identity':
+        if encoding == 'identity':
             ident = '-'
         else:
             ident = self.encoding
         self.handler.write_log( self.connection.get_ip(), ident, '-',
-                                self.header, responsecode, len(data),
-                                self.headers.get('referer','-'),
-                                self.headers.get('user-agent','-') )
-        self.done = True
+                                header, responsecode, len(data),
+                                req_headers.get('referer', '-'),
+                                req_headers.get('user-agent', '-') )
+
         logger.info('sending response: '+str(responsecode)+' '+responsestring+
                     ' ('+str(len(data))+' bytes)')
         r = StringIO()
-        r.write('HTTP/1.0 ' + str(responsecode) + ' ' + 
+        r.write(self.handler.protocol + ' ' + str(responsecode) + ' ' + 
             responsestring + '\r\n')
         if not self.pre1:
             headers['Content-Length'] = len(data)
             for key, value in headers.items():
                 r.write(key + ': ' + str(value) + '\r\n')
             r.write('\r\n')
-        if self.command != 'HEAD':
+        if command != 'HEAD':
             r.write(data)
         self.connection.write(r.getvalue())
-        if self.connection.is_flushed():
-            self.connection.shutdown(1)
-
+    
+    def close(self):
+        """Close the connection and drop all pending requests/answers."""
+        logger.debug('HTTP connection closed')
+        self.closed = True
+        del self.connection
+        self.next_func = None
+        for httpreq in self.requests:
+            if httpreq.has_answer():
+                logger.debug('Connection lost before answer could be sent: '+httpreq.path)
+        del self.requests[:]
+        
+        
 class HTTPHandler:
     """The handler for all new and existing HTTP connections.
     
@@ -270,10 +484,13 @@
     @ivar logfile: the file name to write the logs to
     @type log: C{file}
     @ivar log: the file to write the logs to
+    @type protocol: C{string}
+    @ivar protocol: the HTTP protocol version to use
     
     """
     
-    def __init__(self, getfunc, minflush, logfile = None, hupmonitor = None):
+    def __init__(self, getfunc, minflush, logfile = None, hupmonitor = None,
+                 protocol = 'HTTP/1.0'):
         """Initialize the instance.
         
         @type getfunc: C{method}
@@ -286,6 +503,9 @@
         @type hupmonitor: C{boolean}
         @param hupmonitor: whether to reopen the log file on a HUP signal
             (optional, default is False)
+        @type protocol: C{string}
+        @param protocol: the HTTP protocol version to use
+            (optional, defaults to HTTP/1.0)
         
         """
         
@@ -295,6 +515,7 @@
         self.lastflush = clock()
         self.logfile = None
         self.log = None
+        self.protocol = protocol
         if (logfile) and (logfile != '-'):
             try:
                 self.logfile = logfile
@@ -344,9 +565,7 @@
         """
         
         ec = self.connections[connection]
-        ec.closed = True
-        del ec.connection
-        del ec.next_func
+        ec.close()
         del self.connections[connection]
 
     def data_came_in(self, connection, data):

Modified: debtorrent/branches/http1.1/DebTorrent/launchmanycore.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/launchmanycore.py?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/launchmanycore.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/launchmanycore.py Mon Aug  6 23:17:57 2007
@@ -366,7 +366,8 @@
                    reuse = True, ipv6_socket_style = config['ipv6_binds_v4'])
             self.rawserver.set_handler(HTTPHandler(self.aptlistener.get, 
                                                    config['min_time_between_log_flushes'],
-                                                   logfile, config['hupmonitor']), 
+                                                   logfile, config['hupmonitor'],
+                                                   'HTTP/1.1'), 
                                        config['apt_port'])
     
             self.ratelimiter = RateLimiter(self.rawserver.add_task,

Modified: debtorrent/branches/http1.1/TODO
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/TODO?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/TODO (original)
+++ debtorrent/branches/http1.1/TODO Mon Aug  6 23:17:57 2007
@@ -38,15 +38,6 @@
 more efficient by adding callbacks to PiecePicker or StorageWrapper, so that
 when a piece comes in and passes the hash check, then the AptListener will
 process any queued requests for that piece.
-
-
-HTTPHandler should support HTTP/1.1 and persistent connections/pipelining
-
-Currently HTTPHandler is HTTP/1.0, and so doesn't support persistent 
-connections. These would be useful as APT could then pipeline multiple requests
-at a time to DebTorrent for processing. This would require something like the
-AptListener callbacks, as the connections would then have to support multiple 
-queued package requests.
 
 
 Different forms of HTTP Downloading may open too many connections

Modified: debtorrent/branches/http1.1/test.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/test.py?rev=203&op=diff
==============================================================================
--- debtorrent/branches/http1.1/test.py (original)
+++ debtorrent/branches/http1.1/test.py Mon Aug  6 23:17:57 2007
@@ -172,6 +172,84 @@
               (1, ['update']),
               ]),
 
+        '7': ('Test pipelining of multiple simultaneous downloads.',
+             {1: []},
+             {1: (1, [], {})},
+             [(1, ['update']), 
+              (1, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              ]),
+
+        '8': ('Test pipelining of multiple simultaneous downloads with many peers.',
+             {1: []},
+             {1: (1, [], {}),
+              2: (1, [], {}),
+              3: (1, [], {}),
+              4: (1, [], {}),
+              5: (1, [], {}),
+              6: (1, [], {})},
+             [(1, ['update']), 
+              (1, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              (2, ['update']), 
+              (2, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              (3, ['update']), 
+              (3, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              (4, ['update']), 
+              (4, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              (5, ['update']), 
+              (5, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              (6, ['update']), 
+              (6, ['install', 'aboot-base', 'aap-doc', 'ada-reference-manual',
+                   'aspectj-doc', 'fop-doc', 'jswat-doc', 'asis-doc',
+                   'bison-doc', 'crash-whitepaper', 'doc-iana',
+                   'bash-doc', 'apt-howto-common', 'autotools-dev',
+                   'aptitude-doc-en', 'armagetron-common', 'asr-manpages',
+                   'atomix-data', 'alcovebook-sgml-doc', 'alamin-doc',
+                   'aegis-doc', 'afbackup-common', 'airstrike-common',
+                   ]),
+              ]),
+
          }
 
 assert 'all' not in tests




More information about the Debtorrent-commits mailing list