Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cram view gs cloud streaming. #92

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions scripts/stream_cloud_file
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
import sys
import argparse

from terra_notebook_utils import gs
import gs_chunked_io as gscio
"""
This streams a cloud key directly to stdout. This allows:
1. The stream to be modified/sliced along the way.
2. Data to be piped with conventional bash pipes:
e.g., python stream_cloud_file.py | some_tool > output.txt

TODO: This might be able to be replaced with "gsutil cat" if it ever allows multiple slices.
Currently one slice is implemented, for example: "gsutil cat -r 0-99 gs://lons-test/ce#5b.cram"
which would print the first 100 bytes.
"""


def main(argv=sys.argv[1:]):
parser = argparse.ArgumentParser(description='Streams out a provided gs:// file path.',
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--path', type=str, required=True, help='A google path to stream.')
parser.add_argument('--buffer', type=int, default=16384, help='Size of the buffer used to stream a single file.')
o = parser.parse_args(argv)

assert o.path.startswith('gs://'), 'Please pass in a google bucket path. ' \
'e.g. "gs://example/key/to/file.txt"'
gs_path = o.path.strip()[len('gs://'):].split('/', 1)
assert len(gs_path) == 2, 'Please specify a key in addition to the bucket path. ' \
'e.g. "gs://example/key/to/file.txt"'

bucket = gs.get_client().get_bucket(gs_path[0])
blob = bucket.get_blob(gs_path[1])

with gscio.Reader(blob) as fh:
portion = fh.read(o.buffer)
sys.stdout.buffer.write(portion)
while portion:
portion = fh.read(o.buffer)
sys.stdout.buffer.write(portion)


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion tests/test_cram.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def assert_cram_view_with_no_regions_generates_identical_output(self, cram_uri,
# This check allows us to change samtools versions without significant changes to the test.

def cram_cli(self, cram_uri, crai_uri):
output_file = str(uuid4())
output_file = f'{uuid4()}.cram'
self.clean_up.append(output_file)
cmd = f'xsamtools cram view --cram {cram_uri} --crai {crai_uri} -C --output {output_file}'
log.info(f'Now running: {cmd}')
Expand Down
51 changes: 36 additions & 15 deletions xsamtools/cram.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from collections import namedtuple
from tempfile import TemporaryDirectory
from typing import Optional, Dict, Any, Union
from typing import Optional, Dict, Any, Union, List
from urllib.request import urlretrieve
from terra_notebook_utils import xprofile

Expand Down Expand Up @@ -371,23 +371,36 @@ def format_and_check_cram(cram: str) -> str:
assert os.path.exists(cram)
return cram

def pipe_two_commands(cmd1: List[str], cmd2: List[str]):
log.info(f'Now running proxy for: "{" ".join(cmd1)} | {" ".join(cmd2)}"')
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout)
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
p2.communicate()

def write_final_file_with_samtools(cram: str,
crai: Optional[str],
regions: Optional[str],
cram_format: bool,
output: str) -> None:
region_args = ' '.join(regions.split(',')) if regions else ''
cram_format_arg = '-C' if cram_format else ''
if crai:
crai_arg = f'-X {crai}'
else:
log.warning('No crai file present, this may take a while.')
crai_arg = ''
if not crai:
log.warning('No crai file specified, this might take a long time while samtools generates it...')

cmd = f'samtools view {cram_format_arg} {cram} {crai_arg} {region_args}'
cmd1 = ['scripts/stream_cloud_file', '--path', cram] if cram.startswith('gs://') else ['cat', cram]

log.info(f'Now running: {cmd}')
subprocess.run(cmd, shell=True, stdout=open(output, 'w'), stderr=subprocess.PIPE, check=True)
cmd2 = ['samtools', 'view', '-']
if cram_format:
cmd2 += ['-C']
if crai:
cmd2 += ['-X', crai]
if regions:
cmd2 += regions.split(',')
if output:
cmd2 += ['-o', output]

# samtools seems to produce a benign(?) error: 'samtools view: error closing "-": -1'
# this appears to happen after the cram file is written however, & doesn't seem to affect the result
pipe_two_commands(cmd1, cmd2)
log.debug(f'Output CRAM successfully generated at: {output}')

def stage(uri: str, output: str) -> None:
Expand Down Expand Up @@ -427,13 +440,21 @@ def view(cram: str,
f'Only local file outputs are currently supported.'

with TemporaryDirectory() as staging_dir:
staged_cram = os.path.join(staging_dir, 'tmp.cram')
stage(uri=cram, output=staged_cram)
# defaults
staged_cram = cram
staged_crai = crai

if not cram.startswith('gs://'):
# gs files are skipped because they will be streamed from the cloud directly
# staged files are linked into the stage dir and then streamed with cat
staged_cram = os.path.join(staging_dir, 'tmp.cram')
stage(uri=cram, output=staged_cram)

if crai:
# always stage a crai if the user provides one, allowing samtools to find it...
# otherwise samtools may generate one which can take a very long time
staged_crai = os.path.join(staging_dir, 'tmp.crai')
stage(uri=crai, output=staged_crai)
else:
staged_crai = None

write_final_file_with_samtools(staged_cram, staged_crai, regions, cram_format, output)

Expand Down