r358 - in /debtorrent/trunk/DebTorrent: BT1/AptListener.py BT1/makemetafile.py HTTPCache.py HTTPHandler.py SocketHandler.py
camrdale-guest at users.alioth.debian.org
camrdale-guest at users.alioth.debian.org
Sun Jan 27 00:35:48 UTC 2008
Author: camrdale-guest
Date: Sun Jan 27 00:35:48 2008
New Revision: 358
URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=358
Log:
HTTPCache saves the data to a file and returns only the file name.
Modified:
debtorrent/trunk/DebTorrent/BT1/AptListener.py
debtorrent/trunk/DebTorrent/BT1/makemetafile.py
debtorrent/trunk/DebTorrent/HTTPCache.py
debtorrent/trunk/DebTorrent/HTTPHandler.py
debtorrent/trunk/DebTorrent/SocketHandler.py
Modified: debtorrent/trunk/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/AptListener.py?rev=358&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/AptListener.py Sun Jan 27 00:35:48 2008
@@ -354,10 +354,10 @@
uptodate = False
# First check the cache for the file
- r = self.Cache.cache_get(path, uptodate, headers.get('if-modified-since', ''))
+ r, filename = self.Cache.cache_get(path, uptodate, headers.get('if-modified-since', ''))
# If the cache doesn't have it
- if r[0] not in (200, 304):
+ if not filename:
# Get Debs from the debtorrent download, others are straight download
if path[-1][-4:] == '.deb':
return self.get_package(connection, path, httpreq)
@@ -368,16 +368,12 @@
return None
if path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
- if r[0] == 304:
- # Oops, we do need the cached file after all to start the torrent
- logger.info('retrieving the cached Packages file to start the torrent')
- r2 = self.Cache.cache_get(path)
- TorrentCreator(path, r2[3], self.start_torrent,
- self.rawserver.add_task, self.config)
- else:
- TorrentCreator(path, r[3], self.start_torrent,
- self.rawserver.add_task, self.config)
-
+ TorrentCreator(path, filename, self.start_torrent,
+ self.rawserver.add_task, self.config)
+
+ # Returning a file, so open the file to be returned
+ r = r[0:3] + (open(filename, 'rb'), )
+
return r
except IOError, e:
@@ -389,13 +385,15 @@
msg = 'Unknown error occurred'
return (status, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, msg)
- def get_cached_callback(self, path, r):
+ def get_cached_callback(self, path, r, filename):
"""Return the newly cached file to the waiting connection.
@type path: C{list} of C{string}
@param path: the path of the file to download, starting with the mirror name
@type r: (C{int}, C{string}, C{dictionary}, C{string})
@param r: the HTTP status code, status message, headers, and cached data
+ @type filename: C{string}
+ @param filename: the file containing the successfully downloaded file
"""
@@ -408,8 +406,12 @@
# If it's a torrent file, start it
if r[0] == 200 and path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
- TorrentCreator(path, r[3], self.start_torrent,
+ TorrentCreator(path, filename, self.start_torrent,
self.rawserver.add_task, self.config)
+
+ if filename:
+ # Returning a file, so open the file to be returned
+ r = r[0:3] + (open(filename, 'rb'), )
for (connection, httpreq) in connections:
# Check to make sure the requester is still waiting
Modified: debtorrent/trunk/DebTorrent/BT1/makemetafile.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/makemetafile.py?rev=358&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/makemetafile.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/makemetafile.py Sun Jan 27 00:35:48 2008
@@ -30,7 +30,7 @@
from traceback import print_exc
from DebTorrent.zurllib import urlopen
import gzip
-from bz2 import decompress
+from bz2 import BZ2File
from StringIO import StringIO
from re import subn
from debian_bundle import deb822
@@ -749,13 +749,13 @@
"""
- def __init__(self, path, data, callback, sched, config):
+ def __init__(self, path, filename, callback, sched, config):
"""Process a downloaded Packages file and start the torrent making thread.
@type path: C{list} of C{string}
@param path: the path of the file to download, starting with the mirror name
- @type data: C{string}
- @param data: the downloaded Packages file
+ @type filename: C{string}
+ @param filename: the file containing the downloaded Packages file
@type callback: C{method}
@param callback: the method to call with the torrent when it has been created
@type sched: C{method}
@@ -766,7 +766,7 @@
"""
self.path = path
- self.data = data
+ self.filename = filename
self.callback = callback
self.sched = sched
self.config = config
@@ -782,27 +782,20 @@
def _create(self):
"""Process a downloaded Packages file and start a torrent."""
- h = []
try:
- # Decompress the data
- if self.path[-1].endswith('.gz'):
- compressed = StringIO(self.data)
- f = GzipFile(fileobj = compressed)
- self.data = f.read()
- elif self.path[-1].endswith('.bz2'):
- self.data = decompress(self.data)
-
- assert self.data[:8] == "Package:"
- h = self.data.split('\n')
- self.data = ''
+ # Open and possibly decompress the file
+ if self.filename.endswith('.gz'):
+ f = gzip.open(self.filename, 'r')
+ elif self.filename.endswith('.bz2'):
+ f = BZ2File(self.filename)
+ else:
+ f = open(self.filename)
except:
- logger.warning('Packages file is not in the correct format')
- self.data = ''
- del h[:]
+ logger.warning('Packages file could not be opened')
self.sched(self._finished)
return
- logger.debug('Packages file successfully decompressed')
+ logger.debug('Packages file successfully opened')
try:
sub_pieces = getsubpieces('_'.join(self.path))
@@ -815,16 +808,16 @@
if self.config['separate_all'] in [1, 3]:
(piece_ordering_all, ordering_all_headers) = getordering('_'.join(self.path), all = True)
- (info, info_all) = getpieces(h, separate_all = self.config['separate_all'],
+ (info, info_all) = getpieces(f, separate_all = self.config['separate_all'],
sub_pieces = sub_pieces,
piece_ordering = piece_ordering,
piece_ordering_all = piece_ordering_all,
num_pieces = int(ordering_headers.get('NextPiece', 0)),
num_all_pieces = int(ordering_all_headers.get('NextPiece', 0)))
- del h[:]
+ f.close()
except:
logger.exception('Failed to create torrent for: %s', self.name)
- del h[:]
+ f.close()
return
name = self.name
Modified: debtorrent/trunk/DebTorrent/HTTPCache.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/HTTPCache.py?rev=358&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/HTTPCache.py (original)
+++ debtorrent/trunk/DebTorrent/HTTPCache.py Sun Jan 27 00:35:48 2008
@@ -43,24 +43,29 @@
@ivar path: the server and path to download
@type func: C{method}
@ivar func: the method to call when the download completes
+ @type filename: C{string}
+ @ivar filename: the file to save the downloaded data to
@type response: (C{int}, C{string}, C{dictionary}, C{string})
@ivar response: the HTTP status code, status message, headers, and
downloaded data
"""
- def __init__(self, path, func):
+ def __init__(self, path, func, filename):
"""Initialize the instance.
@type path: C{list} of C{string}
@param path: the server and path to download
@type func: C{method}
@param func: the method to call when the download completes
+ @type filename: C{string}
+ @param filename: the file to save the downloaded data to
"""
self.path = path
self.func = func
+ self.filename = filename
self.response = None
def save_response(self, r):
@@ -71,7 +76,44 @@
"""
- self.response = (r.status, r.reason, dict(r.getheaders()), r.read())
+ logger.debug('processing response: %s', self.filename)
+ if r.status == 200:
+ # Create the directory for the new file
+ new_dir = split(self.filename)[0]
+ if new_dir != '' and not exists(new_dir):
+ makedirs(new_dir)
+
+ # Write the new file
+ f = open(self.filename, 'wb')
+ data = r.read(4096)
+ while len(data) > 0:
+ f.write(data)
+ data = r.read(4096)
+ f.close()
+ r.close()
+
+ # Set the modified time (on error use current time which should work)
+ try:
+ mtime = timegm(strptime(r.getheader('last-modified'), time_format + ' %Z'))
+ times = (mtime, mtime)
+ utime(self.filename, times)
+ except:
+ logger.exception('Failed to set the cache time for the file')
+ else:
+ data = r.read()
+
+ headers = {'Server': VERSION}
+
+ # Use the headers we want
+ if exists(self.filename):
+ mtime_string = strftime(time_format + ' GMT', gmtime(getmtime(self.filename)))
+ headers['last-modified'] = mtime_string
+
+ for k, v in r.getheaders():
+ if k in ('last-modified', 'content-type'):
+ headers[k] = v
+
+ self.response = (r.status, r.reason, headers, data)
def error(self, error_msg):
"""Save an error response.
@@ -135,24 +177,22 @@
logger.exception('cannot connect to http server: '+self.server)
self.close()
- def queue(self, path, func):
+ def queue(self, req):
"""Queue a download for later starting.
- @type path: C{list} of C{string}
- @param path: the server and path to download
- @type func: C{method}
- @param func: the method to call when the download completes
+ @type req: L{CacheRequest}
+ @param req: the cache request object to queue
@rtype: C{boolean}
@return: whether the download was successfully queued
"""
- assert path[0] == self.server
+ assert req.path[0] == self.server
if self.closed:
return False
- logger.debug('queueing request for '+'/'.join(path))
- self.request_queue.append(CacheRequest(path, func))
+ logger.debug('queueing request for '+'/'.join(req.path))
+ self.request_queue.append(req)
self._run_queue()
return True
@@ -223,8 +263,7 @@
self._run_queue()
# Return the result
- self.handler.download_complete(request.path, request.func,
- request.response)
+ self.handler.download_complete(request)
def auto_close(self):
"""Close the connection if it has been idle."""
@@ -242,17 +281,14 @@
if self.request:
if not self.request.response:
self.request.error('connection closed prematurely')
- self.handler.download_complete(self.request.path,
- self.request.func,
- self.request.response)
+ self.handler.download_complete(self.request)
self.request = None
# Process any waiting requests
for request in self.request_queue:
if not request.response:
request.error('connection closed prematurely')
- self.handler.download_complete(request.path, request.func,
- request.response)
+ self.handler.download_complete(request)
del self.request_queue[:]
# Remove the connection to the server
@@ -300,7 +336,8 @@
logger.info('Opening a connection to server: '+path[0])
self.downloads[path[0]] = CacheConnection(self, path[0])
- if not self.downloads[path[0]].queue(path, func):
+ filename = self.get_filename(path)
+ if not self.downloads[path[0]].queue(CacheRequest(path, func, filename)):
func(path, (500, 'Internal Server Error',
{'Server': VERSION,
'Content-Type': 'text/html; charset=iso-8859-1'},
@@ -319,56 +356,26 @@
assert self.downloads[server] == d
del self.downloads[server]
- def download_complete(self, path, func, r):
+ def download_complete(self, req):
"""Process the returned data from a request.
- Once a download has been completed, save the downloaded file in the
- file system. Then return the data to the callback function.
-
- @type path: C{list} of C{string}
- @param path: the server and path that was downloaded
- @type func: C{method}
- @param func: the method to call with the data
- @type r: (C{int}, C{string}, C{dictionary}, C{string})
- @param r: the HTTP status code, status message, headers, and downloaded data
-
- """
-
- logger.info('download completed for: http://'+'/'.join(path))
-
- file = self.get_filename(path)
- headers = {'Server': VERSION}
-
- if r[0] == 200:
- # Create the directory for the new file
- new_dir = split(file)[0]
- if new_dir != '' and not exists(new_dir):
- makedirs(new_dir)
-
- # Write the new file
- f = open(file, 'wb')
- f.write(r[3])
- f.close()
-
- # Set the modified time (on error use current time which should work)
- try:
- mtime = timegm(strptime(r[2]['last-modified'], time_format + ' %Z'))
- times = (mtime, mtime)
- utime(file, times)
- except:
- logger.exception('Failed to set the cache time for the file')
-
- # Use the headers we want
- if exists(file):
- mtime_string = strftime(time_format + ' GMT', gmtime(getmtime(file)))
- headers['last-modified'] = mtime_string
-
- for k, v in r[2].items():
- if k in ('last-modified', 'content-type'):
- headers[k] = v
-
+ Once a download has been completed, return the data to the callback
+ function.
+
+ @type req: L{CacheRequest}
+ @param req: the cache request object containing the response
+
+ """
+
+ logger.info('download completed for: http://'+'/'.join(req.path))
+
+ # Return None for the filename if the download failed
+ filename = req.filename
+ if req.response[0] not in [200, 304]:
+ filename = None
+
# Call the callback function
- func(path, (r[0], r[1], headers, r[3]))
+ req.func(req.path, req.response, filename)
def cache_get(self, path, uptodate = False, if_modified_time = ''):
"""Get the file from the cache.
@@ -387,8 +394,9 @@
@type if_modified_time: C{string}
@param if_modified_time: the if-modified-since header from the request
(optional, defaults to not checking the if-modified-time)
- @rtype: (C{int}, C{string}, C{dictionary}, C{string})
- @return: the HTTP status code, status message, headers, and package data
+ @rtype: (C{int}, C{string}, C{dictionary}, C{string}), C{string}
+ @return: the HTTP status code, status message, headers, and error string,
+ and the name of the file in the cache, if it was found
"""
@@ -397,7 +405,7 @@
# Check if the file isn't in the cache
if not exists(file):
logger.info('cache miss: '+file)
- return (404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
+ return (404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas), None
if uptodate:
# Get the last modified time from the server
@@ -410,21 +418,18 @@
# Check if the cached data is stale
if self.check_mtime(last_modified, file = file) > 0:
logger.info('cache out of date: '+file)
- return (405, 'Method Not Allowed', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
+ return (405, 'Method Not Allowed', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas), None
# Check if the request needs the data
if if_modified_time and self.check_mtime(if_modified_time, file = file) >= 0:
logger.info('cache up to date and so is request: '+file)
- return (304, 'Not Modified', {'Server': VERSION, 'Pragma': 'no-cache'}, '')
+ return (304, 'Not Modified', {'Server': VERSION, 'Pragma': 'no-cache'}, ''), file
# Read in the file and return the data
- f = open(file, 'rb')
- data = f.read()
- f.close()
mtime_string = strftime(time_format+' GMT', gmtime(getmtime(file)))
logger.info('cache hit: '+file)
- return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain', 'Last-Modified': mtime_string}, data)
+ return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain', 'Last-Modified': mtime_string}, ''), file
def get_filename(self, path):
"""Get the file name used for this path in the cache.
Modified: debtorrent/trunk/DebTorrent/HTTPHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/HTTPHandler.py?rev=358&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/HTTPHandler.py (original)
+++ debtorrent/trunk/DebTorrent/HTTPHandler.py Sun Jan 27 00:35:48 2008
@@ -20,7 +20,7 @@
import time
from clock import clock
from gzip import GzipFile
-import signal, logging
+import signal, os, logging
logger = logging.getLogger('DebTorrent.HTTPHandler')
@@ -443,7 +443,7 @@
"""
# Encode the response data
- if encoding == 'gzip':
+ if encoding == 'gzip' and isinstance(data, str):
compressed = StringIO()
gz = GzipFile(fileobj = compressed, mode = 'wb', compresslevel = 9)
gz.write(data)
@@ -456,18 +456,24 @@
data = cdata
headers['Content-Encoding'] = 'gzip'
+ # Determine the length of the data to be written
+ if isinstance(data, str):
+ size = len(data)
+ else:
+ size = os.fstat(data.fileno())[6] - data.tell()
+
# i'm abusing the identd field here, but this should be ok
if encoding == 'identity':
ident = '-'
else:
ident = self.encoding
self.handler.write_log( self.connection.get_ip(), ident, '-',
- header, responsecode, len(data),
+ header, responsecode, size,
req_headers.get('referer', '-'),
req_headers.get('user-agent', '-') )
logger.info('sending response: '+self.protocol+' '+str(responsecode)+' '+responsestring+
- ' ('+str(len(data))+' bytes)' + repr(headers))
+ ' ('+str(size)+' bytes)' + repr(headers))
r = StringIO()
@@ -481,7 +487,7 @@
# Write the individual headers
if self.version >= (1, 0) or self.protocol != 'HTTP':
- headers['Content-Length'] = len(data)
+ headers['Content-Length'] = size
for key, value in headers.items():
r.write(key + ': ' + str(value) + '\r\n')
r.write('\r\n')
Modified: debtorrent/trunk/DebTorrent/SocketHandler.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/SocketHandler.py?rev=358&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/SocketHandler.py (original)
+++ debtorrent/trunk/DebTorrent/SocketHandler.py Sun Jan 27 00:35:48 2008
@@ -191,7 +191,7 @@
try:
while self.buffer:
# Read data from the file and put in on the buffer
- while type(self.buffer[0]) == file:
+ while self.buffer and type(self.buffer[0]) == file:
data = self.buffer[0].read(4096)
if len(data) > 0:
self.buffer.insert(0, data)
More information about the Debtorrent-commits
mailing list