Skip to content

Commit

Permalink
First round of comments
Browse files Browse the repository at this point in the history
  • Loading branch information
samanvp committed May 19, 2020
1 parent 3f3666a commit fab844d
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 23 deletions.
4 changes: 3 additions & 1 deletion cloudbuild_CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ steps:
- '--project ${PROJECT_ID}'
- '--image_tag ${COMMIT_SHA}'
- '--run_unit_tests'
- '--run_presubmit_tests'
- '--run_preprocessor_tests'
- '--run_bq_to_vcf_tests'
- '--run_all_tests'
- '--test_name_prefix cloud-ci-'
id: 'test-gcp-variant-transforms-docker'
entrypoint: '/opt/gcp_variant_transforms/src/deploy_and_run_tests.sh'
Expand Down
51 changes: 47 additions & 4 deletions gcp_variant_transforms/libs/bigquery_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
_BQ_EXTRACT_SCHEMA_COMMAND = (
'bq show --schema --format=prettyjson {FULL_TABLE_ID} > {SCHEMA_FILE_PATH}')
_GCS_DELETE_FILES_COMMAND = 'gsutil -m rm -f -R {ROOT_PATH}'
_BQ_NUM_RETRIES = 3
_BQ_NUM_RETRIES = 5
_MAX_NUM_CONCURRENT_BQ_LOAD_JOBS = 4

