[apt-proxy-devel] r606 - in people/halls/rework: apt_proxy apt_proxy/test debian

Chris Halls halls at costa.debian.org
Tue May 9 22:40:27 UTC 2006


Author: halls
Date: Tue May  9 22:39:28 2006
New Revision: 606

Modified:
   people/halls/rework/apt_proxy/apt_proxy.py
   people/halls/rework/apt_proxy/apt_proxy_conf.py
   people/halls/rework/apt_proxy/cache.py
   people/halls/rework/apt_proxy/fetchers.py
   people/halls/rework/apt_proxy/misc.py
   people/halls/rework/apt_proxy/test/test_apt_proxy.py
   people/halls/rework/apt_proxy/test/test_config.py
   people/halls/rework/apt_proxy/test/test_requests.py
   people/halls/rework/debian/changelog
   people/halls/rework/doc/apt-proxy.conf
   people/halls/rework/doc/apt-proxy.conf.5

Log:
* Change read_limit to bandwidth_limit, and add rsync bandwidth limiting
* Document bandwidth limiting
* Fixes to fetchers
* Rsync backend tests and fixes


Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py	(original)
+++ people/halls/rework/apt_proxy/apt_proxy.py	Tue May  9 22:39:28 2006
@@ -17,7 +17,7 @@
 import os, stat, signal, fcntl, exceptions
 from os.path import dirname, basename
 import tempfile, glob, re, urlparse, time
-from twisted.internet import reactor, abstract
+from twisted.internet import reactor
 from twisted.python.failure import Failure
 from twisted.internet import error, protocol
 from twisted.web import http
@@ -159,7 +159,7 @@
         log.debug("Created new BackendServer: " + uri)
 
         # hack because urlparse doesn't support rsync
-        if uri[0:5] == 'rsync':
+        if uri[0:6] == 'rsync:':
             uri = 'http'+uri[5:]
             is_rsync=1
         else:
@@ -167,6 +167,8 @@
 
         self.scheme, netloc, self.path, parameters, \
                      query, fragment = urlparse.urlparse(uri)
+        if is_rsync:
+            self.scheme = 'rsync'
 
         if '@' in netloc:
             auth = netloc[:netloc.rindex('@')]
@@ -174,13 +176,12 @@
             self.username, self.password = auth.split(':')
         else:
             self.username = None
+            self.password = None
         if ':' in netloc:
             self.host, self.port = netloc.split(':')
         else:
             self.host = netloc
             self.port = self.ports[self.scheme]
-        if is_rsync:
-            self.scheme = 'rsync'
         self.fetcher = self.fetchers[self.scheme]
         try:
             self.port = int(self.port)
@@ -232,7 +233,7 @@
                     % (self.method, self.uri, backendName, self.uri),'Request')
 
         if self.factory.config.disable_pipelining:
-            self.setHeader('Connection','close')
+            #self.setHeader('Connection','close')
             self.channel.persistent = 0
 
         if self.method != 'GET':
@@ -278,8 +279,10 @@
         if self.if_modified_since is None or self.if_modified_since < mtime:
             log.debug("start_streaming size=%s mtime=%s if_modified_since=%s" % (size, mtime, self.if_modified_since) , 'Request')
             self.setResponseCode(http.OK, 'Streaming file')
-            self.setHeader('last-modified', http.datetimeToString(mtime))
-            self.setHeader('content-length', size)
+            if mtime is not None:
+                self.setHeader('last-modified', http.datetimeToString(mtime))
+            if size is not None:
+                self.setHeader('content-length', size)
             return True
         else:
             log.debug("file not modified: mtime=%s if_modified_since=%s" % (mtime, self.if_modified_since) , 'Request')
@@ -296,11 +299,11 @@
 
     def finish(self):
         "Finish request after streaming"
-        log.debug("finish" , 'Request')
+        log.debug("finish. Queued: %s" % (self.queued) , 'Request')
         http.Request.finish(self)
-        if self.factory.config.disable_pipelining:
-            if hasattr(self.transport, 'loseConnection'):
-                self.transport.loseConnection()
+        #if self.factory.config.disable_pipelining:
+            #if hasattr(self.transport, 'loseConnection'):
+                #self.transport.loseConnection()
 
         if self.cacheEntry:
             self.cacheEntry.remove_request(self)
@@ -398,6 +401,7 @@
 
     def startPeriodic(self):
         if (self.config.cleanup_freq != None and self.periodicCallback is None):
+            log.debug("Will do periodic cleanup in %s sec" % (self.config.cleanup_freq))
             self.periodicCallback = reactor.callLater(self.config.cleanup_freq, self.periodic)
 
     def stopPeriodic(self):
@@ -417,6 +421,7 @@
     def startFactory(self):
         #start periodic updates
         self.configurationChanged()
+        self.dumpdbs()
         self.recycler = MirrorRecycler(self, 1)
         self.recycler.start()
 

Modified: people/halls/rework/apt_proxy/apt_proxy_conf.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy_conf.py	(original)
+++ people/halls/rework/apt_proxy/apt_proxy_conf.py	Tue May  9 22:39:28 2006
@@ -94,12 +94,12 @@
         ['max_versions', 3, '*int'],
         ['max_age', 10, '*time'],
         ['import_dir', '/var/cache/apt-proxy/import', 'string'],
