r202 - /debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
camrdale-guest at users.alioth.debian.org
camrdale-guest at users.alioth.debian.org
Mon Aug 6 05:11:35 UTC 2007
Author: camrdale-guest
Date: Mon Aug 6 05:11:35 2007
New Revision: 202
URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=202
Log:
Switch the AptListener to queue requests by file name and then by connection, allowing multiple requests per HTTP connection.
Modified:
debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
Modified: debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py?rev=202&op=diff
==============================================================================
--- debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/branches/http1.1/DebTorrent/BT1/AptListener.py Mon Aug 6 05:11:35 2007
@@ -93,8 +93,9 @@
are lists of L{DebTorrent.HTTPHandler.HTTPConnection} objects which are the
requests that are pending for that path.
@type request_queue: C{dictionary}
- @ivar request_queue: the pending HTTP get requests that are waiting for download.
- Keys are L{DebTorrent.HTTPHandler.HTTPConnection} objects, values are
+ @ivar request_queue: the pending HTTP package requests that are waiting for download.
+ Keys are the file names (including mirror) requested, values are dictionaries
+ with keys of L{DebTorrent.HTTPHandler.HTTPConnection} objects and values of
(L{DebTorrent.download_bt1.BT1Download}, C{int}, C{list} of C{int}, C{float})
which are the torrent downloader, file index, list of pieces needed, and
the time of the original request.
@@ -148,11 +149,13 @@
self.request_queue = {}
rawserver.add_task(self.process_queue, 1)
- def enqueue_request(self, connection, downloader, file_num, pieces_needed):
+ def enqueue_request(self, connection, file, downloader, file_num, pieces_needed):
"""Add a new download request to the queue of those waiting for pieces.
@type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
@param connection: the conection the request came in on
+ @type file: C{string}
+ @param file: the file to download, starting with the mirror name
@type downloader: L{DebTorrent.download_bt1.BT1Download}
@param downloader: the torrent download that has the file
@type file_num: C{int}
@@ -163,11 +166,15 @@
"""
- assert not self.request_queue.has_key(connection)
-
- logger.info('queueing request as file '+str(file_num)+' needs pieces: '+str(pieces_needed))
-
- self.request_queue[connection] = (downloader, file_num, pieces_needed, clock())
+ # Get the file's queue and check it for this connection
+ queue = self.request_queue.setdefault(file, {})
+ if connection in queue:
+ logger.error('Received multiple requests for the same file on one connection')
+ return
+
+ logger.info('queueing request as file '+file+' needs pieces: '+str(pieces_needed))
+
+ queue[connection] = (downloader, file_num, pieces_needed, clock())
def process_queue(self):
"""Process the queue of waiting requests."""
@@ -176,28 +183,31 @@
self.rawserver.add_task(self.process_queue, 1)
closed_conns = []
- for c, v in self.request_queue.items():
- # Check for a closed connection
- if c.closed:
- closed_conns.append(c)
- logger.warning('connection closed while request queued for file '+str(v[1]))
- continue
-
- # Remove the downloaded pieces from the list of needed ones
- for piece in list(v[2]):
- if v[0].storagewrapper.do_I_have(piece):
- logger.debug('queued request for file '+str(v[1])+' got piece '+str(piece))
- v[2].remove(piece)
+ for file, queue in self.request_queue.items():
+ for c, v in queue.items():
+ # Check for a closed connection
+ if c.closed:
+ closed_conns.append((file, c))
+ logger.warning('connection closed while request queued for file '+file)
+ continue
- # If no more pieces are needed, return the answer and remove the request
- if not v[2]:
- logger.info('queued request for file '+str(v[1])+' is complete')
- del self.request_queue[c]
- self.answer_package(c, v[0], v[1])
-
- # Remove closed connections from the queue
- for c in closed_conns:
- del self.request_queue[c]
+ # Remove the downloaded pieces from the list of needed ones
+ for piece in list(v[2]):
+ if v[0].storagewrapper.do_I_have(piece):
+ logger.debug('queued request for file '+file+' got piece '+str(piece))
+ v[2].remove(piece)
+
+ # If no more pieces are needed, return the answer and remove the request
+ if not v[2]:
+ logger.info('queued request for file '+file+' is complete')
+ closed_conns.append((file, c))
+ self.answer_package(c, file, v[0], v[1])
+
+ # Remove closed/finished connections from the queue
+ for (file, c) in closed_conns:
+ self.request_queue[file].pop(c)
+ if not self.request_queue[file]:
+ self.request_queue.pop(file)
def get_infopage(self):
@@ -418,7 +428,9 @@
if not d.storagewrapper.do_I_have(piece):
pieces_needed.append(piece)
elif not pieces_needed:
- data = data + d.storagewrapper.get_piece(piece, 0, -1).getarray().tostring()
+ piecebuf = d.storagewrapper.get_piece(piece, 0, -1)
+ data += piecebuf.getarray().tostring()
+ piecebuf.release()
if not pieces_needed:
return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data)
@@ -435,16 +447,18 @@
d.fileselector.set_priority(f, 1)
# Add the connection to the list of those needing responses
- self.enqueue_request(connection, d, f, pieces_needed)
+ self.enqueue_request(connection, '/'.join(path), d, f, pieces_needed)
return None
- def answer_package(self, connection, d, f):
+ def answer_package(self, connection, file, d, f):
"""Send the newly downloaded package file to the requester.
@type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
@param connection: the conection the request came in on
+ @type file: C{string}
+ @param file: the file to download, starting with the mirror name
@type d: L{DebTorrent.download_bt1.BT1Download}
@param d: the torrent download that has the file
@type f: C{int}
@@ -464,7 +478,9 @@
if not d.storagewrapper.do_I_have(piece):
pieces_needed.append(piece)
elif not pieces_needed:
- data = data + d.storagewrapper.get_piece(piece, 0, -1).getarray().tostring()
+ piecebuf = d.storagewrapper.get_piece(piece, 0, -1)
+ data += piecebuf.getarray().tostring()
+ piecebuf.release()
if not pieces_needed:
connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data))
@@ -472,7 +488,7 @@
# Something strange has happened, requeue it
logger.warning('requeuing request for file '+str(f)+' as it still needs pieces: '+str(pieces_needed))
- self.enqueue_request(connection, d, f, pieces_needed)
+ self.enqueue_request(connection, file, d, f, pieces_needed)
def got_Packages(self, path, data):
More information about the Debtorrent-commits
mailing list