[pkg-eucalyptus-commits] [SCM] managing cloud instances for Eucalyptus branch, master, updated. 3.0.0-alpha3-257-g1da8e3a
Garrett Holmstrom
gholms at fedoraproject.org
Sun Jun 16 02:31:00 UTC 2013
The following commit has been merged in the master branch:
commit 7e3e56d9044ae4e901dd362111e1929a865e806e
Author: Garrett Holmstrom <gholms at fedoraproject.org>
Date: Mon Apr 8 16:03:28 2013 -0700
WIP on bundler (now has a class)
diff --git a/euca2ools/commands/bundle/bundleimage2.py b/euca2ools/commands/bundle/bundleimage2.py
index 42d9f38..692ec75 100644
--- a/euca2ools/commands/bundle/bundleimage2.py
+++ b/euca2ools/commands/bundle/bundleimage2.py
@@ -15,6 +15,127 @@ import threading
import time
+## TODO: make the progress bar optional
+
+
+class Bundle(object):
+ DEFAULT_PART_SIZE = 1024 * 1024 ## FIXME
+
+ def __init__(self):
+ self.enc_key = None
+ self.enc_iv = None
+ self.parts = None
+ self.tarball_sha1sum = None
+ self._lock = threading.Lock()
+
+ @classmethod
+ def create_from_image(cls, image_filename, part_prefix, part_size=None,
+ show_progress=False):
+ new_bundle = cls()
+ new_bundle.__create_from_image(image_filename, part_prefix,
+ part_size=part_size,
+ show_progress=show_progress)
+ return new_bundle
+
+ def __create_from_image(self, image_filename, part_prefix, part_size=None,
+ show_progress=False):
+ if part_size is None:
+ part_size = self.DEFAULT_PART_SIZE
+ # pipe for getting the digest from sha1sum
+ digest_pipe_out, digest_pipe_in = multiprocessing.Pipe(duplex=False)
+ # pipe for tar --> sha1sum
+ tar_out_pipe_out, tar_out_pipe_in = os.pipe()
+ # pipe for sha1sum --> gzip
+ sha_out_pipe_out, sha_out_pipe_in = os.pipe()
+
+ # tar --> sha1sum
+ #
+ # Digest calculation is a little processor-intensive, so it goes in its
+ # own process.
+ #
+ # That conveniently lets us avoid the annoyances of streaming lots of
+ # data between two threads by letting us simply use OS pipes.
+ pid = os.fork()
+ if pid == 0:
+ digest_pipe_out.close()
+ os.close(tar_out_pipe_in)
+ os.close(sha_out_pipe_out)
+ _calc_digest_and_exit(tar_out_pipe_out, sha_out_pipe_in,
+ digest_pipe_in)
+ digest_pipe_in.close()
+ os.close(tar_out_pipe_out)
+ os.close(sha_out_pipe_in)
+
+ # sha1sum --> gzip
+ try:
+ gzip = subprocess.Popen(['pigz', '-c'], stdin=sha_out_pipe_out,
+ stdout=subprocess.PIPE, close_fds=True,
+ bufsize=-1)
+ except OSError:
+ gzip = subprocess.Popen(['gzip', '-c'], stdin=sha_out_pipe_out,
+ stdout=subprocess.PIPE, close_fds=True,
+ bufsize=-1)
+ os.close(sha_out_pipe_out)
+
+ # gzip --> openssl
+ srand = random.SystemRandom()
+ key = format(srand.getrandbits(128), 'x')
+ iv = format(srand.getrandbits(128), 'x')
+ with self._lock:
+ self.key = key
+ self.iv = iv
+ openssl = subprocess.Popen(['openssl', 'enc', '-e', '-aes-128-cbc',
+ '-K', key, '-iv', iv],
+ stdin=gzip.stdout, stdout=subprocess.PIPE,
+ close_fds=True, bufsize=-1)
+
+ # openssl --> writer
+ writer_thread = threading.Thread(target=self._write_parts,
+ args=(openssl.stdout, part_prefix,
+ part_size))
+ writer_thread.start()
+
+ # Drive everything by feeding tar
+ with open(image_filename) as image:
+ with os.fdopen(tar_out_pipe_in, 'w') as tar_input:
+ _write_tarball(image, tar_input, show_progress=show_progress)
+ writer_thread.join()
+
+ digest = digest_pipe_out.recv()
+ digest_pipe_out.close()
+ with self._lock:
+ self.digest = digest
+
+ def _write_parts(self, infile, part_prefix, part_size):
+ with self._lock:
+ self.parts = []
+ for part_no in itertools.count():
+ part_fname = '{0}.part.{1}'.format(part_prefix, part_no)
+ part_info = _write_single_part(infile, part_fname, part_size)
+ with self._lock:
+ self.parts.append(part_info)
+ if part_info['size'] < part_size:
+ # That's the last part
+ return
+
+
+def _write_tarball(infile, outfile, show_progress=False):
+ widgets = [progressbar.Percentage(), ' ', progressbar.Bar(marker='='), ' ',
+ progressbar.FileTransferSpeed(), ' ', progressbar.AdaptiveETA()]
+ bar = progressbar.ProgressBar(maxval=os.path.getsize(infile.name),
+ widgets=widgets)
+ tar_thread = threading.Thread(target=_add_fileobj_to_tarball,
+ args=(infile, outfile))
+ tar_thread.start()
+ if show_progress:
+ bar.start()
+ while tar_thread.is_alive():
+ bar.update(infile.tell())
+ time.sleep(0.5)
+ bar.finish()
+ tar_thread.join()
+
+
def _add_fileobj_to_tarball(infile, outfile):
tarball = tarfile.open(mode='w|', fileobj=outfile)
try:
@@ -25,23 +146,7 @@ def _add_fileobj_to_tarball(infile, outfile):
tarball.close()
-def write_tarball(infile, outfile):
- widgets = ['Bundling ', progressbar.Bar(), ' ', progressbar.Percentage(),
- ' ', progressbar.FileTransferSpeed(), ' ', progressbar.ETA()]
- bar = progressbar.ProgressBar(maxval=os.path.getsize(infile.name),
- widgets=widgets)
- tar_thread = threading.Thread(target=_add_fileobj_to_tarball,
- args=(infile, outfile))
- tar_thread.start()
- bar.start()
- while tar_thread.is_alive():
- bar.update(infile.tell())
- time.sleep(0.25)
- tar_thread.join()
- bar.finish()
-
-
-def calc_digest_and_exit(in_fileno, out_fileno, result_pipe):
+def _calc_digest_and_exit(in_fileno, out_fileno, result_pipe):
infile = os.fdopen(in_fileno)
outfile = os.fdopen(out_fileno, 'w')
digest = hashlib.sha1()
@@ -59,90 +164,10 @@ def calc_digest_and_exit(in_fileno, out_fileno, result_pipe):
sys.exit()
-def bundle_image(image_filename, bundle_filename):
- # pipe for getting the digest from sha1sum
- digest_pipe_out, digest_pipe_in = multiprocessing.Pipe(duplex=False)
- # pipe for getting the part info from the part writer
- writer_pipe_out, writer_pipe_in = multiprocessing.Pipe(duplex=False)
- # pipe for tar --> sha1sum
- tar_out_pipe_out, tar_out_pipe_in = os.pipe()
- # pipe for sha1sum --> gzip
- sha_out_pipe_out, sha_out_pipe_in = os.pipe()
-
- # tar --> sha1sum
- # Digest calculation is a little more processor-intensive, so it goes in
- # its own process.
- # That conveniently lets us avoid the annoyances of passing lots of data
- # between two threads by letting us simply use OS pipes.
- pid = os.fork()
- if pid == 0:
- digest_pipe_out.close()
- os.close(tar_out_pipe_in)
- os.close(sha_out_pipe_out)
- calc_digest_and_exit(tar_out_pipe_out, sha_out_pipe_in, digest_pipe_in)
- digest_pipe_in.close()
- os.close(tar_out_pipe_out)
- os.close(sha_out_pipe_in)
-
- # sha1sum --> gzip
- try:
- gzip = subprocess.Popen(['pigz', '-c'], stdin=sha_out_pipe_out,
- stdout=subprocess.PIPE, close_fds=True,
- bufsize=-1)
- except OSError:
- gzip = subprocess.Popen(['gzip', '-c'], stdin=sha_out_pipe_out,
- stdout=subprocess.PIPE, close_fds=True,
- bufsize=-1)
- os.close(sha_out_pipe_out)
-
- # gzip --> openssl
- srand = random.SystemRandom()
- key = format(srand.getrandbits(128), 'x')
- iv = format(srand.getrandbits(128), 'x')
- openssl = subprocess.Popen(['openssl', 'enc', '-e', '-aes-128-cbc',
- '-K', key, '-iv', iv], stdin=gzip.stdout,
- stdout=subprocess.PIPE, close_fds=True,
- bufsize=-1)
-
- # openssl --> writer
- writer_thread = threading.Thread(target=write_parts,
- args=(openssl.stdout, bundle_filename,
- writer_pipe_in))
- writer_thread.start()
-
- # Drive everything by feeding tar
- with open(image_filename) as image:
- with os.fdopen(tar_out_pipe_in, 'w') as tar_input:
- write_tarball(image, tar_input)
- writer_thread.join()
-
- digest = digest_pipe_out.recv()
- digest_pipe_out.close()
- parts = writer_pipe_out.recv()
- writer_pipe_out.close()
- return (parts, digest, key, iv)
-
-
-PART_SIZE = 1024 * 1024
-
-
-def write_parts(infile, part_prefix, result_pipe):
- parts = []
- for part_no in itertools.count():
- part_fname = '{0}.part.{1}'.format(part_prefix, part_no)
- print 'Writing part', part_fname
- part_info = _write_single_part(infile, part_fname)
- parts.append(part_info)
- if part_info['size'] < PART_SIZE:
- # That's the last part
- result_pipe.send(parts)
- return
-
-
-def _write_single_part(infile, part_fname):
+def _write_single_part(infile, part_fname, part_size):
part_digest = hashlib.sha1()
with open(part_fname, 'w') as part:
- bytes_to_write = PART_SIZE
+ bytes_to_write = part_size
while bytes_to_write > 0:
chunk = infile.read(min((bytes_to_write, 65536)))
if chunk:
@@ -158,7 +183,9 @@ def _write_single_part(infile, part_fname):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('image_filename')
- parser.add_argument('bundle_filename')
+ parser.add_argument('part_prefix')
args = parser.parse_args()
- result = bundle_image(args.image_filename, args.bundle_filename)
- print result
+ bundle = Bundle.create_from_image(args.image_filename, args.part_prefix,
+ part_size=None, show_progress=True)
+ from pprint import pprint
+ pprint(vars(bundle))
--
managing cloud instances for Eucalyptus
More information about the pkg-eucalyptus-commits
mailing list