-        ['disable_pipelining', '1', 'boolean'],
+        ['disable_pipelining', '0', 'boolean'],
         ['passive_ftp', 'on', 'boolean'],
         ['dynamic_backends', 'on', 'boolean'],
         ['http_proxy', None , 'proxyspec'],
         ['username', 'aptproxy', 'string'],
-        ['read_limit', None, '*int']
+        ['bandwidth_limit', None, '*int']
         ]
 
     """
@@ -114,7 +114,7 @@
         ['passive_ftp', None, 'boolean'],
         ['backends', '', 'stringlist'],
         ['http_proxy', None , 'proxyspec'],
-        ['read_limit', None, '*int']
+        ['bandwidth_limit', None, '*int']
         ]
 
     DEFAULT_CONFIG_FILE = ['/etc/apt-proxy/apt-proxy-v2.conf',

Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py	(original)
+++ people/halls/rework/apt_proxy/cache.py	Tue May  9 22:39:28 2006
@@ -350,15 +350,16 @@
         Download is not possible
         """
         log.msg("download_failure %s: (%s) %s"% (self.file_path, http_code, reason), "CacheEntry")
+
         for request in self.requests:
             request.finishCode(http_code, reason)
 
-        # Remove directory if file was not created
-        if not os.path.exists(self.file_path):
-            try:
-                os.removedirs(self.filedir)
-            except:
-                pass
+        ## Remove directory if file was not created
+        #if not os.path.exists(self.file_path):
+            #try:
+                #os.removedirs(self.factory.config.cache_dir + os.sep + self.backend.base)
+            #except:
+                #pass
 
 
     def file_sent(self):
@@ -376,6 +377,7 @@
         self.state = self.STATE_NEW
 
     def init_tempfile(self):
+        #log.msg("init_tempfile:" + self.file_path, "CacheEntry")
         self.create_directory()
         self.streamFilename = self.file_path + ".apDownload"
         self.streamfile = StreamFile(self.streamFilename)

Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py	(original)
+++ people/halls/rework/apt_proxy/fetchers.py	Tue May  9 22:39:28 2006
@@ -21,9 +21,9 @@
 network backends
 """
 
-import re, os, string, time, glob, signal
+import re, os, string, time, glob, signal, stat
 from twisted.web import static, http
-from twisted.internet import protocol, reactor, defer, error
+from twisted.internet import protocol, reactor, defer, error, abstract
 from twisted.python import failure
 from twisted.protocols import policies, ftp
 
@@ -107,7 +107,7 @@
         """
         Download was successful
         """
-        log.debug("download complete", "Fetcher")
+        log.debug("download complete. Sent:%s bytes" % (self.len_received), "Fetcher")
         self.cacheEntry.download_data_end()
         self.deferred.callback((True, ""))
         #self.fetcher = None
@@ -133,12 +133,15 @@
             self.fetcher.disconnect()
         self.download_failed(None, "Download canceled")
 
-    def data_received(self, data):
+    def data_received(self, data, save=True):
         """
         File Data has been received from the backend server
