r109 - in /debtorrent/trunk/DebTorrent: BT1/AptListener.py BT1/Storage.py HTTPCache.py download_bt1.py
camrdale-guest at users.alioth.debian.org
camrdale-guest at users.alioth.debian.org
Thu Jun 14 23:44:12 UTC 2007
Author: camrdale-guest
Date: Thu Jun 14 23:44:12 2007
New Revision: 109
URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=109
Log:
Add caching to the proxy downloading done by AptListener.
Added:
debtorrent/trunk/DebTorrent/HTTPCache.py (with props)
Modified:
debtorrent/trunk/DebTorrent/BT1/AptListener.py
debtorrent/trunk/DebTorrent/BT1/Storage.py
debtorrent/trunk/DebTorrent/download_bt1.py
Modified: debtorrent/trunk/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/AptListener.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/AptListener.py Thu Jun 14 23:44:12 2007
@@ -38,6 +38,7 @@
from binascii import b2a_hex, a2b_hex, a2b_base64
from string import lower
from makemetafile import getpieces, getsubpieces, uniconvert
+from DebTorrent.HTTPCache import HTTPCache
import sys, os
import signal
import re
@@ -188,6 +189,13 @@
@ivar uq_broken: unknown
@type Filter: unknown
@ivar Filter: unknown
+ @type Cache: L{DebTorrent.HTTPCache.HTTPCache}
+ @ivar Cache: the cache of downloaded files
+ @type cache_waiting: C{dictionary}
+ @ivar cache_waiting: the pending HTTP get requests that are waiting for download
+ from the cache. Keys are strings that are the path being requested, values
+ are lists of L{DebTorrent.HTTPHandler.HTTPConnection} objects which are the
+ requests that are pending for that path.
@type request_queue: C{dictionary}
@ivar request_queue: the pending HTTP get requests that are waiting for download.
Keys are L{DebTorrent.HTTPHandler.HTTPConnection} objects, values are
@@ -301,6 +309,8 @@
self.uq_broken = unquote('+') != ' '
self.Filter = Filter(rawserver.add_task)
+ self.Cache = HTTPCache(rawserver)
+ self.cache_waiting = {}
self.request_queue = {}
rawserver.add_task(self.process_queue, 1)
@@ -457,35 +467,43 @@
return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/html; charset=iso-8859-1'}, """<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n<html><head><title>Meow</title>\n</head>\n<body style="color: rgb(255, 255, 255); background-color: rgb(0, 0, 0);">\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">I IZ TAKIN BRAKE</span></big></big></big><br></div>\n<pre><b><tt> .-o=o-.<br> , /=o=o=o=\ .--.<br> _|\|=o=O=o=O=| \<br> __.' a`\=o=o=o=(`\ /<br> '. a 4/`|.-""'`\ \ ;'`) .---.<br> \ .' / .--' |_.' / .-._)<br> `) _.' / /`-.__.' /<br> `'-.____; /'-.___.-'<br> `\"""`</tt></b></pre>\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">FRM GETIN UR PACKAGES</span></big></big></big><br></div>\n</body>\n</html>""")
- def get_file(self, path):
- """Proxy the download of a file from a mirror.
-
+ def get_cached(self, connection, path):
+ """Proxy the (possibly cached) download of a file from a mirror.
+
+ @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
+ @param connection: the connection the request came in on
@type path: C{list} of C{string}
@param path: the path of the file to download, starting with the mirror name
@rtype: (C{int}, C{string}, C{dictionary}, C{string})
- @return: the HTTP status code, status message, headers, and bencoded
- metainfo file
+ @return: the HTTP status code, status message, headers, and downloaded file
+ (or None if the file is being downloaded)
"""
- # TODO: Add caching and cache-checking before downloading
- # TODO: Make this threaded so it doesn't block while waiting for the download
try:
- ungzip = True
- if path[-1].endswith('.gz'):
- ungzip = False
- url = 'http://'
- url += '/'.join(path)
- if DEBUG:
- print 'fetching:', url
- f = urlopen(url, ungzip)
- headers = {}
- for k,v in f.response.getheaders():
- if k.lower() != 'content-length':
- headers[k] = v
- data = f.read()
-
- return (200, 'OK', headers, data)
+ # Deb files don't need to be checked for staleness
+ uptodate = True
+ if path[-1][-4:] == '.deb':
+ uptodate = False
+
+ # First check the cache for the file
+ data = self.Cache.cache_get(path, uptodate)
+
+ # If the cache doesn't have it
+ if data is None:
+ # Get Debs from the debtorrent download, others are straight download
+ if path[-1][-4:] == '.deb':
+ return self.get_package(connection, path)
+ else:
+ # Save the connection info and start downloading the file
+ self.cache_waiting.setdefault('/'.join(path), []).append(connection)
+ self.Cache.download_get(path, self.get_cached_callback)
+ return None
+
+ if path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
+ self.got_Packages(path, data)
+
+ return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data)
except IOError, e:
try:
@@ -495,6 +513,38 @@
msg = 'Unknown error occurred'
return (status, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, msg)
+    def get_cached_callback(self, path, data):
+        """Answer every request that was waiting for a newly cached file.
+
+        Called by the cache once a download started from L{get_cached} has
+        ended. The list of waiting connections for the path is removed from
+        C{cache_waiting}, and each one that is still open is answered.
+
+        @type path: C{list} of C{string}
+        @param path: the path of the file to download, starting with the mirror name
+        @type data: C{string}
+        @param data: the downloaded newly cached file, or None if the
+            download did not produce a file
+
+        """
+
+        # Claim the requests that were queued up for this file
+        waiting = self.cache_waiting.pop('/'.join(path), None)
+        if waiting is None:
+            if DEBUG:
+                print 'no connection exists to return the cached file on'
+            return
+
+        found = data is not None
+
+        # A freshly downloaded Packages file also gets its torrent started
+        if found and path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
+            self.got_Packages(path, data)
+
+        for connection in waiting:
+            # Only answer requesters that have not gone away in the meantime
+            if not connection.closed:
+                if found:
+                    connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data))
+                else:
+                    connection.answer((404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas))
+
def get_package(self, connection, path):
"""Download a package file from a torrent.
@@ -508,7 +558,6 @@
"""
- # TODO: try and find the file in the directory structure too
# Find the file in one of the torrent downloads
d, f = self.handler.find_file(path[0], path[1:])
@@ -580,32 +629,24 @@
self.enqueue_request(connection, d, f, pieces_needed)
- def get_Packages(self, path):
- """Download a Packages file and start a torrent.
+ def got_Packages(self, path, data):
+ """Process a downloaded Packages file and start a torrent.
@type path: C{list} of C{string}
@param path: the path of the file to download, starting with the mirror name
- @rtype: (C{int}, C{string}, C{dictionary}, C{string})
- @return: the HTTP status code, status message, headers, and Packages file
+ @type data: C{string}
+ @param data: the downloaded Packages file
"""
-
- # Download the Packages file
- r = self.get_file(path)
-
- if not r[0] == 200:
- return r
try:
# Decompress the data
if path[-1].endswith('.gz'):
- compressed = StringIO(r[3])
+ compressed = StringIO(data)
f = GzipFile(fileobj = compressed)
data = f.read()
elif path[-1].endswith('.bz2'):
- data = decompress(r[3])
- else:
- data = r[3]
+ data = decompress(data)
name = "dt_" + '_'.join(path)
@@ -614,7 +655,7 @@
except:
if DEBUG:
print 'ERROR: Packages file could not be converted to a torrent'
- return r
+ return
sub_pieces = getsubpieces(name)
@@ -632,7 +673,7 @@
# TODO: add mirror data and start deb_mirror downloader to running torrent
if self.handler.has_torrent(infohash):
- return r
+ return
a = {}
a['path'] = '/'.join(path)
@@ -662,8 +703,6 @@
self.handler.add(infohash, a)
- return r
-
def get(self, connection, path, headers):
"""Respond to a GET request.
@@ -749,14 +788,8 @@
if 'Packages.diff' in path:
return (404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
-
- if path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
- return self.get_Packages(path)
-
- if path[-1][-4:] == '.deb':
- return self.get_package(connection, path)
-
- return self.get_file(path)
+
+ return self.get_cached(connection, path)
except ValueError, e:
return (400, 'Bad Request', {'Server': VERSION, 'Content-Type': 'text/plain'},
Modified: debtorrent/trunk/DebTorrent/BT1/Storage.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/Storage.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/Storage.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/Storage.py Thu Jun 14 23:44:12 2007
@@ -241,8 +241,6 @@
self.sizes[file] = length
so_far += l
- if DEBUG:
- print 'piece_files:', self.piece_files
self.total_length = total
self._reset_ranges()
Added: debtorrent/trunk/DebTorrent/HTTPCache.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/HTTPCache.py?rev=109&op=file
==============================================================================
--- debtorrent/trunk/DebTorrent/HTTPCache.py (added)
+++ debtorrent/trunk/DebTorrent/HTTPCache.py Thu Jun 14 23:44:12 2007
@@ -1,0 +1,302 @@
+# Written by Cameron Dale
+# see LICENSE.txt for license information
+#
+# $Id$
+
+"""Manage an HTTP download cache.
+
+@type VERSION: C{string}
+@var VERSION: the UserAgent identifier sent to all sites
+
+"""
+
+from httplib import HTTPConnection, BadStatusLine
+from threading import Thread
+from traceback import print_exc
+from DebTorrent.__init__ import product_name,version_short
+from os.path import join, split, getmtime, getsize, exists
+from os import utime, makedirs, listdir
+from time import strptime, gmtime
+from calendar import timegm
+# Provide the boolean names on interpreters that do not define them.
+# Only a missing name is expected here, so only NameError is caught.
+try:
+    True
+except NameError:
+    True = 1
+    False = 0
+
+# When true, progress and error messages are printed to standard output.
+DEBUG = True
+
+# Sent as the User-Agent header of every request made by this module.
+VERSION = product_name+'/'+version_short
+
+class CacheRequest:
+    """Download a file needed for the HTTP download cache.
+
+    The download is performed in its own thread. When it ends, whether it
+    succeeded or not, L{request_finished} is scheduled on the cache
+    manager's rawserver and reports the result to the manager.
+
+    @type handler: L{HTTPCache}
+    @ivar handler: the cache manager for the download
+    @type path: C{list} of C{string}
+    @ivar path: the server and path to download
+    @type server: C{string}
+    @ivar server: the webserver address and port to connect to
+    @type url: C{string}
+    @ivar url: the URL to request from the site
+    @type func: C{method}
+    @ivar func: the method to call when the download completes
+    @type connection: C{HTTPConnection}
+    @ivar connection: the connection to the HTTP server (None if it could
+        not be created)
+    @type headers: C{dictionary}
+    @ivar headers: the HTTP headers to send in the request
+    @type error: C{string}
+    @ivar error: the error received from the server
+    @type errorcount: C{int}
+    @ivar errorcount: the number of download errors that have occurred since
+        the last successful download from the site
+    @type active: C{boolean}
+    @ivar active: whether there is a download underway
+    @type cancelled: C{boolean}
+    @ivar cancelled: whether the download has been cancelled
+    @type received_data: C{string}
+    @ivar received_data: the data returned from the most recent request
+        (None if nothing was received)
+    @type connection_status: C{int}
+    @ivar connection_status: the status code returned by the server for the
+        most recent request (None if no response was received)
+    @type last_modified: C{string}
+    @ivar last_modified: the Last-Modified HTTP header from the request
+        (None if no response was received or the header was absent)
+
+    """
+
+    def __init__(self, handler, path, func):
+        """Initialize the instance and start the download.
+
+        @type handler: L{HTTPCache}
+        @param handler: the cache manager for the download
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type func: C{method}
+        @param func: the method to call when the download completes
+
+        """
+
+        self.handler = handler
+        self.path = path
+        self.server = path[0]
+        self.url = '/' + '/'.join(path[1:])
+        self.func = func
+
+        # Every attribute read by request_finished() gets a value before
+        # anything can fail, so an early failure is reported cleanly
+        # instead of raising AttributeError later on.
+        self.headers = {'User-Agent': VERSION}
+        self.error = None
+        self.errorcount = 0
+        self.active = False
+        self.cancelled = False
+        self.received_data = None
+        self.connection_status = None
+        self.last_modified = None
+
+        try:
+            self.connection = HTTPConnection(self.server)
+        except Exception:
+            if DEBUG:
+                print 'cannot connect to http seed:', self.server
+            self.connection = None
+            self.error = 'cannot connect to http seed: ' + self.server
+            # Report the failure through the task queue (not directly), so
+            # the manager has finished registering this request first and
+            # the callback is still called with no data.
+            self.handler.rawserver.add_task(self.request_finished)
+            return
+
+        if DEBUG:
+            print 'CacheRequest: downloading ', self.url
+        self.active = True
+        rq = Thread(target = self._request)
+        rq.setDaemon(False)
+        rq.start()
+
+    def _request(self):
+        """Do the request.
+
+        This is the target of the download thread. It always ends by
+        scheduling L{request_finished} on the rawserver.
+
+        """
+
+        # NOTE(review): presumably imported here so the codecs are loaded
+        # before the connection needs them - confirm before removing.
+        import encodings.ascii
+        import encodings.punycode
+        import encodings.idna
+
+        self.error = None
+        self.received_data = None
+        self.connection_status = None
+        self.last_modified = None
+        try:
+            if DEBUG:
+                print 'CacheRequest: sending request'
+                print 'GET', self.url, self.headers
+            self.connection.request('GET',self.url, None, self.headers)
+
+            # Check for closed persistent connection due to server timeout
+            try:
+                r = self.connection.getresponse()
+            except BadStatusLine:
+                # Reopen the connection to get a new socket
+                self.connection.close()
+                self.connection.connect()
+                self.connection.request('GET',self.url, None, self.headers)
+                r = self.connection.getresponse()
+
+            if DEBUG:
+                print 'CacheRequest: got response'
+                print r.status, r.reason, r.getheaders()
+            self.connection_status = r.status
+            self.last_modified = r.getheader('last-modified')
+            self.received_data = r.read()
+        except Exception, e:
+            if DEBUG:
+                print 'error accessing http server: '+str(e)
+                print_exc()
+            self.error = 'error accessing http server: '+str(e)
+            # Throw away the broken connection and try to set up a new one
+            try:
+                self.connection.close()
+            except Exception:
+                pass
+            try:
+                self.connection = HTTPConnection(self.server)
+            except Exception:
+                self.connection = None  # will cause an exception and retry next cycle
+        self.handler.rawserver.add_task(self.request_finished)
+
+    def request_finished(self):
+        """Process the completed request.
+
+        Updates the error count, discards data that came with a bad status
+        and passes the result (None on failure) to the cache manager.
+
+        """
+
+        self.active = False
+        if self.error is not None:
+            self.errorcount += 1
+        # Compare against None rather than testing truth: an error
+        # response with an empty body must still have its status checked,
+        # otherwise it would be cached and served as an empty file.
+        if self.received_data is not None:
+            self.errorcount = 0
+            if not self._got_data():
+                self.received_data = None
+        self.handler.download_complete(self, self.path, self.func,
+                                       self.received_data, self.last_modified)
+
+    def _got_data(self):
+        """Process the returned data from the request.
+
+        @rtype: C{boolean}
+        @return: whether the data was good
+
+        """
+
+        if self.connection_status not in (200, 206):
+            self.errorcount += 1
+            return False
+        if self.cancelled:
+            return False
+        return True
+
+
+
+
+class HTTPCache:
+    """Manage an HTTP download cache.
+
+    Cached files are stored under a file name built by joining the
+    components of the requested path (server name first).
+
+    @type rawserver: L{Debtorrent.RawServer.RawServer}
+    @ivar rawserver: the server
+    @type downloads: C{list} of L{CacheRequest}
+    @ivar downloads: the list of all current downloads for the cache
+
+    """
+
+    # Format of the Last-Modified HTTP header as parsed by this class
+    _HTTP_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
+
+    def __init__(self, rawserver):
+        """Initialize the instance.
+
+        @type rawserver: L{Debtorrent.RawServer.RawServer}
+        @param rawserver: the server
+
+        """
+
+        self.rawserver = rawserver
+        self.downloads = []
+
+    def _cache_filename(self, path):
+        """Build the name of the cache file for a path.
+
+        @type path: C{list} of C{string}
+        @param path: the server and path of the file
+        @rtype: C{string}
+        @return: the path components joined into a file name
+
+        """
+
+        # NOTE(review): the components are used as given; presumably they
+        # come from the requested URL, so confirm the callers reject
+        # components such as '..' before they get here.
+        return join(path[0], *path[1:])
+
+    def download_get(self, path, func):
+        """Create a new download from a site.
+
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type func: C{method}
+        @param func: the method to call with the data when the download is complete
+
+        """
+
+        if DEBUG:
+            print 'Starting a HttpCache downloader for:', 'http://'+'/'.join(path)
+        self.downloads.append(CacheRequest(self, path, func))
+
+    def download_complete(self, d, path, func, data, last_modified):
+        """Remove a completed download from the list and process the data.
+
+        Once a download has been completed, remove the downloader from the
+        list and save the downloaded file in the file system. Then return the
+        data to the callback function. The callback is called even if the
+        file could not be saved, so that waiting requests are always answered.
+
+        @type d: L{CacheRequest}
+        @param d: the cache request that is completed
+        @type path: C{list} of C{string}
+        @param path: the server and path that was downloaded
+        @type func: C{method}
+        @param func: the method to call with the data
+        @type data: C{string}
+        @param data: the downloaded data (None if the download failed)
+        @type last_modified: C{string}
+        @param last_modified: the Last-Modified HTTP header from the request
+
+        """
+
+        if DEBUG:
+            print 'HttpCache download completed for:', 'http://'+'/'.join(path)
+        # Tolerate a request that is not in the list instead of raising
+        if d in self.downloads:
+            self.downloads.remove(d)
+
+        if data is not None:
+            self._cache_save(path, data, last_modified)
+
+        # Call the callback function
+        func(path, data)
+
+    def _cache_save(self, path, data, last_modified):
+        """Write downloaded data into the cache.
+
+        Failures are reported when debugging but never raised, since the
+        cache is only an optimisation for the caller.
+
+        @type path: C{list} of C{string}
+        @param path: the server and path that was downloaded
+        @type data: C{string}
+        @param data: the downloaded data
+        @type last_modified: C{string}
+        @param last_modified: the Last-Modified HTTP header from the request
+
+        """
+
+        filename = self._cache_filename(path)
+        try:
+            # Create the directory for the new file
+            new_dir = split(filename)[0]
+            if new_dir != '' and not exists(new_dir):
+                makedirs(new_dir)
+
+            # Write the new file
+            f = open(filename, 'wb')
+            try:
+                f.write(data)
+            finally:
+                f.close()
+        except (IOError, OSError), e:
+            if DEBUG:
+                print 'HttpCache could not save the file:', filename, str(e)
+            return
+
+        # Set the modified time (on error use current time which should work)
+        try:
+            mtime = timegm(strptime(last_modified, self._HTTP_DATE_FORMAT))
+            utime(filename, (mtime, mtime))
+        except (TypeError, ValueError, OverflowError, OSError):
+            # Missing or unparsable header, or the time could not be set
+            pass
+
+    def _is_current(self, path, filename):
+        """Ask the server whether the cached copy of a file is still current.
+
+        @type path: C{list} of C{string}
+        @param path: the server and path of the file
+        @type filename: C{string}
+        @param filename: the name of the cached copy
+        @rtype: C{boolean}
+        @return: whether the cached copy is still current; False if that
+            could not be determined
+
+        """
+
+        connection = None
+        try:
+            try:
+                # Get the last modified time from the server
+                connection = HTTPConnection(path[0])
+                connection.request('HEAD', '/' + '/'.join(path[1:]), None, {'User-Agent': VERSION})
+                r = connection.getresponse()
+                last_modified = r.getheader('last-modified')
+                server_mtime = timegm(strptime(last_modified, self._HTTP_DATE_FORMAT))
+                file_mtime = getmtime(filename)
+            except Exception, e:
+                # Server unreachable or header missing/unparsable: treat
+                # the copy as stale so that a fresh download is attempted.
+                if DEBUG:
+                    print 'HttpCache could not check the age of:', filename, str(e)
+                return False
+        finally:
+            if connection is not None:
+                connection.close()
+
+        # NOTE(review): the 65 second allowance is taken over unchanged;
+        # presumably it absorbs small timestamp differences - confirm.
+        return server_mtime - file_mtime <= 65
+
+    def cache_get(self, path, uptodate = True):
+        """Get the file from the cache.
+
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type uptodate: C{boolean}
+        @param uptodate: whether to check the age of the file to see if it
+            is still current (optional, defaults to True)
+        @rtype: C{string}
+        @return: the cached data, or None if the cached data was not found,
+            is stale, or its age could not be checked
+
+        """
+
+        filename = self._cache_filename(path)
+
+        # Return None if the file isn't in the cache
+        if not exists(filename):
+            return None
+
+        # Check the server's time against the cached copy
+        if uptodate and not self._is_current(path, filename):
+            return None
+
+        # Read in the file and return the data
+        f = open(filename, 'rb')
+        try:
+            return f.read()
+        finally:
+            f.close()
Propchange: debtorrent/trunk/DebTorrent/HTTPCache.py
------------------------------------------------------------------------------
svn:keywords = ID
Modified: debtorrent/trunk/DebTorrent/download_bt1.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/download_bt1.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/download_bt1.py (original)
+++ debtorrent/trunk/DebTorrent/download_bt1.py Thu Jun 14 23:44:12 2007
@@ -48,7 +48,6 @@
from BTcrypto import CRYPTO_OK
from __init__ import createPeerID
from BT1.makemetafile import getpieces, getsubpieces, uniconvert
-from os.path import split
from gzip import GzipFile
from StringIO import StringIO
import binascii
@@ -462,7 +461,7 @@
try:
if file:
- name = "dt_" + split(file)[1]
+ name = "dt_" + path.split(file)[1]
h = open(file, 'r')
try:
for line in h: # quick test to see if packages file is correct
More information about the Debtorrent-commits
mailing list