r261 - in /debtorrent/trunk/DebTorrent/BT1: AptListener.py makemetafile.py

camrdale-guest at users.alioth.debian.org camrdale-guest at users.alioth.debian.org
Fri Aug 17 04:22:48 UTC 2007

Author: camrdale-guest
Date: Fri Aug 17 04:22:48 2007
New Revision: 261

URL: http://svn.debian.org/wsvn/debtorrent/?sc=1&rev=261
Make the Packages decompression and torrent creation threaded.


Modified: debtorrent/trunk/DebTorrent/BT1/AptListener.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/AptListener.py?rev=261&op=diff
--- debtorrent/trunk/DebTorrent/BT1/AptListener.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/AptListener.py Fri Aug 17 04:22:48 2007
@@ -22,13 +22,11 @@
 from urlparse import urlparse
 from os.path import join
 from cStringIO import StringIO
-from gzip import GzipFile
-from bz2 import decompress
 from time import time, gmtime, strftime
 from DebTorrent.clock import clock
 from sha import sha
 from binascii import b2a_hex
-from makemetafile import getpieces, getsubpieces, uniconvert, convert_all
+from makemetafile import TorrentCreator
 from DebTorrent.HTTPCache import HTTPCache
 import os, logging
 from DebTorrent.__init__ import version, product_name,version_short
@@ -362,9 +360,11 @@
                     # Oops, we do need the cached file after all to start the torrent
                     logger.info('retrieving the cached Packages file to start the torrent')
                     r2 = self.Cache.cache_get(path)
-                    self.got_Packages(path, r2[3])
+                    TorrentCreator(path, r2[3], self.start_torrent, 
+                                   self.rawserver.add_task, self.config['separate_all'])
-                    self.got_Packages(path, r[3])                    
+                    TorrentCreator(path, r[3], self.start_torrent, 
+                                   self.rawserver.add_task, self.config['separate_all'])
             return r
@@ -396,7 +396,8 @@
         # If it's a torrent file, start it
         if r[0] == 200 and path[-1] in ('Packages', 'Packages.gz', 'Packages.bz2'):
-            self.got_Packages(path, r[3])
+            TorrentCreator(path, r[3], self.start_torrent,
+                           self.rawserver.add_task, self.config['separate_all'])
         for (connection, httpreq) in connections:
             # Check to make sure the requester is still waiting
@@ -501,51 +502,10 @@
         self.enqueue_request(connection, file, d, f, httpreq, pieces_needed)
-    def got_Packages(self, path, data):
-        """Process a downloaded Packages file and start a torrent.
-        @type path: C{list} of C{string}
-        @param path: the path of the file to download, starting with the mirror name
-        @type data: C{string}
-        @param data: the downloaded Packages file
-        """
-        try:
-            # Decompress the data
-            if path[-1].endswith('.gz'):
-                compressed = StringIO(data)
-                f = GzipFile(fileobj = compressed)
-                data = f.read()
-            elif path[-1].endswith('.bz2'):
-                data = decompress(data)
-            name = '_'.join(path[:-1])
-            assert data[:8] == "Package:"
-            h = data.split('\n')
-        except:
-            logger.warning('Packages file is not in the correct format')
-            return 
-        sub_pieces = getsubpieces('_'.join(path))
-        (info, info_all) = getpieces(h, separate_all = self.config['separate_all'], sub_pieces = sub_pieces)
-        if info and self.config['separate_all'] in (0, 2, 3):
-            self.start_torrent(info, name, path)
-        if info_all and self.config['separate_all'] in (1, 3):
-            self.start_torrent(info_all, convert_all(name), path)
-    def start_torrent(self, info, name, path):
-        response = {'info': info,
-                    'announce': self.config['default_tracker'], 
-                    'name': uniconvert(name)}
-        if path.count('dists'):
-            mirror = 'http://' + '/'.join(path[:path.index('dists')]) + '/'
-            response['deb_mirrors'] = [mirror]
+    def start_torrent(self, response, name, path):
+        if 'announce' not in response or not response['announce']:
+            response['announce'] = self.config['default_tracker']
         infohash = sha(bencode(response['info'])).digest()