+        @param data: raw data received from server
+        @param save: if true, save to disk (rsync saves file itself)
         """
         if self.len_received == 0:
-            self.cacheEntry.init_tempfile()
+            if save:
+                self.cacheEntry.init_tempfile()
             self.cacheEntry.download_started(self, self.size, self.mtime)
         self.len_received = self.len_received + len(data)
 
@@ -422,7 +425,7 @@
         else:
             host = self.proxy.host
             port = self.proxy.port
-        self.read_limit = self.backendServer.backend.config.read_limit
+        self.read_limit = self.backendServer.backend.config.bandwidth_limit
         if self.read_limit is None:
             factory = self
         else:
@@ -784,19 +787,20 @@
         self.cache_mtime = mtime
         self.request_uri = uri
         self.cache_path = fetcher.cacheEntry.cache_path
+        self.file_path = fetcher.cacheEntry.file_path   # Absolute path of file
         self.cache_dir = fetcher.cacheEntry.filedir
         self.remote_file = (self.backendServer.path + '/' 
                             + uri)
 
         # Change /path/to/FILE -> /path/to/.FILE.* to match rsync tempfile
-        self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.cache_path)
+        self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.file_path)
 
         for file in glob.glob(self.globpattern):
           log.msg('Deleting stale tempfile:' + file, 'rsyncFetcher')
           unlink(file)
 
         # rsync needs the destination directory in place, so create it if necessary
-        if(not os.path.exists(self.cache_dir)):
+        if not os.path.exists(self.cache_dir):
             os.makedirs(self.cache_dir)
 
         if self.backendServer.port:
@@ -807,15 +811,20 @@
         uri = 'rsync://'+ self.backendServer.host + portspec \
               +self.backendServer.path+'/' + self.request_uri
 
-        if(log.isEnabled('rsync',9)):
-            args = (self.rsyncCommand, '--partial', '--progress', '--verbose', '--times',
-                    '--timeout', "%d"%(self.backendServer.backend.config.timeout),
-                    uri, '.',)
-        else:
-            args = (self.rsyncCommand, '--quiet', '--times', uri, '.',
-                    '--timeout',  "%d"%(self.backendServer.backend.config.timeout),
-                    )
-        log.debug('rsync command: %s' %(string.join(args,' ')), 'rsyncFetcher')
+        args = [self.rsyncCommand, '--partial', '--progress', '--times',
+                '--timeout=%s' %(self.backendServer.backend.config.timeout)]
+        if log.isEnabled('rsync',9):
+            args.append('--verbose')
+        else:
+            args.append('--quiet')
+        bwlimit = self.backendServer.backend.config.bandwidth_limit
+        if bwlimit:
+            bwlimit = bwlimit / 1000 # rsync wants kbps
+            if bwlimit < 1: 
+                bwlimit = 1
+            args.append('--bwlimit=%d' % (bwlimit))
+        args.extend([uri, '.'])
+        log.debug('rsync command: (%s) %s' %(self.cache_dir, string.join(args,' ')), 'rsyncFetcher')
         self.rsyncProcess = reactor.spawnProcess(self, self.rsyncCommand, args, None,
                                             self.cache_dir)
 
@@ -867,8 +876,8 @@
                 return
         else:
             # Tempfile has gone, stream main file
-            #log.debug("sendData open dest " + str(self.bytes_sent))
-            f = open(self.cache_path, 'rb')
+            log.debug("sendData open dest (sent: %s bytes)"% (self.bytes_sent), 'rsync_client')
+            f = open(self.file_path, 'rb')
 
         if f:
             f.seek(self.bytes_sent)
@@ -876,30 +885,30 @@
             #log.debug("sendData got " + str(len(data)))
             f.close()
             if data:
-                self.parent.data_received(data)
+                self.parent.data_received(data, save=False)
                 self.bytes_sent = self.bytes_sent + len(data)
                 reactor.callLater(0, self.sendData)
             elif not self.rsyncTempFile:
                 # Finished reading final file
-                log.debug("sendData complete")
+                log.debug("sendData complete. Bytes sent: %s" %(self.bytes_sent))
                 # Tell clients, but data is already saved by rsync so don't
                 # write file again
                 self.parent.download_complete()
+                self.parent.connection_closed() # We don't have a persistent connection
 
     def processEnded(self, status_object):
         __pychecker__ = 'unusednames=reason'
         self.rsyncTempFile = None
         self.rsyncProcess = None
 
-        log.debug("rsync terminated: %s" %(status_object)
-                ,'rsync_client')
         r = status_object.trap(error.ProcessTerminated, error.ProcessDone)
         if r == error.ProcessDone:
+            log.debug("rsync process complete", 'rsync_client')
             # File received.  Send to clients.
-            self.parent.server_mtime(os.stat(self.cache_path)[stat.ST_MTIME])
+            self.parent.server_mtime(os.stat(self.file_path)[stat.ST_MTIME])
             reactor.callLater(0, self.sendData)
         elif r == error.ProcessTerminated:
-            log.debug("Status: %d" %(status_object.value.exitCode)
+            log.debug("Status: %s" %(status_object.value.exitCode)
                       ,'rsync_client')
             exitcode = status_object.value.exitCode
             if exitcode == 10:
@@ -917,6 +926,7 @@
             log.debug("disconnect: killing rsync child pid " + 
                       str(self.rsyncProcess.pid), 'rsync_client')
             os.kill(self.rsyncProcess.pid, signal.SIGTERM)
+            self.transport.loseConnection()
 
 class FetcherCachedFile(Fetcher):
     """
@@ -959,7 +969,7 @@
             return
         self.factory.file_served(request.uri)
         self.size = request.local_size
-        
+
         self.start_transfer(request)
         
     def start_transfer(self, request):

Modified: people/halls/rework/apt_proxy/misc.py
==============================================================================
--- people/halls/rework/apt_proxy/misc.py	(original)
+++ people/halls/rework/apt_proxy/misc.py	Tue May  9 22:39:28 2006
@@ -14,8 +14,8 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-import os
-from twisted.internet import reactor
+import os, time
+from twisted.internet import reactor, defer
 from twisted import python
 
 class DomainLogger:
@@ -114,6 +114,7 @@
         self.timer = timer
         self.factory = factory
         self.callback = None
+        self.working = None  # Deferred triggered when recycler finishes
     def start(self):
         """
         Starts the Recycler if it is not working, it will use
@@ -121,9 +122,11 @@
         tree.
         """
         if not self.working:
+            self.working = defer.Deferred()
             if self.factory.backends == []:
                 log.msg("NO BACKENDS FOUND",'recycle')
-                return
+                self.working.errback(python.failure.Failure())
+                return self.working
             self.cur_uri = '/'
             self.cur_dir = self.factory.config.cache_dir
             self.pending = []
@@ -131,16 +134,20 @@
                  self.pending.append(backend.base)
             self.stack = []
             self.callback = reactor.callLater(self.timer, self.process)
-            self.working = 1
+        return self.working
     def stop(self):
         if self.callback is not None:
             self.callback.cancel()
             self.callback = None
+            if self.working:
+                self.working.callback(None)
+                self.working = None
     def pop(self):
         if self.stack:
             (self.cur_dir, self.cur_uri, self.pending) = self.stack.pop()
         else:
