Add a PoC disk_estimator feature to the vcf_to_bq preprocessor. #335

Open. Wants to merge 5 commits into base: master

217 changes: 217 additions & 0 deletions gcp_variant_transforms/beam_io/vcf_file_size_io.py
@@ -0,0 +1,217 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A source for estimating the size of VCF files when processed by vcf_to_bq."""

from __future__ import absolute_import

from typing import Iterable, List, Tuple # pylint: disable=unused-import
import logging
import itertools

import apache_beam as beam
from apache_beam import coders
from apache_beam import transforms
from apache_beam.io import filebasedsource
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io import iobase
from apache_beam.io import range_trackers # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcfio


# Number of lines from each VCF that should be read when estimating disk usage.
SNIPPET_READ_SIZE = 50

def _get_file_size(file_name):
  # type: (str) -> int
  matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list
  if len(matched_files) != 1:
    raise IOError("File name {} did not correspond to exactly 1 result. "
                  "Instead, got {} matches.".format(file_name,
                                                    len(matched_files)))
  file_metadata = matched_files[0]

  compression_type = filesystem.CompressionTypes.detect_compression_type(
      file_metadata.path)
  if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
    logging.error("VCF file %s is compressed; disk requirement estimator "
                  "will not be accurate.", file_metadata.path)
  return file_metadata.size_in_bytes
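
As a usage illustration (the bucket path below is hypothetical, not from the PR), the helper resolves a single concrete file name to its size in bytes; for compressed inputs it still returns the on-disk size, which is why the warning above matters:

```python
# Hypothetical path, for illustration only.
raw_bytes = _get_file_size('gs://example-bucket/samples/input.vcf')
print('raw file size: %d bytes' % raw_bytes)
```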


class FileSizeInfo(object):
  def __init__(self, raw_size, encoded_size=None, name="[no filename]"):
    # type: (int, int, str) -> None
    self.raw_size = raw_size
    self.encoded_size = encoded_size  # Allow direct initialization.
    self.name = name
Contributor: Did you use the name anywhere?

Author: I use this in the generate_report module as well; WDYT?

Collaborator: I'm sorry, I just cannot find where you are using it - in generate_report you call _append_disk_usage_estimate_to_report which seems to only be getting raw size and encoded size, no?

Author: Sorry, this was my fault; I either cited an earlier revision incorrectly or misread it entirely.

The self.name is only consumed when there is some issue reading the file in estimate_encoded_file_size, so that a more descriptive message can be logged:

      logging.warning("File %s appears to have no valid Variant lines. File "
                      "will be ignored for size estimation.", self.name)


  def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
Collaborator: If we are populating self.encoded_size, let's use the same name here (i.e., by dropping 'file_') or vice versa.

Author: Done.

Collaborator: Also, we seem to be missing the types for the args.

Author: This comment might be from an earlier revision; there are type annotations at line 63 for this method. Are you referring to something else?

    # type: (int, int) -> None
    """Estimates the encoded file size, given the raw file size and the sizes
    of the sampled raw and encoded lines.
    """
    if raw_sample_size == 0:
      # Propagate in-band error state to avoid divide-by-zero.
      logging.warning("File %s appears to have no valid Variant lines. File "
                      "will be ignored for size estimation.", self.name)
      self.encoded_size = 0
      self.raw_size = 0
    else:
      self.encoded_size = (self.raw_size * encoded_sample_size /
                           raw_sample_size)
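
For intuition, the extrapolation is purely linear: the whole file is assumed to shrink or grow under `Variant` encoding at the same ratio as the sampled lines. A minimal sketch with made-up numbers (none of these figures come from the PR):

```python
# A 1 GiB raw VCF whose sampled lines measured 8,000 raw bytes and
# 5,000 encoded bytes is estimated to encode to 5/8 of its raw size.
info = FileSizeInfo(raw_size=2 ** 30, name='example.vcf')
info.estimate_encoded_file_size(raw_sample_size=8000,
                                encoded_sample_size=5000)
print(info.encoded_size)  # 671088640 bytes, i.e. 640 MiB
```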


class FileSizeInfoSumFn(beam.CombineFn):
  """Combiner function that sums the size fields of FileSizeInfo objects."""

  def create_accumulator(self):
    # type: () -> Tuple[int, int]
    return (0, 0)  # (raw, encoded) sums.

  def add_input(self, (raw, encoded), file_size_info):
    # type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int]
    return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size

  def merge_accumulators(self, accumulators):
    # type: (Iterable[Tuple[int, int]]) -> Tuple[int, int]
    raw, encoded = zip(*accumulators)
    return sum(raw), sum(encoded)

  def extract_output(self, (raw, encoded)):
    # type: (Tuple[int, int]) -> FileSizeInfo
    return FileSizeInfo(raw, encoded)
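
Driving the combiner by hand shows the fold; a sketch with arbitrary sizes (in a pipeline, Beam invokes these methods itself via `CombineGlobally`):

```python
fn = FileSizeInfoSumFn()
acc = fn.create_accumulator()                   # (0, 0)
acc = fn.add_input(acc, FileSizeInfo(100, 60))  # (100, 60)
acc = fn.add_input(acc, FileSizeInfo(200, 90))  # (300, 150)
total = fn.extract_output(acc)
print('raw=%d encoded=%d' % (total.raw_size, total.encoded_size))
```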


class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
  """A source for estimating the encoded size of a VCF file in `vcf_to_bq`.

  This source first obtains the raw file sizes of a set of VCF files. Then,
  the source reads a limited number of variants from each VCF file, both as
  raw strings and as encoded `Variant` objects. Finally, the reader emits a
  single `FileSizeInfo` object per file, with an estimate of the file's size
  as if every record had been encoded as a `Variant` object.

  Malformed lines are skipped.
  """

  DEFAULT_VCF_READ_BUFFER_SIZE = 65536  # 64kB

  def __init__(self,
               file_pattern,
               sample_size,
               compression_type=filesystem.CompressionTypes.AUTO,
               validate=True,
               vcf_parser_type=vcfio.VcfParserType.PYVCF):
    # type: (str, int, str, bool, vcfio.VcfParserType) -> None
    super(_EstimateVcfSizeSource, self).__init__(
        file_pattern,
        compression_type=compression_type,
        validate=validate,
        splittable=False)
    self._compression_type = compression_type
    self._sample_size = sample_size
    self._vcf_parser_type = vcf_parser_type

  def read_records(
      self,
      file_name,  # type: str
      range_tracker  # type: range_trackers.UnsplittableRangeTracker
      ):
    # type: (...) -> Iterable[FileSizeInfo]
    """This "generator" only emits a single FileSizeInfo object per file."""
    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
    record_iterator = vcf_parser_class(
        file_name,
        range_tracker,
        self._compression_type,
        allow_malformed_records=True,
        file_pattern=self._pattern,
        representative_header_lines=None,
        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
        skip_header_lines=0)

    raw_file_size = _get_file_size(file_name)

    # Open distinct channel to read lines as raw bytestrings.
    with filesystems.FileSystems.open(file_name,
                                      self._compression_type) as raw_iterator:
      count, raw_size, encoded_size = 0, 0, 0
      for encoded_record, raw_record in itertools.izip(record_iterator,
                                                       raw_iterator):
        while raw_record and raw_record.startswith('#'):
          # Skip headers. Assume that header size is negligible.
          raw_record = raw_iterator.next()
        logging.debug(
            "Reading record for disk usage estimation. Encoded variant: %s\n"
            "Raw variant: %s", encoded_record, raw_record)
        if count >= self._sample_size:
          break
        if not isinstance(encoded_record, vcfio.Variant):
          logging.error(
Contributor: Is it for malformed variants?

Contributor: What I understand is that we have two iterators here, one for the encoded variant and one for the raw record. While we skip some encoded variants here, should we skip the raw record as well?

Author: That's correct; I was concerned about ill-formed lines but didn't want to crash the estimator, so I just decided to try skipping them (though if an enormous proportion of lines is malformed, this will tend to make the tool overestimate size). I don't have a very specific idea of how often or why variants tend to be malformed, though, and I am open to ideas here, especially around the condition that I'm checking for.

Oh, this is a bug; I was intending to skip both raw and encoded. Sorry about that. I'm rewriting this to use itertools, which should be less error-prone.
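
As an editorial aside, a sketch of the lockstep pairing described above (illustrative only, not the PR's final code): `itertools.izip` consumes one element from each stream per iteration, so a skip inside the zipped loop discards the raw line and the encoded record together.

```python
import itertools

def paired_variant_records(record_iterator, raw_iterator):
  # izip advances both iterators in lockstep, so `continue`-style
  # skipping here drops the malformed record and its raw line at once.
  for encoded, raw in itertools.izip(record_iterator, raw_iterator):
    if not isinstance(encoded, vcfio.Variant):
      continue
    yield encoded, raw
```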

"Skipping VCF line that could not be decoded as a "
"`vcfio.Variant` in file %s: %s", file_name, raw_record)
continue
# Encoding in `utf-8` should represent the string as one byte per char,
# even for non-ASCII chars. Python adds significant overhead to the
# bytesize of the full str object.
raw_size += len(raw_record.encode('utf-8'))
encoded_size += coders.registry.get_coder(vcfio.Variant).estimate_size(
encoded_record)
count += 1

file_size_info = FileSizeInfo(raw_file_size, name=file_name)
file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
yield file_size_info
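
For reference, the two per-record measurements used in the loop, pulled out as a standalone sketch; `raw_line` and `variant` are assumed to hold one raw VCF text line and its parsed record:

```python
# len() of the utf-8 encoding counts payload bytes only, avoiding the
# overhead Python adds to the str object itself.
raw_bytes = len(raw_line.encode('utf-8'))
# Approximates the Beam-encoded size without materializing the bytes.
variant_coder = coders.registry.get_coder(vcfio.Variant)
encoded_bytes = variant_coder.estimate_size(variant)
```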


class EstimateVcfSize(transforms.PTransform):
  """A PTransform for estimating the encoded size of VCFs without reading
  whole files.

  The output is a PCollection with a single FileSizeInfo object representing
  the aggregate encoded size estimate.
  """

  def __init__(
      self,
      file_pattern,  # type: str
      sample_size,  # type: int
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      validate=True,  # type: bool
      **kwargs  # type: **str
      ):
    # type: (...) -> None
    """Initialize the :class:`EstimateVcfSize` transform.

    Args:
      file_pattern: The file path to read from, either a single file or a
        glob pattern.
      sample_size: The number of lines that should be read from each file.
      compression_type: Used to handle compressed input files.
        Typical value is :attr:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        underlying file_path's extension will be used to detect the
        compression.
      validate: Flag to verify that the files exist during pipeline creation.
    """
    super(EstimateVcfSize, self).__init__(**kwargs)
    self._source = _EstimateVcfSizeSource(
        file_pattern, sample_size, compression_type, validate=validate)

  def expand(self, pvalue):
    return (pvalue.pipeline
            | iobase.Read(self._source)
            | beam.CombineGlobally(FileSizeInfoSumFn()))
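
To show how the pieces fit together, a hypothetical wiring sketch (the bucket path and the logging step are illustrative, not part of the PR):

```python
with beam.Pipeline() as p:
  _ = (p
       | 'EstimateVcfSize' >> EstimateVcfSize(
           'gs://example-bucket/vcfs/*.vcf', SNIPPET_READ_SIZE)
       | 'LogEstimate' >> beam.Map(
           lambda info: logging.info(
               'raw=%d bytes, estimated encoded=%d bytes',
               info.raw_size, info.encoded_size)))
```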
Empty file.
47 changes: 25 additions & 22 deletions gcp_variant_transforms/beam_io/vcfio.py
@@ -19,19 +19,19 @@

from __future__ import absolute_import

-from typing import Any, Iterable, List, Tuple  # pylint: disable=unused-import
+from typing import Any, Iterable, List, Tuple, Type  # pylint: disable=unused-import
from functools import partial
import enum

 import apache_beam as beam
+from apache_beam import transforms
 from apache_beam.coders import coders
 from apache_beam.io import filebasedsource
+from apache_beam.io import filesystem
 from apache_beam.io import filesystems
+from apache_beam.io import iobase
 from apache_beam.io import range_trackers  # pylint: disable=unused-import
 from apache_beam.io import textio
-from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.iobase import Read
-from apache_beam.transforms import PTransform

from gcp_variant_transforms.beam_io import bgzf_io
from gcp_variant_transforms.beam_io import vcf_parser
@@ -56,6 +56,16 @@ class VcfParserType(enum.Enum):
   PYVCF = 0
   NUCLEUS = 1
+
+def get_vcf_parser(vcf_parser_type):
+  # type: (VcfParserType) -> Type[vcf_parser.VcfParser]
+  if vcf_parser_type == VcfParserType.PYVCF:
+    return vcf_parser.PyVcfParser
+  elif vcf_parser_type == VcfParserType.NUCLEUS:
+    return vcf_parser.NucleusParser
+  else:
+    raise ValueError(
+        'Unrecognized _vcf_parser_type: %s.' % str(vcf_parser_type))
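
A small illustration of the dispatch, which lets `_VcfSource` and the new estimator source share one parser-selection path:

```python
parser_class = get_vcf_parser(VcfParserType.NUCLEUS)
assert parser_class is vcf_parser.NucleusParser
```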


class _ToVcfRecordCoder(coders.Coder):
"""Coder for encoding :class:`Variant` objects as VCF text lines."""
@@ -193,7 +203,7 @@ class _VcfSource(filebasedsource.FileBasedSource):
   def __init__(self,
                file_pattern,  # type: str
                representative_header_lines=None,  # type: List[str]
-               compression_type=CompressionTypes.AUTO,  # type: str
+               compression_type=filesystem.CompressionTypes.AUTO,  # type: str
                buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE,  # type: int
                validate=True,  # type: bool
                allow_malformed_records=False,  # type: bool
@@ -214,14 +224,7 @@ def read_records(self,
                    range_tracker  # type: range_trackers.OffsetRangeTracker
                   ):
     # type: (...) -> Iterable[MalformedVcfRecord]
-    vcf_parser_class = None
-    if self._vcf_parser_type == VcfParserType.PYVCF:
-      vcf_parser_class = vcf_parser.PyVcfParser
-    elif self._vcf_parser_type == VcfParserType.NUCLEUS:
-      vcf_parser_class = vcf_parser.NucleusParser
-    else:
-      raise ValueError(
-          'Unrecognized _vcf_parser_type: %s.' % str(self._vcf_parser_type))
+    vcf_parser_class = get_vcf_parser(self._vcf_parser_type)
     record_iterator = vcf_parser_class(
         file_name,
         range_tracker,
@@ -280,7 +283,7 @@ def expand(self, pcoll):
             | 'ReadBlock' >> beam.ParDo(self._read_records))


-class ReadFromVcf(PTransform):
+class ReadFromVcf(transforms.PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading VCF
   files.

@@ -294,7 +297,7 @@ def __init__(
       self,
       file_pattern=None,  # type: str
       representative_header_lines=None,  # type: List[str]
-      compression_type=CompressionTypes.AUTO,  # type: str
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
       validate=True,  # type: bool
       allow_malformed_records=False,  # type: bool
       vcf_parser_type=VcfParserType.PYVCF,  # type: int
@@ -325,7 +328,7 @@ def __init__(
         vcf_parser_type=vcf_parser_type)

   def expand(self, pvalue):
-    return pvalue.pipeline | Read(self._source)
+    return pvalue.pipeline | iobase.Read(self._source)


@@ -337,7 +340,7 @@ def _create_vcf_source(
       allow_malformed_records=allow_malformed_records)


-class ReadAllFromVcf(PTransform):
+class ReadAllFromVcf(transforms.PTransform):
   """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading a
   :class:`~apache_beam.pvalue.PCollection` of VCF files.

@@ -355,7 +358,7 @@ def __init__(
       self,
       representative_header_lines=None,  # type: List[str]
       desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,  # type: int
-      compression_type=CompressionTypes.AUTO,  # type: str
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
       allow_malformed_records=False,  # type: bool
       **kwargs  # type: **str
       ):
@@ -384,21 +387,21 @@ def __init__(
         allow_malformed_records=allow_malformed_records)
     self._read_all_files = filebasedsource.ReadAllFiles(
         True,  # splittable
-        CompressionTypes.AUTO, desired_bundle_size,
+        filesystem.CompressionTypes.AUTO, desired_bundle_size,
         0,  # min_bundle_size
         source_from_file)

   def expand(self, pvalue):
     return pvalue | 'ReadAllFiles' >> self._read_all_files


-class WriteToVcf(PTransform):
+class WriteToVcf(transforms.PTransform):
   """A PTransform for writing to VCF files."""

   def __init__(self,
                file_path,
                num_shards=1,
-               compression_type=CompressionTypes.AUTO,
+               compression_type=filesystem.CompressionTypes.AUTO,
                headers=None):
     # type: (str, int, str, List[str]) -> None
     """Initialize a WriteToVcf PTransform.
@@ -449,7 +452,7 @@ def process(self, (file_path, variants), *args, **kwargs):
       file_to_write.write(self._coder.encode(variant))


-class WriteVcfDataLines(PTransform):
+class WriteVcfDataLines(transforms.PTransform):
   """A PTransform for writing VCF data lines.

   This PTransform takes PCollection<`file_path`, `variants`> as input, and