From 13fea535043fd9f3e306132a4a267eb07ac99472 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Sat, 2 Jan 2021 00:33:17 -0800 Subject: [PATCH 1/3] Add cram view gs cloud streaming. --- scripts/stream_cloud_file | 44 ++++++++++++++++++++++++++++++++++ tests/test_cram.py | 2 +- xsamtools/cram.py | 50 ++++++++++++++++++++++++++++----------- 3 files changed, 81 insertions(+), 15 deletions(-) create mode 100755 scripts/stream_cloud_file diff --git a/scripts/stream_cloud_file b/scripts/stream_cloud_file new file mode 100755 index 0000000..059b16d --- /dev/null +++ b/scripts/stream_cloud_file @@ -0,0 +1,44 @@ +#!/usr/bin/env python +import sys +import argparse + +from terra_notebook_utils import gs +import gs_chunked_io as gscio +""" +This streams a cloud key directly to stdout. This allows: + 1. The stream to be modified/sliced along the way. + 2. Data to be piped with conventional bash pipes: + e.g., python stream_cloud_file.py | some_tool > output.txt + +TODO: This might be able to be replaced with "gsutil cat" if it ever allows multiple slices. + Currently one slice is implemented, for example: "gsutil cat -r 0-99 gs://lons-test/ce#5b.cram" + which would print the first 100 bytes. +""" + + +def main(argv=sys.argv[1:]): + parser = argparse.ArgumentParser(description='Streams out a provided gs:// file path.', + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument('--path', type=str, required=True, help='A google path to stream.') + parser.add_argument('--buffer', type=int, default=16384, help='Size of the buffer used to stream a single file.') + o = parser.parse_args(argv) + + assert o.path.startswith('gs://'), 'Please pass in a google bucket path. ' \ + 'e.g. "gs://example/key/to/file.txt"' + gs_path = o.path.strip()[len('gs://'):].split('/', 1) + assert len(gs_path) == 2, 'Please specify a key in addition to the bucket path. ' \ + 'e.g. "gs://example/key/to/file.txt"' + + bucket = gs.get_client().get_bucket(gs_path[0]) + blob = bucket.get_blob(gs_path[1]) + + with gscio.Reader(blob) as fh: + portion = fh.read(o.buffer) + sys.stdout.buffer.write(portion) + while portion: + portion = fh.read(o.buffer) + sys.stdout.buffer.write(portion) + + +if __name__ == '__main__': + main() diff --git a/tests/test_cram.py b/tests/test_cram.py index 272ced6..d258b26 100755 --- a/tests/test_cram.py +++ b/tests/test_cram.py @@ -124,7 +124,7 @@ def assert_cram_view_with_no_regions_generates_identical_output(self, cram_uri, # This check allows us to change samtools versions without significant changes to the test. def cram_cli(self, cram_uri, crai_uri): - output_file = str(uuid4()) + output_file = f'{uuid4()}.cram' self.clean_up.append(output_file) cmd = f'xsamtools cram view --cram {cram_uri} --crai {crai_uri} -C --output {output_file}' log.info(f'Now running: {cmd}') diff --git a/xsamtools/cram.py b/xsamtools/cram.py index 4fcb643..7874fef 100755 --- a/xsamtools/cram.py +++ b/xsamtools/cram.py @@ -192,23 +192,37 @@ def format_and_check_cram(cram: str) -> str: assert os.path.exists(cram) return cram +def pipe_two_commands(cmd1, cmd2): + log.info(f'Now running proxy for: "{" ".join(cmd1)} | {" ".join(cmd2)}"') + p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE) + p2 = subprocess.Popen(cmd2, stdin=p1.stdout) + p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. + p2.communicate() + def write_final_file_with_samtools(cram: str, crai: Optional[str], regions: Optional[str], cram_format: bool, output: str) -> None: - region_args = ' '.join(regions.split(',')) if regions else '' - cram_format_arg = '-C' if cram_format else '' - if crai: - crai_arg = f'-X {crai}' - else: - log.warning('No crai file present, this may take a while.') - crai_arg = '' + if not crai: + log.warning('No crai file specified, this might take a long time while samtools generates it...') - cmd = f'samtools view {cram_format_arg} {cram} {crai_arg} {region_args}' + cmd1 = ['scripts/stream_cloud_file', '--path', cram] if cram.startswith('gs://') else ['cat', cram] - log.info(f'Now running: {cmd}') - subprocess.run(cmd, shell=True, stdout=open(output, 'w'), stderr=subprocess.PIPE, check=True) + cmd2 = ['samtools', 'view', '-'] + if cram_format: + cmd2 += ['-C'] + if crai: + cmd2 += ['-X', crai] + if regions: + cmd2 += regions.split(',') + if output: + cmd2 += ['-o', output] + + # samtools seems to produce a benign(?) error: 'samtools view: error closing "-": -1' + # this appears to happen after the cram file is written however, & doesn't seem to affect the result + # TODO: 1. make sure that this is the case ^; 2. suppress this error if so + pipe_two_commands(cmd1, cmd2) log.debug(f'Output CRAM successfully generated at: {output}') def stage(uri: str, output: str) -> None: @@ -248,13 +262,21 @@ def view(cram: str, f'Only local file outputs are currently supported.' with TemporaryDirectory() as staging_dir: - staged_cram = os.path.join(staging_dir, 'tmp.cram') - stage(uri=cram, output=staged_cram) + # defaults + staged_cram = cram + staged_crai = crai + + if not cram.startswith('gs://'): + # gs files are skipped because they will be streamed from the cloud directly + # staged files are linked into the stage dir and then streamed with cat + staged_cram = os.path.join(staging_dir, 'tmp.cram') + stage(uri=cram, output=staged_cram) + if crai: + # always stage a crai if the user provides one, allowing samtools to find it... + # otherwise samtools may generate one which can take a very long time staged_crai = os.path.join(staging_dir, 'tmp.crai') stage(uri=crai, output=staged_crai) - else: - staged_crai = None write_final_file_with_samtools(staged_cram, staged_crai, regions, cram_format, output) From eb8c27584811f5280fb34e5a7965c9c334a37e81 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Sat, 2 Jan 2021 00:34:30 -0800 Subject: [PATCH 2/3] Cruft. --- xsamtools/cram.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xsamtools/cram.py b/xsamtools/cram.py index 7874fef..811385b 100755 --- a/xsamtools/cram.py +++ b/xsamtools/cram.py @@ -221,7 +221,6 @@ def write_final_file_with_samtools(cram: str, # samtools seems to produce a benign(?) error: 'samtools view: error closing "-": -1' # this appears to happen after the cram file is written however, & doesn't seem to affect the result - # TODO: 1. make sure that this is the case ^; 2. suppress this error if so pipe_two_commands(cmd1, cmd2) log.debug(f'Output CRAM successfully generated at: {output}') From 22b665d4d32c9e708d37c5d34001ae60468c6103 Mon Sep 17 00:00:00 2001 From: DailyDreaming Date: Sat, 2 Jan 2021 00:40:00 -0800 Subject: [PATCH 3/3] Type hints. --- xsamtools/cram.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xsamtools/cram.py b/xsamtools/cram.py index 811385b..792e73e 100755 --- a/xsamtools/cram.py +++ b/xsamtools/cram.py @@ -13,7 +13,7 @@ from collections import namedtuple from tempfile import TemporaryDirectory -from typing import Optional +from typing import Optional, List from urllib.request import urlretrieve from terra_notebook_utils import xprofile @@ -192,7 +192,7 @@ def format_and_check_cram(cram: str) -> str: assert os.path.exists(cram) return cram -def pipe_two_commands(cmd1, cmd2): +def pipe_two_commands(cmd1: List[str], cmd2: List[str]): log.info(f'Now running proxy for: "{" ".join(cmd1)} | {" ".join(cmd2)}"') p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE) p2 = subprocess.Popen(cmd2, stdin=p1.stdout)