-            self.working = 0
+            self.working.callback(None)
+            self.working = None
     def push(self):
         if self.pending:
             self.stack.append((self.cur_dir, self.cur_uri, self.pending))
@@ -161,8 +168,11 @@
             self.cur_uri = uri
             self.pending = os.listdir(self.cur_dir)
             if not self.pending:
-                log.msg("Pruning empty directory: "+path,'recycle')
-                os.removedirs(path)
+                # Prune directory if it has not just been created
+                pathage = time.time() - os.path.getctime(path)
+                if pathage > 60:
+                    log.msg("Pruning empty directory: "+path,'recycle')
+                    os.removedirs(path)
         elif os.path.isfile(path):
             ext = os.path.splitext(path)[1]
             if not ext == 'apDownload':

Modified: people/halls/rework/apt_proxy/test/test_apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/test/test_apt_proxy.py	(original)
+++ people/halls/rework/apt_proxy/test/test_apt_proxy.py	Tue May  9 22:39:28 2006
@@ -70,7 +70,7 @@
 """
 
 class apTestHelper(unittest.TestCase):
-    default_config = "[DEFAULT]\ndebug=all:9 apt:0 memleak:0\n cleanup_freq=off\n" # Config string to use
+    default_config = "[DEFAULT]\ndebug=all:9 apt:0 memleak:0\ncleanup_freq=off\n" # Config string to use
     def setUp(self):
         self.cache_dir = tempfile.mkdtemp('.aptproxy')
         self.config = self.default_config.replace('[DEFAULT]','[DEFAULT]\ncache_dir=' + self.cache_dir)
@@ -261,3 +261,55 @@
         # Max versions should have deleted one file
         self.assertEquals(len(pkgs), 2)
 
+backendServerConfig = """
+[test_servers]
+backends=http://server1/path1
+         ftp://server2/path2
+         rsync://server3/path3
+         file://server4/path4
+[test_usernames]
+backends=http://myUser:thePassword@server/path
+"""
+class BackendServerTest(FactoryTestHelper):
+    def setUp(self):
+        """
+        Set up a factory using the additional config given
+        """
+        FactoryTestHelper.setUp(self, backendServerConfig)
+        self.backend = self.factory.getBackend('test_servers')
+
+    def testServerHosts(self):
+        values = ['server1','server2','server3','server4']
+        for server in self.backend.uris:
+            value = values[self.backend.uris.index(server)]
+            self.assertEquals(server.host, value)
+    def testServerPaths(self):
+        values = ['/path1','/path2','/path3','/path4']
+        for server in self.backend.uris:
+            value = values[self.backend.uris.index(server)]
+            self.assertEquals(server.path, value)
+    def testServerProtocols(self):
+        values = ['http','ftp','rsync','file']
+        for server in self.backend.uris:
+            value = values[self.backend.uris.index(server)]
+            self.assertEquals(server.scheme, value)
+    def testServerDefaultPorts(self):
+        values = [80,21,873,0]
+        for server in self.backend.uris:
+            value = values[self.backend.uris.index(server)]
+            self.assertEquals(server.port, value)
+    def testStr(self):
+        "__str__ operator"
+        for server in self.backend.uris:
+            self.assertNotEquals(server.__str__(), None)
+    def testNoUser(self):
+        self.assertEquals(self.backend.uris[0].username,None)
+    def testNoPassword(self):
+        self.assertEquals(self.backend.uris[0].password,None)
+    def testUser(self):
+        backend = self.factory.getBackend('test_usernames')
+        self.assertEquals(backend.uris[0].username,'myUser')
+    def testPassword(self):
+        backend = self.factory.getBackend('test_usernames')
+        self.assertEquals(backend.uris[0].password,'thePassword')
+        
\ No newline at end of file

Modified: people/halls/rework/apt_proxy/test/test_config.py
==============================================================================
--- people/halls/rework/apt_proxy/test/test_config.py	(original)
+++ people/halls/rework/apt_proxy/test/test_config.py	Tue May  9 22:39:28 2006
@@ -35,12 +35,12 @@
 port=8989
 address=1.2.3.4 5.6.7.8
 timeout = 888
-read_limit = 2323
+bandwidth_limit = 2323
 
 [backend1]
 backends = ftp://a.b.c
 timeout = 999
-read_limit = 3434
+bandwidth_limit = 3434
 
 [backend2]
 backends = 
@@ -81,13 +81,13 @@
         self.assertEquals(self.c.backends['dynamic1'].name,'dynamic1')
         self.assertEquals(self.c.backends['dynamic1'].dynamic,True)
         self.assertEquals(self.c.backends['dynamic1'].timeout,888)
-    def testReadLimit(self):
-        self.assertEquals(self.c.read_limit, 2323)
-        self.assertEquals(self.c.backends['backend1'].read_limit,3434)
-        self.assertEquals(self.c.backends['backend2'].read_limit,2323)
+    def testBandwidthLimit(self):
+        self.assertEquals(self.c.bandwidth_limit, 2323)
+        self.assertEquals(self.c.backends['backend1'].bandwidth_limit,3434)
+        self.assertEquals(self.c.backends['backend2'].bandwidth_limit,2323)
 
 class DefaultsTest(unittest.TestCase):
     def setUp(self):
         self.c = apConfig(StringIO(""))
     def testDefaultReadLimit(self):
-        self.assertEquals(self.c.read_limit, None)
+        self.assertEquals(self.c.bandwidth_limit, None)

Modified: people/halls/rework/apt_proxy/test/test_requests.py
==============================================================================
--- people/halls/rework/apt_proxy/test/test_requests.py	(original)
+++ people/halls/rework/apt_proxy/test/test_requests.py	Tue May  9 22:39:28 2006
@@ -19,16 +19,18 @@
 import os, time
 from twisted.trial import unittest
 from twisted.internet import protocol, reactor, defer
+from twisted.python import failure
 from twisted import web
 from twisted.web import http
 from StringIO import StringIO
 
 from apt_proxy.apt_proxy_conf import apConfig
-from apt_proxy.test.test_apt_proxy import apTestHelper
 from apt_proxy.cache import CacheEntry
 from apt_proxy.apt_proxy import Factory
 from apt_proxy.misc import log
 from apt_proxy.fetchers import DownloadQueue
+from apt_proxy.test.test_fetchers import RsyncServer
+from apt_proxy.test.test_apt_proxy import apTestHelper
 
 class uriRequester(http.HTTPClient):
     """