Modified: debtorrent/trunk/DebTorrent/BT1/makemetafile.py
URL: http://svn.debian.org/wsvn/debtorrent/debtorrent/trunk/DebTorrent/BT1/makemetafile.py?rev=261&op=diff
--- debtorrent/trunk/DebTorrent/BT1/makemetafile.py (original)
+++ debtorrent/trunk/DebTorrent/BT1/makemetafile.py Fri Aug 17 04:22:48 2007
@@ -25,11 +25,12 @@
 from string import strip
 from DebTorrent.bencode import bencode
 from btformats import check_info
-from threading import Event
+from threading import Event, Thread
 from time import time
 from traceback import print_exc
 from DebTorrent.zurllib import urlopen
 from gzip import GzipFile
+from bz2 import decompress
 from StringIO import StringIO
 from re import subn
 import binascii, logging
@@ -542,3 +543,98 @@
                 make_meta_file(i, url, params, progress = vc)
         except ValueError:
+class TorrentCreator:
+    """Create a torrent metainfo from a downloaded Packages file (threaded).
+    """
+    def __init__(self, path, data, callback, sched, separate_all = 0):
+        """Process a downloaded Packages file and start the torrent making thread.
+        @type path: C{list} of C{string}
+        @param path: the path of the file to download, starting with the mirror name
+        @type data: C{string}
+        @param data: the downloaded Packages file
+        @type callback: C{method}
+        @param callback: the method to call with the torrent when it has been created
+        @type sched: C{method}
+        @param sched: the method to call to schedule future invocation of a function
+        @type separate_all: C{boolean}
+        @param separate_all: whether to separate the architecture:all packages into
+            a separate torrent (optional, defaults to False)
+        """
+        self.path = path
+        self.data = data
+        self.callback = callback
+        self.sched = sched
+        self.separate_all = separate_all
+        self.name = '_'.join(self.path[:-1])
+        self.responses = []
+        # Create and start the thread to create the torrent metainfo
+        logger.debug('starting thread to create torrent for: '+self.name)
+        rq = Thread(target = self._create, name = 'TorrentCreator('+self.name+')')
+        rq.setDaemon(False)
+        rq.start()
+    def _create(self):
+        """Process a downloaded Packages file and start a torrent."""
+        h = []
+        try:
+            # Decompress the data
+            if self.path[-1].endswith('.gz'):
+                compressed = StringIO(self.data)
+                f = GzipFile(fileobj = compressed)
+                self.data = f.read()
+            elif self.path[-1].endswith('.bz2'):
+                self.data = decompress(self.data)
+            assert self.data[:8] == "Package:"
+            h = self.data.split('\n')
+            self.data = ''
+        except:
+            logger.warning('Packages file is not in the correct format')
+            self.data = ''
+            del h[:]
+            self.sched(self._finished)
+            return
+        logger.debug('Packages file successfully decompressed')
+        sub_pieces = getsubpieces('_'.join(self.path))
+        (info, info_all) = getpieces(h, separate_all = self.separate_all, sub_pieces = sub_pieces)
+        del h[:]
+        mirror = []
+        if self.path.count('dists'):
+            mirror.append('http://' + '/'.join(self.path[:self.path.index('dists')]) + '/')
+        name = self.name
+        if info and self.separate_all in (0, 2, 3):
+            response = {'info': info,
+                        'name': uniconvert(name)}
+            if mirror:
+                response['deb_mirrors'] = mirror
+            self.responses.append((response, name))
+        name = convert_all(self.name)
+        if info_all and self.separate_all in (1, 3):
+            response = {'info': info,
+                        'name': uniconvert(name)}
+            if mirror:
+                response['deb_mirrors'] = mirror
+            self.responses.append((response, name))
+        self.sched(self._finished)
+    def _finished(self):
+        """Wrap up the creation and call the callback function."""
+        for (response, name) in self.responses:
+            self.callback(response, name, self.path)
+        del self.responses[:]

