r304 - branches/rewrite/src
Otavio Salvador
partial-mirror-devel@lists.alioth.debian.org
Thu, 11 Nov 2004 10:17:33 -0700
Author: otavio
Date: Thu Nov 11 10:17:31 2004
New Revision: 304
Modified:
branches/rewrite/src/Download.py
Log:
Revert async download: it was producing broken files.
Modified: branches/rewrite/src/Download.py
==============================================================================
--- branches/rewrite/src/Download.py (original)
+++ branches/rewrite/src/Download.py Thu Nov 11 10:17:31 2004
@@ -30,125 +30,84 @@
if item not in self.queue:
self.queue.append(item)
-class Curl:
- def __init__(self, DisplayStatus):
- self._curl = pycurl.Curl()
- self._curl.setopt(pycurl.FOLLOWLOCATION, 1)
- self._curl.setopt(pycurl.MAXREDIRS, 5)
- self._curl.setopt(pycurl.NOSIGNAL, 1)
- self._curl.setopt(pycurl.CONNECTTIMEOUT, 30)
- self._curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
- self._curl.setopt(pycurl.NOPROGRESS, 0)
- self._curl.setopt(pycurl.TIMEOUT, 300)
- self._curl.setopt(pycurl.FAILONERROR, 1)
- self._fp = None
- self._curl.parent = self
+class DownloadThread(threading.Thread):
+ """ Implement a Download Thread and use a DisplayStatus class to
+ notify the user about what it's currently doing."""
+ _Lock = threading.Lock()
- self.DisplayStatus = DisplayStatus
-
- def setUrl(self, url):
- self._curl.setopt(pycurl.URL, url)
- self._curl.url = url
-
- def setFilename(self, filename):
- if self._fp is not None:
- try:
- self._fp.close()
- except IOError:
- self._fp = None
- self._fp = open(filename, "wb")
- self._curl.setopt(pycurl.WRITEFUNCTION, self._fp.write)
-
- def perform(self):
- self._curl.perform()
-
- def close(self):
- self.http_code = self._curl.getinfo(pycurl.HTTP_CODE)
- self._fp.close()
- self._fp = None
- self._curl.close()
-
- def progress(self, download_t, download_d, upload_t, upload_d):
- if self.DisplayStatus[self._curl.url] == None:
- self.DisplayStatus.start(self._curl.url, download_t)
- self.DisplayStatus.update(self._curl.url, download_d)
+ DisplayStatus = None
-
-class DownloadFetcher(threading.Thread):
- def __init__(self, info = None, downloaders = 3):
- # Add DisplayStatus object.
+ def __init__(self, info = None):
if info == None:
self.DisplayStatus = TextDisplayStatus()
else:
self.DisplayStatus = info
- # Create the needed pycurl objects to manage the connections.
- self._multi = pycurl.CurlMulti()
- self._multi.handles = []
- self._free = Queue()
- for i in range(downloaders):
- curl = Curl(self.DisplayStatus)
- self._multi.handles.append(curl)
-
- self._running = False
- map(lambda x: self._free.put(x), self._multi.handles)
-
threading.Thread.__init__(self)
-
+
def run(self):
- if self._running:
- return # We already running
-
- self._running = True
while 1:
- while 1:
- try:
- fetcher = self._free.get_nowait()
- except Empty:
- self._multi.select()
- break # No fetcher available, process the pending.
+ try:
+ url, filename = Download.queue.get_nowait()
+ except Empty:
+ # Doesn't have any other file to download so exit.
+ return
- try:
- url, filename = Download.queue.get_nowait()
- except Empty:
- # Doesn't have any other file to download so exit.
- self._running = False
- return
-
- # Get a free fetcher
- fetcher.setUrl(url)
- fetcher.setFilename(filename)
-
- self._multi.add_handle(fetcher._curl)
-
- # Run the internal curl state machine for the multi stack
- while 1:
- self._multi.select()
- ret, num_handles = self._multi.perform()
- if ret != pycurl.E_CALL_MULTI_PERFORM:
- break
+ f = open(filename, "wb")
+ curl = pycurl.Curl()
+ curl.setopt(pycurl.FOLLOWLOCATION, 1)
+ curl.setopt(pycurl.MAXREDIRS, 5)
+ curl.setopt(pycurl.URL, url)
+ curl.setopt(pycurl.WRITEFUNCTION, f.write)
+ curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+ curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
+ curl.setopt(pycurl.NOPROGRESS, 0)
+ curl.setopt(pycurl.TIMEOUT, 300)
+ curl.setopt(pycurl.FAILONERROR, 1)
+
+ self.url = url
+
+ # Store counter information about it
+ self._Lock.acquire()
+ DownloadQueue.counter += 1
+ self._counter = DownloadQueue.counter
+ self._Lock.release()
- # Check for curl objects which have terminated, and add them to the freelist
- while 1:
- num_q, ok_list, err_list = self._multi.info_read()
- for c in ok_list:
- self._multi.remove_handle(c)
- self._free.put(c.parent)
- for c, errno, errmsg in err_list:
- self._multi.remove_handle(c)
- self.DisplayStatus.errored(c.url, errmsg)
- self._free.put(c.parent)
- if num_q == 0:
- break
+ try:
+ curl.perform()
+ except Exception, e:
+ self.DisplayStatus.errored(url, e)
+
+ curl.close()
+ try:
+ f.close()
+ except IOError:
+ pass
+
+ # Clear counter information about it
+ self._Lock.acquire()
+ DownloadQueue.counter -= 1
+ self._counter = DownloadQueue.counter
+ self._Lock.release()
+
+ def progress(self, download_t, download_d, upload_t, upload_d):
+ if self.DisplayStatus[self.url] == None:
+ self.DisplayStatus.start(self.url, download_t)
+ self.DisplayStatus.update(self.url, download_d)
class Download:
""" Download queue """
queue = DownloadQueue()
""" Fetcher to use """
- fetcher = None
+ fetchers = []
- def __init__(self, uri, destine, max_concurrent=3):
+ def __init__(self, uri, destine, max_threads=3):
self.queue.put((uri, destine))
- if Download.fetcher is None:
- Download.fetcher = DownloadFetcher(None, max_concurrent)
- Download.fetcher.start()
+
+ # Alloc all needed threads.
+ if len(self.fetchers) < max_threads:
+ for i in range(max_threads - len(self.fetchers)):
+ t = DownloadThread()
+ self.fetchers.append(t)
+ t.start()