@@ -37,6 +39,7 @@
     def __init__(self, factory):
         self.factory = factory
         self.http_status = None
+        self.received_len = 0
 
     def connectionMade(self):
         """
@@ -57,13 +60,21 @@
                    (version, code, message, self.factory.expectedResponse), 'uriRequesterTest')
         self.http_status = int(code)
 
-
-    #def dataReceived(self, data):
-    #    log.debug("data received, len: %s" % (len(data)), 'uriRequesterTest')
+    def dataReceived(self, data):
+        self.received_len = self.received_len + len(data)
+        log.debug("data received, len: %s" % (self.received_len), 'uriRequesterTest')
+        http.HTTPClient.dataReceived(self, data)
+        
     def handleResponse(self, buffer):
-        log.debug('data received: %s bytes' % (len(buffer)), 'uriRequesterTest')
-        self.received_len = len(buffer)
+        received_len = len(buffer)
+        log.debug('data received: %s bytes, expected:%s' % (received_len, self.factory.expectedSize), 'uriRequesterTest')
         if self.http_status != self.factory.expectedResponse:
+            log.debug('test FAILED: response code (%s) is not %s' % 
+                       (self.http_status, self.factory.expectedResponse), 'uriRequesterTest')
+            self.factory.failed()
+        elif self.factory.expectedSize is not None and received_len != self.factory.expectedSize:
+            log.debug('test FAILED: received %s bytes, but expected %s' % 
+                    (received_len, self.factory.expectedSize), 'uriRequesterTest')
             self.factory.failed()
         else:
             self.factory.passed()
@@ -73,15 +84,16 @@
     Helper factory to connect to apt-proxy and send
     HTTP requests using uriRequester
     """
-    def __init__(self, filename, host, expectedResponse, if_modified_since=None):
+    def __init__(self, filename, host, expectedResponse, if_modified_since=None, expectedSize=None):
         self.filename = filename
         self.host = host
         self.testDone = False
         self.testPassed = False
         self.timedOut = False
         self.expectedResponse = expectedResponse
-        self.timeout = reactor.callLater(30, self.timeout)
         self.if_modified_since = if_modified_since
+        self.expectedSize = expectedSize # If not none, the file sent should have this size
+        self.deferred = defer.Deferred() # Deferred that returns result of test
     #def startedConnecting(self, connector):
     #    print 'Started to connect.'
     def buildProtocol(self, addr):
@@ -92,30 +104,18 @@
     def clientConnectionLost(self, connector, reason):
         log.debug('Lost connection.  Reason:'+ str(reason))
         self.testDone = True
-        self.cancelTimeout()
-
-    def cancelTimeout(self):
-        if self.timeout is not None:
-            self.timeout.cancel()
-            self.timeout = None
+        #self.passed()
 
     def clientConnectionFailed(self, connector, reason):
-        print 'Connection failed. Reason:', reason
+        log.err('Connection failed. Reason:', reason, 'requestFactory')
         self.failed()
 
-    def timeout(self):
-        # print 'Test timeout'
-        log.debug("Test timeout", 'uriRequesterTest')
-        self.timeout = None
-        self.testDone = True
-        self.timedOut = True
     def passed(self):
-        self.testDone = True
-        self.testPassed = True
-        self.cancelTimeout()
+        log.debug('test passed', 'requestFactory')
+        self.deferred.callback(None)
     def failed(self):
-        self.testDone = True
-        self.cancelTimeout()
+        log.debug('test failed', 'requestFactory')
+        self.deferred.errback(failure.Failure())
 
 class TestRequestHelper(apTestHelper):
     def setUp(self, config):
@@ -134,19 +134,21 @@
         apTestHelper.tearDown(self)
         self.assertRaises(OSError, os.stat, self.cache_dir)
 
-    def doRequest(self, file, responseCode, if_modified_since=None):
+    def doRequest(self, file, responseCode, if_modified_since=None, filePath=None):
         portno = self.port.getHost().port
         log.debug("Starting test connection to 127.0.0.1:%s, file:%s:" %(portno, file), 'uriRequesterTest')
