[apt-proxy-devel] r601 - in people/halls/rework: apt_proxy debian
Chris Halls
halls at costa.debian.org
Sat Apr 8 15:42:34 UTC 2006
Author: halls
Date: Sat Apr 8 15:42:30 2006
New Revision: 601
Modified:
people/halls/rework/apt_proxy/apt_proxy.py
people/halls/rework/apt_proxy/cache.py
people/halls/rework/apt_proxy/fetchers.py
people/halls/rework/debian/changelog
Log:
Port rsync backend to new fetcher API (not yet debugged)
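For anyone following the rework: the new fetcher API separates the transport
backends (http, ftp, rsync) from request bookkeeping, and a backend reports
progress through callbacks on its parent Fetcher. A minimal sketch of that
callback surface, inferred from the calls made in this diff (the method names
appear below; the class outline here is an assumption, not the apt-proxy source):

    # Hypothetical outline of the parent Fetcher callbacks a backend drives
    class Fetcher:
        def server_mtime(self, mtime):       # remote file's modification time
            pass
        def data_received(self, data):       # a chunk of file data arrived
            pass
        def download_complete(self):         # transfer finished successfully
            pass
        def connection_failed(self, reason): # could not reach the backend
            pass
        def file_not_found(self):            # backend has no such file
            pass
        def fetcher_internal_error(self, reason):  # transport-level failure
            pass

A transport backend such as RsyncFetcher then only needs to provide connect(),
download() and disconnect(), as the rsync changes below show.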
Modified: people/halls/rework/apt_proxy/apt_proxy.py
==============================================================================
--- people/halls/rework/apt_proxy/apt_proxy.py (original)
+++ people/halls/rework/apt_proxy/apt_proxy.py Sat Apr 8 15:42:30 2006
@@ -228,7 +228,8 @@
return
backendName = self.uri[1:].split('/')[0]
- log.debug("Request: %s %s backend=%s uri=%s"%(self.method, self.uri, backendName, self.uri),'Request')
+ log.debug("Request: %s %s backend=%s uri=%s"
+ % (self.method, self.uri, backendName, self.uri),'Request')
if self.factory.config.disable_pipelining:
self.setHeader('Connection','close')
Modified: people/halls/rework/apt_proxy/cache.py
==============================================================================
--- people/halls/rework/apt_proxy/cache.py (original)
+++ people/halls/rework/apt_proxy/cache.py Sat Apr 8 15:42:30 2006
@@ -353,6 +353,14 @@
for request in self.requests:
request.finishCode(http_code, reason)
+ # Remove directory if file was not created
+ if not os.path.exists(self.file_path):
+ try:
+ os.removedirs(self.filedir)
+ except OSError:
+ pass # directory not empty or already removed
+
+
def file_sent(self):
"""
File has been sent successfully to at least one client
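Note on the hunk above: os.removedirs() removes the leaf directory and then
each empty parent in turn, and raises OSError as soon as it meets a non-empty
one, which is why the call is wrapped. A standalone illustration with a
made-up path:

    import os
    os.makedirs('cache/debian/pool')        # create a chain of directories
    try:
        os.removedirs('cache/debian/pool')  # prunes pool, then debian, then cache
    except OSError:
        pass  # stops quietly at the first directory that is not empty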
Modified: people/halls/rework/apt_proxy/fetchers.py
==============================================================================
--- people/halls/rework/apt_proxy/fetchers.py (original)
+++ people/halls/rework/apt_proxy/fetchers.py Sat Apr 8 15:42:30 2006
@@ -21,7 +21,7 @@
network backends
"""
-import re, os, string, time
+import re, os, string, time, glob, signal
from twisted.web import static, http
from twisted.internet import protocol, reactor, defer
from twisted.python import failure
@@ -127,14 +127,10 @@
self.deferred.callback((False, reason_msg))
def cancel_download(self):
- if self.fetcher and self.fetcher.transport:
+ if self.fetcher:
log.debug(
- "telling the transport to loseConnection",'Fetcher')
- try:
- self.fetcher.transport.loseConnection()
- except KeyError:
- # Rsync fetcher already loses connection for us
- pass
+ "telling fetchers to disconnect",'Fetcher')
+ self.fetcher.disconnect()
self.download_failed(None, "Download canceled")
def data_received(self, data):
@@ -210,6 +206,10 @@
# TODO - failover?
self.download_failed(http.NOT_FOUND, "file not found on backend")
+ def fetcher_internal_error(self, reason):
+ log.msg("(%s) internal error: %s" % (self.backendServer.path, reason), 'fetcher')
+ self.download_failed(http.INTERNAL_SERVER_ERROR, reason)
+
def send_complete_file(self, filename):
"""
Send a complete file (used by FileFetcher)
@@ -366,7 +366,7 @@
else:
log.err("File transfer overrun! Expected size:%s Received size:%s" %
(self.server_size, self.fetcher.len_received), 'http_client')
- self.parent.download_failure(http.INTERNAL_SERVER_ERROR, "Data overrun")
+ self.parent.fetcher_internal_error("Data overrun")
# def handleResponse(self, buffer):
# if self.length == 0:
@@ -463,7 +463,7 @@
self.connection.transport.loseConnection()
self.isConnected = False
-class FtpFetcher(Fetcher, protocol.Protocol):
+class FtpFetcher(protocol.Protocol):
"""
This is the sequence here:
@@ -609,7 +609,7 @@
def ftpListFailed(self, msgs):
log.debug("ftp list failed: %s" % (msgs), 'ftp_client')
- self.parent.download_failed(http.INTERNAL_SERVER_ERROR, "Could not list directory")
+ self.parent.fetcher_internal_error("Could not list directory")
def ftpFetchFile(self):
"And finally, we ask for the file."
@@ -756,56 +756,67 @@
self.apDataReceived("")
self.apDataEnd(self.transfered)
-class RsyncFetcher(Fetcher, protocol.ProcessProtocol):
+class RsyncFetcher(protocol.ProcessProtocol):
"""
- I frequently am not called directly, Request.fetch makes the
- arrangement for FetcherGzip to use us and gzip the result if needed.
+ Fetch a file using the rsync protocol
+ rsync is run as an external process
"""
- post_convert = re.compile(r"^Should not match anything$")
- gzip_convert = re.compile(r"/Packages.gz$")
-
- "Temporary filename that rsync streams to"
- rsyncTempFile = None
-
- "Number of bytes sent to client already"
- bytes_sent = 0
+ rsyncCommand = '/usr/bin/rsync'
- def activate (self, request):
- Fetcher.activate(self, request)
- if not request.apFetcher:
- return
+ def __init__(self, backendServer):
+ self.backendServer = backendServer
+ self.rsyncProcess = None
+
+ def connect(self):
+ # We can't connect separately, so just report success immediately
+ return defer.succeed(True)
+
+ def download(self, fetcher, uri, mtime):
+ """
+ Request download
+ @param fetcher: Fetcher class to receive callbacks
+ @param uri: URI of file to be downloaded within backend
+ @param mtime: Modification time of current file in cache
+ """
+ self.rsyncTempFile = None # Temporary filename that rsync streams to
+ self.bytes_sent = 0 # Number of bytes sent to client already
+ self.parent = fetcher
+ self.cache_mtime = mtime
+ self.request_uri = uri
+ self.cache_path = fetcher.CacheEntry.cache_path
+ self.cache_dir = fetcher.CacheEntry.filedir
+ self.remote_file = (self.backendServer.path + '/'
+ + uri)
# Change /path/to/FILE -> /path/to/.FILE.* to match rsync tempfile
- self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.local_file)
-
+ self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.cache_path)
+
for file in glob.glob(self.globpattern):
- log.msg('Deleting stale tempfile:' + file)
+ log.msg('Deleting stale tempfile:' + file, 'rsyncFetcher')
unlink(file)
-
- uri = 'rsync://'+request.backendServer.host\
- +request.backendServer.path+'/'+request.backend_uri
- self.local_dir=re.sub(r"/[^/]*$", "", self.local_file)+'/'
- exe = '/usr/bin/rsync'
+ uri = 'rsync://'+ self.backendServer.host \
+ +self.backendServer.path+'/' + self.request_uri
+
if(log.isEnabled('rsync',9)):
- args = (exe, '--partial', '--progress', '--verbose', '--times',
- '--timeout', "%d"%(request.backend.config.timeout),
+ args = (self.rsyncCommand, '--partial', '--progress', '--verbose', '--times',
+ '--timeout', "%d"%(self.backendServer.backend.config.timeout),
uri, '.',)
else:
- args = (exe, '--quiet', '--times', uri, '.',
- '--timeout', "%d"%(request.backend.config.timeout),
+ args = (self.rsyncCommand, '--quiet', '--times', uri, '.',
+ '--timeout', "%d"%(self.backendServer.backend.config.timeout),
)
- if(not os.path.exists(self.local_dir)):
- os.makedirs(self.local_dir)
- self.process = reactor.spawnProcess(self, exe, args, None,
- self.local_dir)
+ #if(not os.path.exists(self.cache_dir)):
+ # os.makedirs(self.cache_dir)
+ self.rsyncProcess = reactor.spawnProcess(self, self.rsyncCommand, args, None,
+ self.cache_dir)
def findRsyncTempFile(self):
"""
Look for temporary file created by rsync during streaming
"""
files = glob.glob(self.globpattern)
-
+
if len(files)==1:
self.rsyncTempFile = files[0]
log.debug('tempfile: ' + self.rsyncTempFile, 'rsync_client')
@@ -814,13 +825,13 @@
pass
else:
log.err('found more than one tempfile, abort rsync')
- self.transport.loseConnection()
-
- def connectionMade(self):
- pass
+ self.parent.fetcher_internal_error("Found more than one rsync temporary file")
+
+ #def connectionMade(self):
+ # pass
- "Data received from rsync process to stdout"
def outReceived(self, data):
+ "Data received from rsync process to stdout"
for s in string.split(data, '\n'):
if len(s):
log.debug('rsync: ' + s, 'rsync_client')
@@ -828,14 +839,13 @@
if not self.rsyncTempFile:
self.findRsyncTempFile()
# Got tempfile?
- if self.rsyncTempFile:
- self.setResponseCode(http.OK)
+ #if self.rsyncTempFile:
+ # self.setResponseCode(http.OK)
if self.rsyncTempFile:
self.sendData()
-
- "Data received from rsync process to stderr"
def errReceived(self, data):
+ "Data received from rsync process to stderr"
for s in string.split(data, '\n'):
if len(s):
log.err('rsync error: ' + s, 'rsync_client')
@@ -850,64 +860,58 @@
else:
# Tempfile has gone, stream main file
#log.debug("sendData open dest " + str(self.bytes_sent))
- f = open(self.local_file, 'rb')
-
+ f = open(self.cache_path, 'rb')
+
if f:
f.seek(self.bytes_sent)
data = f.read(abstract.FileDescriptor.bufferSize)
#log.debug("sendData got " + str(len(data)))
f.close()
if data:
- self.apDataReceived(data)
+ self.parent.data_received(data)
self.bytes_sent = self.bytes_sent + len(data)
reactor.callLater(0, self.sendData)
elif not self.rsyncTempFile:
# Finished reading final file
- #self.transport = None
log.debug("sendData complete")
# Tell clients, but data is already saved by rsync so don't
# write file again
- self.apDataEnd(self.transfered, False)
-
-
+ self.parent.download_complete()
+
def processEnded(self, status_object):
__pychecker__ = 'unusednames=reason'
- log.debug("Status: %d" %(status_object.value.exitCode)
- ,'rsync_client')
self.rsyncTempFile = None
-
- # Success?
- exitcode = status_object.value.exitCode
-
- if exitcode == 0:
- # File received. Send to clients.
- self.server_mtime = os.stat(self.local_file)[stat.ST_MTIME]
- reactor.callLater(0, self.sendData)
- else:
- if exitcode == 10:
- # Host not found
- self.setResponseCode(http.INTERNAL_SERVER_ERROR)
+ self.rsyncProcess = None
+
+ if isinstance(status_object, failure.Failure):
+ log.debug("rsync failure: %s" %(status_object)
+ ,'rsync_client')
+ self.parent.fetcher_internal_error("Error in rsync")
+ else:
+ log.debug("Status: %d" %(status_object.value.exitCode)
+ ,'rsync_client')
+
+ # Success?
+ exitcode = status_object.value.exitCode
+
+ if exitcode == 0:
+ # File received. Send to clients.
+ self.parent.server_mtime(os.stat(self.cache_path)[stat.ST_MTIME])
+ reactor.callLater(0, self.sendData)
else:
- self.setResponseCode(http.NOT_FOUND)
-
- if not os.path.exists(self.local_file):
- try:
- os.removedirs(self.local_dir)
- except:
- pass
- self.apDataReceived("")
- self.apDataEnd(self.transfered)
+ if exitcode == 10:
+ # Host not found
+ self.parent.connection_failed('rsync connection to %s failed'
+ % (self.backendServer.host))
+ else:
+ self.parent.file_not_found()
- def loseConnection(self):
+ def disconnect(self):
"Kill rsync process"
- if self.transport:
- if self.transport.pid:
- log.debug("killing rsync child" +
- str(self.transport.pid), 'rsync_client')
- os.kill(self.transport.pid, signal.SIGTERM)
- #self.transport.loseConnection()
-
-
+ if self.rsyncProcess and self.rsyncProcess.pid:
+ log.debug("disconnect: killing rsync child pid " +
+ str(self.rsyncProcess.pid), 'rsync_client')
+ os.kill(self.rsyncProcess.pid, signal.SIGTERM)
class FetcherCachedFile(Fetcher):
"""
Modified: people/halls/rework/debian/changelog
==============================================================================
--- people/halls/rework/debian/changelog (original)
+++ people/halls/rework/debian/changelog Sat Apr 8 15:42:30 2006
@@ -6,9 +6,10 @@
* Add support for username and password in http_proxy parameter.
Thanks to Thomas Champagne for the patch (Closes: #323147)
* Move fetchers and cache management into separate files
+ * Add bandwidth limiting for http backends (Part of #306095)
* Add more unit tests
- -- Chris Halls <halls at debian.org> Tue, 11 Oct 2005 16:25:06 +0100
+ -- Chris Halls <chris.halls at credativ.co.uk> Wed, 29 Mar 2006 11:04:41 +0100
apt-proxy (1.9.33) unstable; urgency=low