r109 - in /debtorrent/trunk/DebTorrent: BT1/AptListener.py BT1/Storage.py HTTPCache.py download_bt1.py

camrdale-guest at users.alioth.debian.org camrdale-guest at users.alioth.debian.org
Thu Jun 14 23:44:12 UTC 2007


Author: camrdale-guest
Date: Thu Jun 14 23:44:12 2007
New Revision: 109

URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=109
Log:
Add caching to the proxy downloading done by AptListener.

Added:
    debtorrent/trunk/DebTorrent/HTTPCache.py   (with props)
Modified:
    debtorrent/trunk/DebTorrent/BT1/AptListener.py
    debtorrent/trunk/DebTorrent/BT1/Storage.py
    debtorrent/trunk/DebTorrent/download_bt1.py

Modified: debtorrent/trunk/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/AptListener.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/AptListener.py Thu Jun 14 23:44:12 2007
@@ -38,6 +38,7 @@
 from binascii import b2a_hex, a2b_hex, a2b_base64
 from string import lower
 from makemetafile import getpieces, getsubpieces, uniconvert
+from DebTorrent.HTTPCache import HTTPCache
 import sys, os
 import signal
 import re
@@ -188,6 +189,13 @@
     @ivar uq_broken: unknown
     @type Filter: unknown
     @ivar Filter: unknown
+    @type Cache: L{DebTorrent.HTTPCache.HTTPCache}
+    @ivar Cache: the cache of downloaded files
+    @type cache_waiting: C{dictionary}
+    @ivar cache_waiting: the pending HTTP get requests that are waiting for download 
+        from the cache. Keys are strings that are the path being requested, values
+        are lists of L{DebTorrent.HTTPHandler.HTTPConnection} objects which are the
+        requests that are pending for that path.
     @type request_queue: C{dictionary}
     @ivar request_queue: the pending HTTP get requests that are waiting for download.
         Keys are L{DebTorrent.HTTPHandler.HTTPConnection} objects, values are
@@ -301,6 +309,8 @@
                 
         self.uq_broken = unquote('+') != ' '
         self.Filter = Filter(rawserver.add_task)
+        self.Cache = HTTPCache(rawserver)
+        self.cache_waiting = {}
         
         self.request_queue = {}
         rawserver.add_task(self.process_queue, 1)
@@ -457,35 +467,43 @@
         return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/html; charset=iso-8859-1'}, """<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n<html><head><title>Meow</title>\n</head>\n<body style="color: rgb(255, 255, 255); background-color: rgb(0, 0, 0);">\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">I&nbsp;IZ&nbsp;TAKIN&nbsp;BRAKE</span></big></big></big><br></div>\n<pre><b><tt>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .-o=o-.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ,&nbsp; /=o=o=o=\ .--.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; _|\|=o=O=o=O=|&nbsp;&nbsp;&nbsp; \<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; __.'&nbsp; a`\=o=o=o=(`\&nbsp;&nbsp; /<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; '.&nbsp;&nbsp; a 4/`|.-""'`\ \ ;'`)&nbsp;&nbsp; .---.<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; \&nbsp;&nbsp; .'&nbsp; /&nbsp;&nbsp; .--'&nbsp; |_.'&nbsp;&nbsp; / .-._)<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `)&nbsp; _.'&nbsp;&nbsp; /&nbsp;&nbsp;&nbsp;&nbsp; /`-.__.' /<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `'-.____;&nbsp;&nbsp;&nbsp;&nbsp; /'-.___.-'<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `\"""`</tt></b></pre>\n<div><big style="font-weight: bold;"><big><big><span style="font-family: arial,helvetica,sans-serif;">FRM&nbsp;GETIN&nbsp;UR&nbsp;PACKAGES</span></big></big></big><br></div>\n</body>\n</html>""")
 
 
-    def get_file(self, path):
-        """Proxy the download of a file from a mirror.
-        
+    def get_cached(self, connection, path):
+        """Proxy the (possibly cached) download of a file from a mirror.
+        
+        @type connection: L{DebTorrent.HTTPHandler.HTTPConnection}
+        @param connection: the conection the request came in on
         @type path: C{list} of C{string}
         @param path: the path of the file to download, starting with the mirror name
         @rtype: (C{int}, C{string}, C{dictionary}, C{string})