-        clientFactory = requestFactory(file, "127.0.0.1:%s"% (portno), responseCode, if_modified_since)
+        if filePath is not None:
+            # Get size of actual file
+            expectedSize = os.path.getsize(filePath)
+        else:
+            expectedSize = None
+        clientFactory = requestFactory(file, "127.0.0.1:%s"% (portno), responseCode, if_modified_since, expectedSize)
         connection = reactor.connectTCP("127.0.0.1", portno, clientFactory)
+        self.connection = connection
 
-        while clientFactory.testDone == False:
-            #print "iterate.."
-            reactor.iterate(0.1)
-        self.assertNotEquals(clientFactory.timedOut, True)
-        self.assertEquals(clientFactory.testPassed, True)
-
-        connection.disconnect()
+        clientFactory.deferred.addBoth(lambda x: connection.disconnect())
+        self.lastRequestFactory = clientFactory
+        return clientFactory.deferred
 
 class FileBackendTest(TestRequestHelper):
     def setUp(self):
@@ -155,24 +157,26 @@
         [files]
         backends=file:///<path to test packages directory>
         """
-        filedir = os.path.normpath(os.getcwd()+"/../test_data/packages")
+        self.filedir = os.path.normpath(os.getcwd()+"/../test_data/packages")
         config = ("dynamic_backends=off\n"+
                   "[files]\n" +
-                  "backends=file://" + filedir)
+                  "backends=file://" + self.filedir)
         #print "config: " + config
         TestRequestHelper.setUp(self, config)
 
     def testNotFound(self):
-        self.doRequest('/files/test.gz', http.NOT_FOUND)
+        return self.doRequest('/files/test.gz', http.NOT_FOUND)
     def testPackagesFile(self):
-        self.doRequest('/files/Packages.gz', http.OK)
+        file = 'Packages.gz'
+        return self.doRequest('/files/'+file, http.OK, filePath=self.filedir+os.sep+file).addCallback(self.PackagesFile2)
+    def PackagesFile2(self, x):
         backend = self.factory.getBackend('files')
         # Check that request was deleted from backend
         self.assertEquals(len(backend.entries), 0)
 
     def testForbidden(self):
-        self.doRequest('/notbackend/Release', http.NOT_FOUND)
-        
+        d = self.doRequest('/notbackend/Release', http.NOT_FOUND)
+        return d
 
 class WebServer:
     def start(self):
@@ -192,96 +196,147 @@
     def stop(self):
         self.port.stopListening()
 
-class DebianHttpBackendTest(TestRequestHelper):
-    def setUp(self):
+class BackendTestBase:
+    """
+    Class to perform a series of requests against a test backend.
+    Derived classes should arrange for a local server to serve
+    files from the test_data directory.
+    """
+
+    # Name of test backend
+    backendName = 'test_data'
+
+    def setUp(self, backend_uri):
         """
         Make a configuration with a single backend
-        [files]
-        backends=file:///<path to test packages directory>
+
+        @param backend_uri: backend server uri e.g. http://127.0.0.1:1234
         """
-        #config = ("""
-#dynamic_backends=off
-#[debian]
-#backends=http://ftp.debian.org/debian
-        #""")
-        self.httpserver = WebServer()
-        port = self.httpserver.start()
         config = ("dynamic_backends=off\n" +
                   "[test_data]\n" +
-                  "backends=http://127.0.0.1:" + str(port))
+                  "backends=" + str(backend_uri))
         TestRequestHelper.setUp(self, config)
+        self.testfilesdir = os.path.normpath(os.getcwd()+"/../test_data")
     def tearDown(self):
-        self.httpserver.stop()
+        log.debug("tearDown", self.debugname)
         TestRequestHelper.tearDown(self)
     def testNotFound(self):
-        self.doRequest('/test_data/NotHere.gz', http.NOT_FOUND)
-    def downloadFile(self):
+        return self.doRequest('/test_data/NotHere.gz', http.NOT_FOUND)
+    testNotFound.timeout = 2
+
+    def downloadFile(self, file='/packages/Packages.gz'):
         """
         Download a file to cache
         self.backend is set to backend name
         self.file is set to filename
         self.filepath is set to physical filename
         """
-        self.backend = 'test_data'
-        self.filename = '/' + self.backend + '/packages/Packages.gz'
+        self.filename = '/' + self.backendName + file
         self.filepath = self.cache_dir + self.filename
 
         # File should not be in cache
         self.assertRaises(OSError, os.stat, self.filepath)
-        self.doRequest(self.filename, http.OK)
-
-        # Check that file was really placed in cache
-        os.stat(self.filepath) 
+        d = self.doRequest(self.filename, http.OK, filePath=self.testfilesdir+file)
+        def checkPath(x):
+            # Check that file was really placed in cache
+            os.stat(self.filepath) 
+        d.addCallback(checkPath)
+        return d
 
     def testPackagesFile(self):
-        self.downloadFile()
-
+        return self.downloadFile().addCallback(self.PackagesFile2)
+    def PackagesFile2(self, x):
         # Check access time datbase was updated
         self.assertApproximates(self.factory.access_times[self.filename], time.time(), 6)
