From 4d177c529c21e1e6b5b4172c40ac43c34ce738cc Mon Sep 17 00:00:00 2001 From: Saman Vaisipour Date: Tue, 31 Mar 2020 11:12:20 -0400 Subject: [PATCH] Enable users to store sample look up optimized tables Add input flag and validate its value. --- .../options/variant_transform_options.py | 132 +++++++++++------- 1 file changed, 81 insertions(+), 51 deletions(-) diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index d3391ab26..763efdeed 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -115,9 +115,27 @@ class BigQueryWriteOptions(VariantTransformsOptions): def add_arguments(self, parser): # type: (argparse.ArgumentParser) -> None - parser.add_argument('--output_table', - default='', - help='BigQuery table to store the results.') + parser.add_argument( + '--output_table', + default='', + help=('Base name of the BigQuery tables which will store the results. ' + 'Note that sharded tables will be named as following: ' + ' * `output_table`__chr1 ' + ' * `output_table`__chr2 ' + ' * ... ' + ' * `output_table`__residual ' + 'where "chr1", "chr2", ..., and "residual" suffixes correspond ' + 'to the value of `table_name_suffix` in the sharding config file ' + '(see --sharding_config_path).')) + + parser.add_argument( + '--sample_lookup_optimized_output_table', + default='', + help=('In addition to the default output tables (which are optimized ' + 'for variant look up queries), you can store a second copy of ' + 'your data in BigQuery tables that are optimized for sample ' + 'look up queries. Note that setting this option will double your ' + 'BigQuery storage costs.')) parser.add_argument( '--sharding_config_path', @@ -187,55 +205,67 @@ def validate(self, parsed_args, client=None): not parsed_args.sharding_config_path.strip()): raise ValueError( '--sharding_config_path must point to a valid config file.') - # Ensuring (not) existence of output tables is aligned with --append value. - if parsed_args.output_table: - if (parsed_args.output_table != - bigquery_util.get_table_base_name(parsed_args.output_table)): - raise ValueError(('Output table cannot contain "{}" we reserve this ' + + if not client: + credentials = GoogleCredentials.get_application_default().create_scoped( + ['https://www.googleapis.com/auth/bigquery']) + client = bigquery.BigqueryV2(credentials=credentials) + if not parsed_args.output_table: + raise ValueError('--output_table must have a value.') + self._validate_output_tables( + client, parsed_args.output_table, + parsed_args.sharding_config_path, parsed_args.append) + + if parsed_args.sample_lookup_optimized_output_table: + self._validate_output_tables( + client, parsed_args.sample_lookup_optimized_output_table, + parsed_args.sharding_config_path, parsed_args.append) + + def _validate_output_tables(self, client, + output_table_base_name, + sharding_config_path, append): + if (output_table_base_name != + bigquery_util.get_table_base_name(output_table_base_name)): + raise ValueError(('Output table cannot contain "{}" we reserve this ' + 'string to mark sharded output tables.').format( + bigquery_util.TABLE_SUFFIX_SEPARATOR)) + + project_id, dataset_id, table_id = bigquery_util.parse_table_reference( + output_table_base_name) + bigquery_util.raise_error_if_dataset_not_exists(client, project_id, + dataset_id) + all_output_tables = [] + all_output_tables.append( + bigquery_util.compose_table_name( + table_id, bigquery_util.TABLE_SUFFIX, is_sample=True)) + sharding = variant_sharding.VariantSharding(sharding_config_path) + num_shards = sharding.get_num_shards() + # In case there is no residual in config we will ignore the last shard. + if not sharding.should_keep_shard(sharding.get_residual_index()): + num_shards -= 1 + for i in range(num_shards): + table_suffix = sharding.get_output_table_suffix(i) + if table_suffix != bigquery_util.get_table_base_name(table_suffix): + raise ValueError(('Table suffix cannot contain "{}" we reserve this ' 'string to mark sharded output tables.').format( - bigquery_util.TABLE_SUFFIX_SEPARATOR)) - if not client: - credentials = GoogleCredentials.get_application_default().create_scoped( - ['https://www.googleapis.com/auth/bigquery']) - client = bigquery.BigqueryV2(credentials=credentials) - - project_id, dataset_id, table_id = bigquery_util.parse_table_reference( - parsed_args.output_table) - bigquery_util.raise_error_if_dataset_not_exists(client, project_id, - dataset_id) - all_output_tables = [] - all_output_tables.append( - bigquery_util.compose_table_name( - table_id, bigquery_util.TABLE_SUFFIX, is_sample=True)) - sharding = variant_sharding.VariantSharding( - parsed_args.sharding_config_path) - num_shards = sharding.get_num_shards() - # In case there is no residual in config we will ignore the last shard. - if not sharding.should_keep_shard(sharding.get_residual_index()): - num_shards -= 1 - for i in range(num_shards): - table_suffix = sharding.get_output_table_suffix(i) - if table_suffix != bigquery_util.get_table_base_name(table_suffix): - raise ValueError(('Table suffix cannot contain "{}" we reserve this ' - 'string to mark sharded output tables.').format( - bigquery_util.TABLE_SUFFIX_SEPARATOR)) - all_output_tables.append(bigquery_util.compose_table_name(table_id, - table_suffix)) - - for output_table in all_output_tables: - if parsed_args.append: - if not bigquery_util.table_exist(client, project_id, - dataset_id, output_table): - raise ValueError( - 'Table {}:{}.{} does not exist, cannot append to it.'.format( - project_id, dataset_id, output_table)) - else: - if bigquery_util.table_exist(client, project_id, - dataset_id, output_table): - raise ValueError( - ('Table {}:{}.{} already exists, cannot overwrite it. Please ' - 'set `--append True` if you want to append to it.').format( - project_id, dataset_id, output_table)) + bigquery_util.TABLE_SUFFIX_SEPARATOR)) + all_output_tables.append(bigquery_util.compose_table_name(table_id, + table_suffix)) + + for output_table in all_output_tables: + if append: + if not bigquery_util.table_exist(client, project_id, + dataset_id, output_table): + raise ValueError( + 'Table {}:{}.{} does not exist, cannot append to it.'.format( + project_id, dataset_id, output_table)) + else: + if bigquery_util.table_exist(client, project_id, + dataset_id, output_table): + raise ValueError( + ('Table {}:{}.{} already exists, cannot overwrite it. Please ' + 'set `--append True` if you want to append to it.').format( + project_id, dataset_id, output_table)) class AnnotationOptions(VariantTransformsOptions):