-        @return: the HTTP status code, status message, headers, and bencoded 
-            metainfo file
+        @return: the HTTP status code, status message, headers, and downloaded file
+            (or None if the file is being downloaded)
         
         """
         
-        # TODO: Add caching and cache-checking before downloading
-        # TODO: Make this threaded so it doesn't block while waiting for the download
         try:
-            ungzip = True
-            if path[-1].endswith('.gz'):
-                ungzip = False
-            url = 'http://'
-            url += '/'.join(path)
-            if DEBUG:
-                print 'fetching:', url
-            f = urlopen(url, ungzip)
-            headers = {}
-            for k,v in f.response.getheaders():
-                if k.lower() != 'content-length':
-                    headers[k] = v
-            data = f.read()
-            
-            return (200, 'OK', headers, data)
+            # Deb files don't need to be checked for staleness
+            uptodate = True
+            if path[-1][-4:] == '.deb':
+                uptodate = False
+
+            # First check the cache for the file
+            data = self.Cache.cache_get(path, uptodate)
+
+            # If the cache doesn't have it
+            if data is None:
+                # Get Debs from the debtorrent download, others are straight download
+                if path[-1][-4:] == '.deb':
+                    return self.get_package(connection, path)
+                else:
+                    # Save the connection info and start downloading the file
+                    self.cache_waiting.setdefault('/'.join(path), []).append(connection)
+                    self.Cache.download_get(path, self.get_cached_callback)
+                    return None
+            
+            if path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
+                self.got_Packages(path, data)
+
+            return (200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data)
         
         except IOError, e:
             try:
@@ -495,6 +513,38 @@
                 msg = 'Unknown error occurred'
             return (status, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, msg)
             
+    def get_cached_callback(self, path, data):
+        """Return the newly cached file to the waiting connections.
+        
+        @type path: C{list} of C{string}
+        @param path: the path of the file to download, starting with the mirror name
+        @type data: C{string}
+        @param data: the downloaded file, or None if the download failed
+        
+        """
+
+        # Remove and get the list of connections waiting for this file
+        connections = self.cache_waiting.pop('/'.join(path), None)
+        
+        if connections is None:
+            if DEBUG:
+                print 'no connection exists to return the cached file on'
+            return
+
+        # If it's a Packages file, use it to start a torrent
+        if data is not None and path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
+            self.got_Packages(path, data)
+
+        for connection in connections:
+            # Skip requesters whose connection has since been closed
+            if connection.closed:
+                continue
+            # A failed download (None) is answered with a 404 error
+            if data is None:
+                connection.answer((404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas))
+            else:
+                connection.answer((200, 'OK', {'Server': VERSION, 'Content-Type': 'text/plain'}, data))
+            
     def get_package(self, connection, path):
         """Download a package file from a torrent.
         
@@ -508,7 +558,6 @@
         
         """
 
-        # TODO: try and find the file in the directory structure too
         # Find the file in one of the torrent downloads
         d, f = self.handler.find_file(path[0], path[1:])
         
@@ -580,32 +629,24 @@
         self.enqueue_request(connection, d, f, pieces_needed)
         
     
-    def get_Packages(self, path):
-        """Download a Packages file and start a torrent.
+    def got_Packages(self, path, data):
+        """Process a downloaded Packages file and start a torrent.
         
         @type path: C{list} of C{string}
         @param path: the path of the file to download, starting with the mirror name
-        @rtype: (C{int}, C{string}, C{dictionary}, C{string})
-        @return: the HTTP status code, status message, headers, and Packages file
+        @type data: C{string}
+        @param data: the downloaded Packages file
         
         """
-
-        # Download the Packages file
-        r = self.get_file(path)
-        
-        if not r[0] == 200:
-            return r
 
         try:
             # Decompress the data
             if path[-1].endswith('.gz'):