+    testPackagesFile.timeout = 2
 
     def testNotModifiedGreater(self):
         "Check http not modified is sent for new file"
-        self.downloadFile()
-        self.doRequest(self.filename, http.NOT_MODIFIED, time.time())
+        d = self.downloadFile()
+        self.testResult = defer.Deferred()
+        d.addCallback(self.NotModifiedGreater2)
+        d.addErrback(lambda x: self.testResult.errback(failure.Failure()))
+        return self.testResult
+    def NotModifiedGreater2(self, x):
+        log.debug("testNotModifiedGreater: starting second client", self.debugname)
+        d = self.doRequest(self.filename, http.NOT_MODIFIED, time.time())
+        d.chainDeferred(self.testResult)
+    testNotModifiedGreater.timeout = 3
 
     def testNotModifiedExact(self):
-        self.downloadFile()
-        self.doRequest(self.filename, http.NOT_MODIFIED, os.path.getmtime(self.filepath))
+        d= self.downloadFile()
+        self.testResult = defer.Deferred()
+        d.addCallback(self.NotModifiedGreater2)
+        d.addErrback(lambda x: self.testResult.errback(failure.Failure()))
+        return self.testResult
+    def NotModifiedExact2(self, x):
+        d = self.doRequest(self.filename, http.NOT_MODIFIED, os.path.getmtime(self.filepath))
+        d.chainDeferred(self.testResult)
+    testNotModifiedExact.timeout = 2
 
     def testCloseFetcherImmediately(self):
         DownloadQueue.closeTimeout = 0 # Close fetcher immediately
-        self.downloadFile()
-        f = self.factory.getBackend(self.backend).queue.fetcher
+        return self.downloadFile().addCallback(self.CloseFetcherImmediately2)
+    def CloseFetcherImmediately2(self, x):
+        f = self.factory.getBackend(self.backendName).queue.fetcher
         self.assertEquals(f, None)
+    testCloseFetcherImmediately.timeout = 2
 
     def testLeaveFetcherOpen(self):
         DownloadQueue.closeTimeout = 2 # 2 second delay to close
-        self.downloadFile()
-        f = self.factory.getBackend(self.backend).queue.fetcher
+        return self.downloadFile().addCallback(self.LeaveFetcherOpen2)
+    def LeaveFetcherOpen2(self, x):
+        f = self.factory.getBackend(self.backendName).queue.fetcher
         self.assertNotEquals(f, None)
+    testLeaveFetcherOpen.timeout = 4
 
     def testAutoCloseFetcher(self):
         DownloadQueue.closeTimeout = 0.1
-        self.downloadFile()
-        self.f = self.factory.getBackend(self.backend).queue.fetcher
-        d = defer.Deferred()
-        reactor.callLater(0.2, self.AutoCloseFetcherResult, d)
-        return d
-    def AutoCloseFetcherResult(self, deferred):
-        f = self.factory.getBackend(self.backend).queue.fetcher
+        d = self.downloadFile()
+        self.autoclosedeferred = defer.Deferred()
+        d.addCallback(self.AutoCloseFetcher2)
+        d.addErrback(lambda x: self.autoclosedeferred.errback(failure.Failure()))
+        return self.autoclosedeferred
+    def AutoCloseFetcher2(self, x):
+        # File is downloaded, now check fetcher state
+        self.f = self.factory.getBackend(self.backendName).queue.fetcher
+        reactor.callLater(0.2, self.AutoCloseFetcher3)
+    def AutoCloseFetcher3(self):
+        f = self.factory.getBackend(self.backendName).queue.fetcher
         self.assertEquals(f, None)
-        deferred.callback(None)
+        self.autoclosedeferred.callback(None)
+    testAutoCloseFetcher.timeout = 2
 
     def testCached(self):
-        self.downloadFile()
-
-        self.doRequest(self.filename, http.OK)
-
-        log.debug("Downloading second copy", 'DebianHttpBackendTest')
+        self.testResult = defer.Deferred()
+        d = self.downloadFile()
+        d.addCallback(self.Cached2)
+        d.addErrback(self.CachedError)
+        return self.testResult
+    def Cached2(self, x):
+        d = self.doRequest(self.filename, http.OK, filePath=self.filepath)
+        d.addCallback(self.Cached3)
+        d.addErrback(self.CachedError)
+    def Cached3(self, x):
+        log.debug("Downloading second copy", self.debugname)
         self.factory.config.min_refresh_delay = 0
-        self.doRequest(self.filename, http.OK)
+        d = self.doRequest(self.filename, http.OK, filePath=self.filepath)
+        d.addCallback(self.CachedPass)
+        d.addErrback(self.CachedError)
+    def CachedPass(self, x):
+        self.testResult.callback(None)
+    def CachedError(self, x):
+        log.debug("testCached ERROR", self.debugname)
+        self.testResult.errback(failure.Failure())
     testCached.timeout = 2
 
+    def testBwLimit(self):
+        "Bandwidth limiting"
+        b = self.factory.getBackend(self.backendName)
+        b.config.bandwidth_limit = 10000000
+        # We're not testing here that limiting is applied, just that the code runs
+        return self.downloadFile(file='/packages/apt_0.0.1_test.deb')
+
     #def testTimeout(self):
         #pass
     #testTimeout.todo = True
