r202 - /debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py

camrdale-guest at users.alioth.debian.org camrdale-guest at users.alioth.debian.org
Mon Aug 6 05:11:35 UTC 2007


Author: camrdale-guest
Date: Mon Aug  6 05:11:35 2007
New Revision: 202

URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=202
Log:
Switch the AptListener to queue requests by file name and then connection to allow for multiple requests per HTTP connection.

Modified:
    debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py

Modified: debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py?rev=202&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py Mon Aug  6 05:11:35 2007
@@ -93,8 +93,9 @@
         are lists of L{DebTorrent.HTTPHandler.HTTPConnection} objects which are the
         requests that are pending for that path.
     @type request_queue: C{dictionary}
-    @ivar request_queue: the pending HTTP get requests that are waiting for download.
-        Keys are L{DebTorrent.HTTPHandler.HTTPConnection} objects, values are
+    @ivar request_queue: the pending HTTP package requests that are waiting for download.
+        Keys are the file names (including mirror) requested, values are dictionaries
+        with keys of L{DebTorrent.HTTPHandler.HTTPConnection} objects and values of
         (L{DebTorrent.download_bt1.BT1Download}, C{int}, C{list} of C{int}, C{float})
         which are the torrent downloader, file index, list of pieces needed, and 
         the time of the original request.
@@ -148,11 +149,13 @@
         self.request_queue = {}
         rawserver.add_task(self.process_queue, 1)
         
-    def enqueue_request(self, connection, downloader, file_num, pieces_needed):
+    def enqueue_request(self, connection, file, downloader, file_num, pieces_needed):
         """Add a new download request to the queue of those waiting for pieces.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
         @param connection: the conection the request came in on
+        @type file: C{string}
+        @param file: the file to download, starting with the mirror name
         @type downloader: L{DebTorrent.download_bt1.BT1Download}
         @param downloader: the torrent download that has the file
         @type file_num: C{int}
@@ -163,11 +166,15 @@
         
         """
         
-        assert not self.request_queue.has_key(connection)
-        
-        logger.info('queueing request as file '+str(file_num)+' needs pieces: '+str(pieces_needed))
-
-        self.request_queue[connection] = (downloader, file_num, pieces_needed, clock())
+        # Get the file's queue and check it for this connection
+        queue = self.request_queue.setdefault(file, {})
+        if connection in queue:
+            logger.error('Received multiple requests for the same file on one connection')
+            return
+
+        logger.info('queueing request as file '+file+' needs pieces: '+str(pieces_needed))
+
+        queue[connection] = (downloader, file_num, pieces_needed, clock())
         
     def process_queue(self):
         """Process the queue of waiting requests."""
@@ -176,28 +183,31 @@
         self.rawserver.add_task(self.process_queue, 1)
         
         closed_conns = []
-        for c, v in self.request_queue.items():
-            # Check for a closed connection
-            if c.closed:
-                closed_conns.append(c)
-                logger.warning('connection closed while request queued for file '+str(v[1]))
-                continue
-                
-            # Remove the downloaded pieces from the list of needed ones
-            for piece in list(v[2]):
-                if v[0].storagewrapper.do_I_have(piece):
-                    logger.debug('queued request for file '+str(v[1])+' got piece '+str(piece))
-                    v[2].remove(piece)
+        for file, queue in self.request_queue.items():
+            for c, v in queue.items():
+                # Check for a closed connection
+                if c.closed:
+                    closed_conns.append((file, c))
+                    logger.warning('connection closed while request queued for file '+file)
+                    continue
                     
-            # If no more pieces are needed, return the answer and remove the request
-            if not v[2]:
-                logger.info('queued request for file '+str(v[1])+' is complete')
-                del self.request_queue[c]
-                self.answer_package(c, v[0], v[1])
-
-        # Remove closed connections from the queue
-        for c in closed_conns:
-            del self.request_queue[c]
+                # Remove the downloaded pieces from the list of needed ones
+                for piece in list(v[2]):
+                    if v[0].storagewrapper.do_I_have(piece):
+                        logger.debug('queued request for file '+file+' got piece '+str(piece))
+                        v[2].remove(piece)
+                        
+                # If no more pieces are needed, return the answer and remove the request
+                if not v[2]:
+                    logger.info('queued request for file '+file+' is complete')
+                    closed_conns.append((file, c))
+                    self.answer_package(c, file, v[0], v[1])
+
+        # Remove closed/finished connections from the queue
+        for (file, c) in closed_conns:
+            self.request_queue[file].pop(c)
+            if not self.request_queue[file]:
+                self.request_queue.pop(file)
 
 
     def get_infopage(self):
@@ -418,7 +428,9 @@
             if not d.storagewrapper.do_I_have(piece):
                 pieces_needed.append(piece)
             elif not pieces_needed:
-                data = data + d.storagewrapper.get_piece(piece, 0, -1).getarray().tostring()
+                piecebuf = d.storagewrapper.get_piece(piece, 0, -1)
+                data += piecebuf.getarray().tostring()
+                piecebuf.release()
         
         if not pieces_needed:
             return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data)
@@ -435,16 +447,18 @@
         d.fileselector.set_priority(f, 1)
         
         # Add the connection to the list of those needing responses
-        self.enqueue_request(connection, d, f, pieces_needed)
+        self.enqueue_request(connection, '/'.join(path), d, f, pieces_needed)
         
         return None
         
     
-    def answer_package(self, connection, d, f):
+    def answer_package(self, connection, file, d, f):
         """Send the newly downloaded package file to the requester.
         
         @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
         @param connection: the conection the request came in on
+        @type file: C{string}
+        @param file: the file to download, starting with the mirror name
         @type d: L{DebTorrent.download_bt1.BT1Download}
         @param d: the torrent download that has the file
         @type f: C{int}
@@ -464,7 +478,9 @@
             if not d.storagewrapper.do_I_have(piece):
                 pieces_needed.append(piece)
             elif not pieces_needed:
-                data = data + d.storagewrapper.get_piece(piece, 0, -1).getarray().tostring()
+                piecebuf = d.storagewrapper.get_piece(piece, 0, -1)
+                data += piecebuf.getarray().tostring()
+                piecebuf.release()
         
         if not pieces_needed:
             connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data))
@@ -472,7 +488,7 @@
 
         # Something strange has happened, requeue it
         logger.warning('requeuing request for file '+str(f)+' as it still needs pieces: '+str(pieces_needed))
-        self.enqueue_request(connection, d, f, pieces_needed)
+        self.enqueue_request(connection, file, d, f, pieces_needed)
         
     
     def got_Packages(self, path, data):




More information about the Debtorrent-commits mailing list