add tarball merger #853
Conversation
add a new utility function that merges separate tarballs into one tarball; data can be piped in and/or out
…te/viral-ngs into ct-add-tarball-merger
I'll try to look at this more tomorrow... is this a pre-requisite for breaking out the tar-repack post-upload step into its own dx applet?

Part of me wonders if we should bite the bullet soon and switch our compression/decompression stuff to blosc. Maybe a separate PR. But we could shed all the binaries for pigz, bzip2, lz4 and just add the python library, and gain a few other formats like zstd.

I also wonder whether we could implement a streaming tarcat standalone method that avoids the unpacking to disk by withholding the EOF zero markers for the first n-1 tarballs.

BTW if you add a new top-level python file, there are various places it needs to be added, including coveralls/py.test invocation scripts, readthedocs/sphinx code, and possibly some other places I'm forgetting at the moment.
The intent was to have tar repack functionality as part of viral-ngs, to support either a separate dx applet or repack-capable demux. @mlin now has a branch with a WIP yml-specified applet to perform the merge operation, so this may be a bit redundant, but including it as part of demux would save an extra download of the packed tarball onto the demux instance.

Switching (de)compression to blosc should probably be a separate PR since we call the various binaries in various places. Something I'm not sure of is whether some of the magic of blosc relies on compile-time optimizations targeting the instruction set extensions/CPU cache available, or if it compiles to include multiple code paths. We may lose some optimizations if it is installed from a source like conda. The best thing is probably to just try it and see if real-world performance is improved over what we have now.

The function included in this PR has a few different code paths. By default it acts as a streaming tarcat and file data never touches the disk; it reads from the untarred stream, buffered by python's tarfile, and writes directly to the tarfile output stream.

I had the same thought as you about concatenating tarballs in the old-school tape drive way, by stripping the final two 512-byte zero blocks off each.

We can, of course, simply […]

Something I'm not sure of yet is how dxWDL handles an […]
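For illustration, a minimal sketch of that default streaming path might look like this, assuming uncompressed tar streams on both ends (the PR's actual function also layers compression subprocesses and optional extraction to disk on top of this):

```python
import tarfile

def tarcat_streaming(in_fileobjs, out_fileobj):
    # 'w|' / 'r|' open tarfile in non-seekable streaming mode, so
    # members are copied in order and nothing is staged on disk
    with tarfile.open(fileobj=out_fileobj, mode='w|') as tar_out:
        for in_fileobj in in_fileobjs:
            with tarfile.open(fileobj=in_fileobj, mode='r|') as tar_in:
                for member in tar_in:
                    if member.isfile():
                        # extractfile() yields a stream over the member's
                        # bytes; addfile() copies member.size bytes from it
                        tar_out.addfile(member, tar_in.extractfile(member))
                    else:
                        # directories, links, etc. carry no data payload
                        tar_out.addfile(member)
```

Hypothetical usage, merging two uncompressed tarballs to stdout: `tarcat_streaming([open('a.tar', 'rb'), open('b.tar', 'rb')], sys.stdout.buffer)`.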
Fascinating. A few thoughts.

I think some of the speed magic of blosc isn't really about blosc, but the underlying algorithms that it implements... a lot of the newer ones are computationally simple enough to saturate the memory bandwidth of any machine. Agreed though that we should keep it separate.

dxWDL does not currently handle […]

Anyway, the short of it is that you're saying that the default execution behavior of this is to repack tarballs while both streaming the inputs and outputs and avoiding any disk I/O. That's great, and quite ideal actually! That should significantly speed up repacking of large tarballs (avoiding the disk I/O). In my mind, a hacked-together tarcat would have always been streaming: instead of calculating file sizes and offsets, I would've just introduced a 1kB buffer and dropped any EOF blocks from the stream except for the last one. But better to use an established python library.

Of the different code paths, how many of them are tested? Can you test both the […]

The advantage to @mlin's standalone applet for repack prior to demux is that the repacked tarball always emits first, regardless of whether demux fails, and the standalone applet doesn't need to waste 5 mins pulling the viral-ngs docker image. The downside to it, though, is that it is hitting disk. Since DNAnexus seems to rely exclusively on non-EBS-backed EC2 instances, the speed hit isn't too big of a deal, but it does mean that the instance sizes have to scale with the data size (since local disk has to be big enough for the uncompressed flowcell, which wouldn't be true with your repacker).

I guess the best of both worlds would be to separate out your python […]
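As an aside, a minimal sketch of that buffer-and-drop approach (illustrative names only, not the PR's code; note the caveat in the comments) might look like:

```python
BLOCK = 512

def tarcat_blocks(in_paths, out_stream):
    # Hold back all-zero 512-byte blocks and emit them only once a
    # non-zero block follows; trailing zero blocks (the EOF markers)
    # are thereby dropped from every input except the last.
    # Caveat: a file whose *data* ends in whole zero blocks right at
    # the end of an archive would lose those blocks too, which is one
    # reason tar's ignore-zeros-on-read approach is more robust.
    for i, path in enumerate(in_paths):
        pending = 0  # count of held-back all-zero blocks
        with open(path, "rb") as f:
            while True:
                block = f.read(BLOCK)
                if not block:
                    break
                if block == b"\0" * len(block):
                    pending += 1
                    continue
                out_stream.write(b"\0" * (BLOCK * pending))
                pending = 0
                out_stream.write(block)
        if i == len(in_paths) - 1:
            # re-emit the final EOF marker (at least two zero blocks)
            out_stream.write(b"\0" * (BLOCK * max(pending, 2)))
```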
Ok, so it sounds like we'll be using @mlin's implementation for several reasons. I can add additional tests for this PR if you think it is useful enough to be included.
Yeah, I think we'd want this anyway; it does look quite useful.
util/file.py
Outdated
raise IOError("An input file of unknown type was provided: %s" % filepath) | ||
return return_obj | ||
|
||
def create_containing_dirs(path): |
Is this not a `mkdir_p`-style invocation?
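For reference, a sketch of the usual `mkdir_p` idiom:

```python
import errno
import os

def mkdir_p(path):
    # create path and any missing parents; no error if it already exists
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise

# on Python 3 alone this reduces to:
# os.makedirs(path, exist_ok=True)
```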
util/file.py
Outdated
if out_compressed_tarball != "-":
    out_compress_ps = subprocess.Popen(choose_compressor(out_compressed_tarball)["compress_cmd"], stdout=None if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
else:
    assert out_compressed_tarball != '-' or pipe_hint, "cannot autodetect compression for stdout output unless pipeHint provided"
Isn't `out_compressed_tarball != '-'` just checked?
Also may want to avoid using assert outside of unit test code.
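Since `assert` statements are stripped entirely when Python runs with `-O`, a sketch of the equivalent explicit check, reusing the names from the diff above:

```python
# asserts vanish under python -O, so validate input explicitly instead
if out_compressed_tarball == "-" and not pipe_hint:
    raise ValueError(
        "cannot autodetect compression for stdout output unless pipeHint provided"
    )
```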
util/file.py
Outdated
if not os.path.exists(path) and len(path):
    os.mkdir(path)

class FileDiverter(object):
I'm not a huge fan of a class inside the function. This function is already very long.
Yes, I agree, it feels ugly, but it's also not used beyond here, and it relies on its attributes being TarInfo attributes, so its function is quite internal to this function.
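If it ever needed to move, one middle ground (sketched here with illustrative names, not the PR's exact code) is a module-level class with a leading underscore, which keeps the function short while still signaling internal use:

```python
class _FileDiverter(object):
    # wraps a tar member's stream so that bytes read from it are also
    # mirrored to a file on disk (illustrative, not the PR's exact code)
    def __init__(self, fileinfo, fileobj, mirror_path):
        self.fileinfo = fileinfo   # a tarfile.TarInfo for the member
        self.fileobj = fileobj     # stream over the member's data
        self.written_mirror_file = open(mirror_path, "wb")

    def read(self, size=-1):
        data = self.fileobj.read(size)
        self.written_mirror_file.write(data)
        return data
```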
util/file.py
Outdated
def __del__(self):
    self.written_mirror_file.flush()
    self.written_mirror_file.close()
`close` calls `flush`.
util/file.py
Outdated
fileinfo = tar_in.next()
while fileinfo is not None:
    if extract_to_disk_path:
Break out into subfunction?
util/file.py
Outdated
if avoid_disk_roundtrip:
    fileobj = tar_in.extractfile(fileinfo)
    #tar_out.addfile(fileinfo)
delete
util/file.py
Outdated
    out_compress_ps = subprocess.Popen(choose_compressor(out_compressed_tarball)["compress_cmd"], stdout=None if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
else:
    assert out_compressed_tarball != '-' or pipe_hint, "cannot autodetect compression for stdout output unless pipeHint provided"
    out_compress_ps = subprocess.Popen(choose_compressor(pipe_hint)["compress_cmd"], stdout=None if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
DRY this, because only the `choose_compressor` call is different.
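A possible shape for that refactor, sketched with the names from the diff above (and folding in the earlier note about avoiding `assert`):

```python
if out_compressed_tarball == "-" and not pipe_hint:
    raise ValueError("cannot autodetect compression for stdout output unless pipeHint provided")

# only the compression hint differs between the two branches,
# so pick it first and build the pipeline once
compressor_hint = out_compressed_tarball if out_compressed_tarball != "-" else pipe_hint
out_compress_ps = subprocess.Popen(
    choose_compressor(compressor_hint)["compress_cmd"],
    stdout=None if out_compressed_tarball == "-" else outfile,
    stdin=subprocess.PIPE,
)
```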
file_utils.py
Outdated
help='If specified, the tar contents will also be extracted to a local directory.')
parser.add_argument('--pipeHint',
                    dest="pipe_hint",
                    default=".gz",
Maybe a better UI is not to have the leading `.`
Agreed, I was just trying to be consistent with `util.file.extract_tarball()` (which we can also change).
To be clear, the `pipeHint` used in other places is actually meant to be any kind of file path, or even a URI, and the logic that matches on it isn't looking for strings that match `.gz`, but rather it's doing `string.endswith()` calls on it. The idea is that you could just supply a hint or you could be lazy and supply the whole filename, bucket path, or whatever.
file_utils.py
Outdated
'out_tarball',
help='''output tarball (*.tar.gz|*.tar.lz4|*.tar.bz2|-)
Note: if "-" is used, a gzip-compressed tarball
will be written to stdout''')
Add some comment about how output compression is inferred from the file extension.
test/unit/test_file_utils.py
Outdated
assert_equal_contents(self, inf, outf)

def test_merge_with_extract(self):
Test `avoid_roundtrip = False` as well?
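Something like the following might cover it (a sketch with hypothetical names; `merge_tarballs` stands in for the PR's actual entry point, and the fixture names mirror the existing tests):

```python
import os
import tempfile

def test_merge_with_extract_no_roundtrip_avoidance(self):
    # same fixtures as above, but force the disk round-trip code path
    workdir = tempfile.mkdtemp()
    out_tarball = os.path.join(workdir, "merged.tar.gz")
    util.file.merge_tarballs(out_tarball,
                             [self.in_tarball_1, self.in_tarball_2],
                             avoid_disk_roundtrip=False)
    # then compare contents as in the existing tests, e.g. via
    # assert_equal_contents(self, ...)
```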
Thanks for the code review!
…being created when stdout is used
…te/viral-ngs into ct-add-tarball-merger
in tarball unpacking code, allow concatenated tarballs. Owing to tar's history as a way to create Tape ARchive backups, tar files can be joined by being concatenated together. The final block is padded with zeros though (indicating EOF), which can cause tar to terminate prematurely when concatenated tarballs are being unpacked unless it is told to tolerate these early stops. This adds the `--ignore-zeros` flag to make tarball extraction more permissive. Note: this applies only to uncompressed tarballs (including concatenated tarballs within compressed archives). Our tarball repacking code already tolerates such tarballs; background info here: broadinstitute/viral-ngs#853 (comment)
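For reference, Python's `tarfile` exposes the same behavior through its `ignore_zeros` flag; a minimal sketch (the filename is hypothetical):

```python
import tarfile

# ignore_zeros=True treats empty (and invalid) blocks as skippable
# instead of as end-of-archive, so members from every concatenated
# tarball are read rather than stopping at the first EOF marker
with tarfile.open("concatenated.tar", mode="r", ignore_zeros=True) as tf:
    for member in tf:
        print(member.name)
```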
add a new utility function that merges separate tarballs into one tarball; data can be piped in and/or out, and the contents can optionally be extracted to disk during the repack