@@ -298,4 +353,56 @@
 
     # More TODO tests:
     # - file mtime is same as server mtime
-    # - correct file path is entered in databases after download
\ No newline at end of file
+    # - correct file path is entered in databases after download
+
+class HttpBackendTest(TestRequestHelper, BackendTestBase):
+    def setUp(self):
+        """
+        Make a configuration with a single backend
+        [test_data]
+        backends=http://127.0.0.1:<port of local test web server>
+        """
+        self.debugname = 'HttpBackendTest'
+        self.httpserver = WebServer()
+        port = self.httpserver.start()
+        uri = "http://127.0.0.1:" + str(port)
+        BackendTestBase.setUp(self, uri)
+    def tearDown(self):
+        self.httpserver.stop()
+        BackendTestBase.tearDown(self)
+
+class RsyncBackendTest(TestRequestHelper, BackendTestBase):
+    def setUp(self):
+        """
+        Make a configuration with a single backend
+        [test_data]
+        backends=rsync://127.0.0.1:<port of local test rsync server>/apt-proxy
+        """
+        self.debugname = 'RsyncBackendTest'
+        self.rsyncserver = RsyncServer()
+        port = self.rsyncserver.start()
+        uri = "rsync://127.0.0.1:" + str(port) + '/apt-proxy'
+        BackendTestBase.setUp(self, uri)
+    def tearDown(self):
+        self.rsyncserver.stop()
+        BackendTestBase.tearDown(self)
+    def testTempFile(self):
+        "rsync Tempfile is detected"
+        b = self.factory.getBackend(self.backendName)
+        b.config.bandwidth_limit = 100000
+        self.downloadFile(file='/packages/apt_0.0.1_test.deb')
+        reactor.callLater(0.5, self.TempFile2)
+        self.testResult = defer.Deferred()
+        return self.testResult
+    def TempFile2(self):
+        fetcher = self.factory.getBackend(self.backendName).queue.fetcher.fetcher
+        fetcher.findRsyncTempFile()
+        file = fetcher.rsyncTempFile
+        log.debug("rsync TempFile is %s" % (file), self.debugname)
+        fetcher.disconnect()
+        self.connection.disconnect()
+        if file is not None:
+            self.testResult.callback("Tempfile is %s" %(file))
+        else:
+            self.testResult.errback(failure.Failure())
+    testTempFile.timeout=2
\ No newline at end of file

Modified: people/halls/rework/debian/changelog
==============================================================================
--- people/halls/rework/debian/changelog	(original)
+++ people/halls/rework/debian/changelog	Tue May  9 22:39:28 2006
@@ -6,10 +6,12 @@
   * Add support for username and password in http_proxy parameter.
     Thanks to Thomas Champagne for the patch (Closes: #323147)
   * Move fetchers and cache management into separate files
-  * Add bandwidth limiting for http backends (Part of #306095)
+  * Add bandwidth_limit configuration parameter to limit download
+    rates (Closes: #306095)
   * Add more unit tests
+  * Add support for rsync port specification
 
- -- Chris Halls <chris.halls at credativ.co.uk>  Wed, 29 Mar 2006 11:04:41 +0100
+ -- Chris Halls <halls at debian.org>  Fri,  5 May 2006 18:34:40 +0100
 
 apt-proxy (1.9.33) unstable; urgency=low
 

Modified: people/halls/rework/doc/apt-proxy.conf
==============================================================================
--- people/halls/rework/doc/apt-proxy.conf	(original)
+++ people/halls/rework/doc/apt-proxy.conf	Tue May  9 22:39:28 2006
@@ -44,6 +44,9 @@
 ;; Use HTTP proxy?
 ;http_proxy = host:port
 
+;; Limit download rate from backend servers (http and rsync only), in bytes/sec
+;bandwidth_limit = 100000
+
 ;; Enable HTTP pipelining within apt-proxy (for test purposes)
 ;disable_pipelining=0
 

Modified: people/halls/rework/doc/apt-proxy.conf.5
==============================================================================
--- people/halls/rework/doc/apt-proxy.conf.5	(original)
+++ people/halls/rework/doc/apt-proxy.conf.5	Tue May  9 22:39:28 2006
@@ -88,6 +88,12 @@
 disabled by default until this is fixed.  Set to \fB0\fP to enable experimental
 http pipelining.  Default: 1
 
+.TP
+.B bandwidth_limit = \fIamount\fR
+When downloading from a backend server, limit the download speed to 
+\fIamount\fR bytes per second. Note this applies to \fBhttp\fP and \fBrsync\fP
+backends only. Default: no limit
+
 .PP
 .SH RESOURCES
 All other sections in the configuration file will be interpreted as resource
@@ -112,6 +118,11 @@
 .B passive_ftp
 Override the global setting of passive_ftp
 
+.TP
+.B bandwidth_limit
+Set a bandwidth limit for downloads for this resource, overriding the global
+bandwidth_limit
+
 .SH CONFIGURATION EXAMPLES
 
 To access a resource that's listed under a specific section name, simply append



More information about the apt-proxy-devel mailing list