-                compressed = StringIO(r[3])
+                compressed = StringIO(data)
                 f = GzipFile(fileobj = compressed)
                 data = f.read()
             elif path[-1].endswith('.bz2'):
-                data = decompress(r[3])
-            else:
-                data = r[3]
+                data = decompress(data)
             
             name = "dt_" + '_'.join(path)
 
@@ -614,7 +655,7 @@
         except:
             if DEBUG:
                 print 'ERROR: Packages file could not be converted to a torrent'
-            return r
+            return 
 
         sub_pieces = getsubpieces(name)
 
@@ -632,7 +673,7 @@
         
         # TODO: add mirror data and start deb_mirror downloader to running torrent
         if self.handler.has_torrent(infohash):
-            return r
+            return
         
         a = {}
         a['path'] = '/'.join(path)
@@ -662,8 +703,6 @@
         
         self.handler.add(infohash, a)
         
-        return r
-        
     
     def get(self, connection, path, headers):
         """Respond to a GET request.
@@ -749,14 +788,8 @@
 
             if 'Packages.diff' in path:
                 return (404, 'Not Found', {'Server': VERSION, 'Content-Type': 'text/plain', 'Pragma': 'no-cache'}, alas)
-
-            if path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
-                return self.get_Packages(path)
-
-            if path[-1][-4:] == '.deb':
-                return self.get_package(connection, path)
-            
-            return self.get_file(path)
+            
+            return self.get_cached(connection, path)
             
         except ValueError, e:
             return (400, 'Bad Request', {'Server': VERSION, 'Content-Type': 'text/plain'}, 

Modified: debtorrent/trunk/DebTorrent/BT1/Storage.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/Storage.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/BT1/Storage.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/Storage.py Thu Jun 14 23:44:12 2007
@@ -241,8 +241,6 @@
                 self.sizes[file] = length
                 so_far += l
 
-        if DEBUG:
-            print 'piece_files:', self.piece_files
         self.total_length = total
         self._reset_ranges()
 

Added: debtorrent/trunk/DebTorrent/HTTPCache.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/HTTPCache.py?rev=109&op=file
==============================================================================
--- debtorrent/trunk/DebTorrent/HTTPCache.py (added)
+++ debtorrent/trunk/DebTorrent/HTTPCache.py Thu Jun 14 23:44:12 2007
@@ -1,0 +1,429 @@
+# Written by Cameron Dale
+# see LICENSE.txt for license information
+#
+# $Id$
+
+"""Manage an HTTP download cache.
+
+@type VERSION: C{string}
+@var VERSION: the UserAgent identifier sent to all sites
+@type HTTP_DATE_FORMAT: C{string}
+@var HTTP_DATE_FORMAT: the C{strptime} format of the dates in HTTP headers
+
+"""
+
+from httplib import HTTPConnection, BadStatusLine, HTTPException
+from socket import error as socket_error
+from threading import Thread
+from traceback import print_exc
+from DebTorrent.__init__ import product_name,version_short
+from os.path import join, split, getmtime, getsize, exists
+from os import utime, makedirs, listdir
+from time import strptime, gmtime
+from calendar import timegm
+try:
+    True
+except:
+    True = 1
+    False = 0
+
+DEBUG = True
+
+VERSION = product_name+'/'+version_short
+
+HTTP_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'
+
+def _parse_http_date(date):
+    """Convert an HTTP date header to seconds since the epoch.
+    
+    @type date: C{string}
+    @param date: the value of an HTTP date header (e.g. Last-Modified), or
+        None if the header was missing
+    @rtype: C{int}
+    @return: the time in seconds since the epoch (UTC), or None if the
+        date is missing or cannot be parsed
+    
+    """
+    
+    if not date:
+        return None
+    try:
+        return timegm(strptime(date, HTTP_DATE_FORMAT))
+    except ValueError:
+        return None
+
+def _cache_file(path):
+    """Build the name of the file that caches a download.
+    
+    @type path: C{list} of C{string}
+    @param path: the server and path of the download
+    @rtype: C{string}
+    @return: the file name (relative to the current directory), or None if
+        the path is empty or could point outside the cache
+    
+    """
+    
+    if not path:
+        return None
+    for part in path:
+        pieces = part.replace('\\', '/').split('/')
+        # The path comes from an HTTP request, so refuse absolute and '..'
+        # components, which could point outside the cache
+        if (len(pieces) > 1 and pieces[0] == '') or '..' in pieces:
+            return None
+    return join(*path)
+
+
+class CacheRequest:
+    """Download a file needed for the HTTP download cache.
+    
+    @type handler: L{HTTPCache}
+    @ivar handler: the cache manager for the download
+    @type path: C{list} of C{string}
+    @ivar path: the server and path to download
+    @type server: C{string}
+    @ivar server: the webserver address and port to connect to 
+    @type url: C{string}
+    @ivar url: the URL to request from the site
+    @type func: C{method}
+    @ivar func: the method to call when the download completes
+    @type connection: C{HTTPConnection}
+    @ivar connection: the connection to the HTTP server (None if it could
+        not be created)
+    @type headers: C{dictionary}
+    @ivar headers: the HTTP headers to send in the request
+    @type error: C{string}
+    @ivar error: the error received from the server
+    @type errorcount: C{int}
+    @ivar errorcount: the number of download errors that have occurred since
+        the last successful download from the site
+    @type active: C{boolean}
+    @ivar active: whether there is a download underway
+    @type cancelled: C{boolean}
+    @ivar cancelled: whether the download has been cancelled
+    @type received_data: C{string}
+    @ivar received_data: the data returned from the most recent request
+        (None if the request failed)
+    @type connection_status: C{int}
+    @ivar connection_status: the status code returned by the server for the 
+        most recent request (None if no response was received)
+    @type last_modified: C{string}
+    @ivar last_modified: the Last-Modified HTTP header from the request
+        (None if it was not received)
+    
+    """
+    
+    def __init__(self, handler, path, func):
+        """Initialize the instance and start the download in a new thread.
+        
+        If the connection cannot be created, the download is still reported
+        to the handler (as failed), so the requester is not left waiting.
+        
+        @type handler: L{HTTPCache}
+        @param handler: the cache manager for the download
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type func: C{method}
+        @param func: the method to call when the download completes
+        
+        """
+        
+        self.handler = handler
+        self.path = path
+        self.server = path[0]
+        self.url = '/' + '/'.join(path[1:])
+        self.func = func
+        self.headers = {'User-Agent': VERSION}
+        self.error = None
+        self.errorcount = 0
+        self.active = False
+        self.cancelled = False
+        self.received_data = None
+        self.connection_status = None
+        self.last_modified = None
+        try:
+            self.connection = HTTPConnection(self.server)
+        except HTTPException, e:
+            if DEBUG:
+                print 'cannot connect to http seed:', self.server
+            self.connection = None
+            self.error = 'cannot connect to http server: ' + str(e)
+            self.handler.rawserver.add_task(self.request_finished)
+            return
+        
+        if DEBUG:
+            print 'CacheRequest: downloading ', self.url
+        self.active = True
+        rq = Thread(target = self._request)
+        rq.setDaemon(False)
+        rq.start()
+
+    def _request(self):
+        """Do the request (runs in a separate thread).
+        
+        The results are stored in the instance, then L{request_finished} is
+        scheduled on the rawserver to process them.
+        
+        """
+        
+        import encodings.ascii
+        import encodings.punycode
+        import encodings.idna
+        
+        self.error = None
+        self.received_data = None
+        self.connection_status = None
+        self.last_modified = None
+        try:
+            try:
+                if DEBUG:
+                    print 'CacheRequest: sending request'
+                    print 'GET', self.url, self.headers
+                self.connection.request('GET',self.url, None, self.headers)
+                
+                # Check for closed persistent connection due to server timeout
+                try:
+                    r = self.connection.getresponse()
+                except BadStatusLine:
+                    # Reopen the connection to get a new socket
+                    self.connection.close()
+                    self.connection.connect()
+                    self.connection.request('GET',self.url, None, self.headers)
+                    r = self.connection.getresponse()
+                    
+                if DEBUG:
+                    print 'CacheRequest: got response'
+                    print r.status, r.reason, r.getheaders()
+                self.connection_status = r.status
+                self.last_modified = r.getheader('last-modified')
+                self.received_data = r.read()
+            except Exception, e:
+                if DEBUG:
+                    print 'error accessing http server: '+str(e)
+                    print_exc()
+                self.error = 'error accessing http server: '+str(e)
+                self.received_data = None
+        finally:
+            # Each request is only made once, so close the socket (ignoring
+            # errors, so that the completion below is always scheduled)
+            try:
+                self.connection.close()
+            except Exception:
+                pass
+        self.handler.rawserver.add_task(self.request_finished)
+
+    def request_finished(self):
+        """Process the completed request and pass the result to the handler."""
+        self.active = False
+        if self.error is not None:
+            self.errorcount += 1
+        # An empty response body must also have its status checked, otherwise
+        # e.g. an empty error page would be cached as the requested file
+        if self.received_data is not None:
+            self.errorcount = 0
+            if not self._got_data():
+                self.received_data = None
+        self.handler.download_complete(self, self.path, self.func, 
+                                       self.received_data, self.last_modified)
+
+    def _got_data(self):
+        """Process the returned data from the request.
+        
+        @rtype: C{boolean}
+        @return: whether the data was good
+        
+        """
+        
+        if self.connection_status not in [200, 206]:
+            self.errorcount += 1
+            return False
+        if self.cancelled:
+            return False
+        return True
+    
+
+class HTTPCache:
+    """Manage an HTTP download cache.
+    
+    Downloaded files are stored relative to the current directory, in
+    directories named after the server and path they were downloaded from.
+    
+    @type rawserver: L{Debtorrent.RawServer.RawServer}
+    @ivar rawserver: the server
+    @type downloads: C{list} of L{CacheRequest}
+    @ivar downloads: the list of all current downloads for the cache
+    
+    """
+    
+    def __init__(self, rawserver):
+        """Initialize the instance.
+        
+        @type rawserver: L{Debtorrent.RawServer.RawServer}
+        @param rawserver: the server
+        
+        """
+        
+        self.rawserver = rawserver
+        self.downloads = []
+
+    def download_get(self, path, func):
+        """Create a new download from a site.
+        
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type func: C{method}
+        @param func: the method to call with the data when the download is complete
+        
+        """
+        
+        if DEBUG:
+            print 'Starting a HttpCache downloader for:', 'http://'+'/'.join(path)
+        self.downloads.append(CacheRequest(self, path, func))
+
+    def download_complete(self, d, path, func, data, last_modified):
+        """Remove a completed download from the list and process the data.
+        
+        Once a download has been completed, remove the downloader from the 
+        list and save the downloaded file in the file system. Then return the
+        data to the callback function. A failure to save the file is only
+        reported (in debug mode); the data is still returned to the callback.
+        
+        @type d: L{CacheRequest}
+        @param d: the cache request that is completed
+        @type path: C{list} of C{string}
+        @param path: the server and path that was downloaded
+        @type func: C{method}
+        @param func: the method to call with the data
+        @type data: C{string}
+        @param data: the downloaded data (None if the download failed)
+        @type last_modified: C{string}
+        @param last_modified: the Last-Modified HTTP header from the request
+        
+        """
+        
+        if DEBUG:
+            print 'HttpCache download completed for:', 'http://'+'/'.join(path)
+        self.downloads.remove(d)
+
+        if data is not None:
+            file = _cache_file(path)
+            if file is None:
+                if DEBUG:
+                    print 'not caching a file with an unsafe path:', '/'.join(path)
+            else:
+                try:
+                    self._save_file(file, data, last_modified)
+                except (IOError, OSError), e:
+                    if DEBUG:
+                        print 'error saving file to the cache:', file, str(e)
+
+        # Call the callback function
+        func(path, data)
+
+    def _save_file(self, file, data, last_modified):
+        """Write a downloaded file into the cache.
+        
+        @type file: C{string}
+        @param file: the name of the file to save the data in
+        @type data: C{string}
+        @param data: the downloaded data
+        @type last_modified: C{string}
+        @param last_modified: the Last-Modified HTTP header from the request,
+            used as the file's modification time (if it is missing or
+            invalid, the time the file was written is kept instead)
+        @raise IOError: if the file cannot be written
+        @raise OSError: if the directory cannot be created or the file's
+            times cannot be set
+        
+        """
+        
+        # Create the directory for the new file
+        new_dir = split(file)[0]
+        if new_dir != '' and not exists(new_dir):
+            makedirs(new_dir)
+        
+        # Write the new file
+        f = open(file, 'wb')
+        try:
+            f.write(data)
+        finally:
+            f.close()
+        
+        # Set the modified time to the server's
+        mtime = _parse_http_date(last_modified)
+        if mtime is not None:
+            utime(file, (mtime, mtime))
+
+    def _get_server_mtime(self, path):
+        """Get the Last-Modified time of a file from its server.
+        
+        NOTE: this sends a HEAD request and blocks until the server responds.
+        
+        @type path: C{list} of C{string}
+        @param path: the server and path of the file
+        @rtype: C{int}
+        @return: the modification time in seconds since the epoch, or None if
+            the server could not be reached, did not return the file, or did
+            not send a valid Last-Modified header
+        
+        """
+        
+        try:
+            connection = HTTPConnection(path[0])
+        except HTTPException, e:
+            if DEBUG:
+                print 'cannot connect to http server:', path[0], str(e)
+            return None
+        try:
+            try:
+                connection.request('HEAD', '/' + '/'.join(path[1:]), None, 
+                                   {'User-Agent': VERSION})
+                r = connection.getresponse()
+                if r.status != 200:
+                    return None
+                return _parse_http_date(r.getheader('last-modified'))
+            except (HTTPException, socket_error), e:
+                if DEBUG:
+                    print 'error checking file on http server:', '/'.join(path), str(e)
+                return None
+        finally:
+            connection.close()
+
+    def cache_get(self, path, uptodate = True):
+        """Get the file from the cache.
+        
+        @type path: C{list} of C{string}
+        @param path: the server and path to download
+        @type uptodate: C{boolean}
+        @param uptodate: whether to check the age of the file against the
+            server to see if it is still current (optional, defaults to True)
+        @rtype: C{string}
+        @return: the cached data, or None if the cached data was not found,
+            is stale (or its age could not be checked), or could not be read
+        
+        """
+        
+        file = _cache_file(path)
+        
+        # Return None if the file isn't in the cache
+        if file is None or not exists(file):
+            return None
+        
+        if uptodate:
+            # Check the server's time against the cached copy (a cached copy
+            # up to 65 seconds older than the server's is still accepted)
+            server_mtime = self._get_server_mtime(path)
+            if server_mtime is None or server_mtime - getmtime(file) > 65:
+                return None
+
+        # Read in the file and return the data
+        try:
+            f = open(file, 'rb')
+            try:
+                return f.read()
+            finally:
+                f.close()
+        except IOError, e:
+            if DEBUG:
+                print 'error reading cached file:', file, str(e)
+            return None

Propchange: debtorrent/trunk/DebTorrent/HTTPCache.py
------------------------------------------------------------------------------
    svn:keywords = ID

Modified: debtorrent/trunk/DebTorrent/download_bt1.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/download_bt1.py?rev=109&op=diff
==============================================================================
--- debtorrent/trunk/DebTorrent/download_bt1.py (original)
+++ debtorrent/trunk/DebTorrent/download_bt1.py Thu Jun 14 23:44:12 2007
@@ -48,7 +48,6 @@
 from BTcrypto import CRYPTO_OK
 from __init__ import createPeerID
 from BT1.makemetafile import getpieces, getsubpieces, uniconvert
-from os.path import split
 from gzip import GzipFile
 from StringIO import StringIO
 import binascii
@@ -462,7 +461,7 @@
 
     try:
         if file:
-            name = "dt_" + split(file)[1]
+            name = "dt_" + path.split(file)[1]
             h = open(file, 'r')
             try:
                 for line in h:   # quick test to see if packages file is correct




More information about the Debtorrent-commits mailing list