_GET_COLUMN_NAMES_QUERY = (
Expand Down Expand Up @@ -502,7 +502,21 @@ def create_sample_info_table(output_table_id):
_run_table_creation_command(bq_command)

class FlattenCallColumn(object):
"""Flattens call column to convert variant opt tables to sample opt tables."""

def __init__(self, base_table_id, suffixes):
# type: (str, List[str]) -> None
"""Initialize `FlattenCallColumn` object.
In preparation to convert variant lookup optimized tables to sample lookup
optimized tables, we initiate this class with the base table name of variant
opt table (set using --output_table flag) and the list of suffixes (which
are extracted from sharding config file).
Args:
base_table_id: Base name of variant opt outputs (set by --output_table).
suffixes: List of suffixes (extracted from sharding config file).
"""
(self._project_id,
self._dataset_id,
self._base_table) = parse_table_reference(base_table_id)
Expand Down Expand Up @@ -589,7 +603,7 @@ def _copy_to_flatten_table(self, output_table_id, cp_query):
break
logging.info('Copy to table query was successful: %s', output_table_id)

def _create_temp_flatten_table(self):
def _create_temp_flatten_table_with_1_row(self):
temp_suffix = time.strftime('%Y%m%d_%H%M%S')
temp_table_id = '{}{}'.format(self._schema_table_id, temp_suffix)
full_output_table_id = '{}.{}.{}'.format(
Expand All @@ -610,7 +624,21 @@ def _create_temp_flatten_table(self):
return temp_table_id

def get_flatten_table_schema(self, schema_file_path):
temp_table_id = self._create_temp_flatten_table()
# type: (str) -> bool
"""Write the flatten table's schema to the given json file.
This method basically performs the following tasks:
* Composes a 'flattening query' based on _schema_table_id table's schema.
* Runs the 'flattening query' to read 1 row and writes it to a temp table.
* Extracts the schema of the temp table using _BQ_EXTRACT_SCHEMA_COMMAND.
Args:
schema_file_path: The json schema will be written to this file.
Returns:
A bool value indicating if the schema was successfully extracted.
"""
temp_table_id = self._create_temp_flatten_table_with_1_row()
full_table_id = '{}:{}.{}'.format(
self._project_id, self._dataset_id, temp_table_id)
bq_command = _BQ_EXTRACT_SCHEMA_COMMAND.format(
Expand All @@ -626,9 +654,24 @@ def get_flatten_table_schema(self, schema_file_path):
logging.info('Successfully deleted temporary table: %s', full_table_id)
else:
logging.error('Was not able to delete temporary table: %s', full_table_id)
return result
return result == 0

def copy_to_flatten_table(self, output_base_table_id):
# type: (str) -> None
"""Copies data from variant lookup optimized tables to sample lookup tables.
Copies rows from _base_table_id__* to output_base_table_id__* for each value
in _suffixes. Here we assume destination tables are already created and are
partitioned based on call_sample_id column. The copying process is done via
a flattening query similar to the one used in get_flatten_table_schema().
Note that if source tables have repeated sample_ids then output table will
have more rows than input table. Essentially:
Number of output rows = Number of input rows * Number of repeated sample_ids
Args:
output_base_table_id: Base table name of output tables.
"""
# Here we assume all output_table_base + suffixes[:] are already created.
(output_project_id,
output_dataset_id,
Expand Down
28 changes: 15 additions & 13 deletions gcp_variant_transforms/libs/bigquery_util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,24 +521,26 @@ class FlattenCallColumnTest(unittest.TestCase):
"""Test cases for class `FlattenCallColumn`."""

def setUp(self):
# We never query this table for running the following test, however, the
# mock values are based on this table's schema. In other words:
# mock_columns.return_value = self._flatter._get_column_names()
# mock_sub_fields.return_value = self._flatter._get_call_sub_fields()
input_base_table = ('gcp-variant-transforms-test:'
'bq_to_vcf_integration_tests.'
'merge_option_move_to_calls')
self._flatter = bigquery_util.FlattenCallColumn(input_base_table, ['chr20'])

def test_get_column_names(self):
expected_column_names = ['reference_name', 'start_position', 'end_position',
'reference_bases', 'alternate_bases', 'names',
'quality', 'filter', 'call', 'NS', 'DP', 'AA',
'DB', 'H2']
self.assertEqual(expected_column_names, self._flatter._get_column_names())

def test_get_call_sub_fields(self):
expected_sub_fields = \
['sample_id', 'genotype', 'phaseset', 'DP', 'GQ', 'HQ']
self.assertEqual(expected_sub_fields, self._flatter._get_call_sub_fields())

def test_get_flatten_column_names(self):
@mock.patch('gcp_variant_transforms.libs.bigquery_util_test.bigquery_util.'
'FlattenCallColumn._get_column_names')
@mock.patch('gcp_variant_transforms.libs.bigquery_util_test.bigquery_util.'
'FlattenCallColumn._get_call_sub_fields')
def test_get_flatten_column_names(self, mock_sub_fields, mock_columns):
mock_columns.return_value = (
['reference_name', 'start_position', 'end_position', 'reference_bases',
'alternate_bases', 'names', 'quality', 'filter', 'call', 'NS', 'DP',
'AA', 'DB', 'H2'])
mock_sub_fields.return_value = (
['sample_id', 'genotype', 'phaseset', 'DP', 'GQ', 'HQ'])
expected_select = (
'main_table.reference_name AS `reference_name`, '
'main_table.start_position AS `start_position`, '
Expand Down
16 changes: 12 additions & 4 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,14 @@ def add_arguments(self, parser):
'--sample_lookup_optimized_output_table',
default='',
help=('In addition to the default output tables (which are optimized '
'for variant look up queries), you can store a second copy of '
'for variant lookup queries), you can store a second copy of '
'your data in BigQuery tables that are optimized for sample '
'look up queries. Note that setting this option will double your '
'BigQuery storage costs.'))
'lookup queries using this flag.'
'Note that setting this flag will *at least* double your '
'BigQuery storage costs. If your input VCF files are joint '
'genotyped (say with n sample) then sample lookup tables will '
'have n * the number of rows of their corresponding variant '
'lookup table.'))
parser.add_argument(
'--output_avro_path',
default='',
Expand Down Expand Up @@ -236,6 +240,10 @@ def validate(self, parsed_args, client=None):
parsed_args.sharding_config_path, parsed_args.append)

if parsed_args.sample_lookup_optimized_output_table:
if (parsed_args.output_table ==
parsed_args.sample_lookup_optimized_output_table):
raise ValueError('sample_lookup_optimized_output_table cannot be the '
'same as output_table.')
self._validate_output_tables(
client, parsed_args.sample_lookup_optimized_output_table,
parsed_args.sharding_config_path, parsed_args.append)
Expand All @@ -245,7 +253,7 @@ def _validate_output_tables(self, client,
sharding_config_path, append):
if (output_table_base_name !=
bigquery_util.get_table_base_name(output_table_base_name)):
raise ValueError(('Output table cannot contain "{}" we reserve this '
raise ValueError(('Output table cannot contain "{}". we reserve this '
'string to mark sharded output tables.').format(
bigquery_util.TABLE_SUFFIX_SEPARATOR))

Expand Down
2 changes: 1 addition & 1 deletion gcp_variant_transforms/vcf_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def run(argv=None):
known_args.output_table, suffixes)
try:
flatten_schema_file = tempfile.mkstemp(suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
if flatten_call_column.get_flatten_table_schema(flatten_schema_file) != 0:
if not flatten_call_column.get_flatten_table_schema(flatten_schema_file):
raise ValueError('Failed to extract schema of flatten table')
# Create output flatten tables if needed
if not known_args.append:
Expand Down

0 comments on commit fab844d

Please sign in to comment.