From 030f527646b843baa6f80dc46ff63b1530cb20cc Mon Sep 17 00:00:00 2001 From: Judy Palimonka Date: Mon, 19 Jul 2021 11:36:35 +0200 Subject: [PATCH 1/3] feat(databuilder): add custom extractors --- .../extractor/king_extractor_bq_base.py | 564 ++++++++++++++++++ .../extractor/king_extractor_bq_stats.py | 164 +++++ .../extractor/king_extractor_bq_table.py | 162 +++++ .../extractor/king_extractor_bq_usage.py | 142 +++++ .../extractor/king_extractor_owner.py | 184 ++++++ databuilder/tests/king/__init__.py | 0 databuilder/tests/king/conftest.py | 529 ++++++++++++++++ .../tests/king/test_extractor_bq_base.py | 29 + .../tests/king/test_extractor_bq_table.py | 252 ++++++++ .../tests/king/test_extractor_bq_usage.py | 71 +++ .../tests/king/test_extractor_owner.py | 80 +++ 11 files changed, 2177 insertions(+) create mode 100644 databuilder/databuilder/extractor/king_extractor_bq_base.py create mode 100644 databuilder/databuilder/extractor/king_extractor_bq_stats.py create mode 100644 databuilder/databuilder/extractor/king_extractor_bq_table.py create mode 100644 databuilder/databuilder/extractor/king_extractor_bq_usage.py create mode 100644 databuilder/databuilder/extractor/king_extractor_owner.py create mode 100644 databuilder/tests/king/__init__.py create mode 100644 databuilder/tests/king/conftest.py create mode 100644 databuilder/tests/king/test_extractor_bq_base.py create mode 100644 databuilder/tests/king/test_extractor_bq_table.py create mode 100644 databuilder/tests/king/test_extractor_bq_usage.py create mode 100644 databuilder/tests/king/test_extractor_owner.py diff --git a/databuilder/databuilder/extractor/king_extractor_bq_base.py b/databuilder/databuilder/extractor/king_extractor_bq_base.py new file mode 100644 index 0000000000..8e71195b46 --- /dev/null +++ b/databuilder/databuilder/extractor/king_extractor_bq_base.py @@ -0,0 +1,564 @@ +import logging +import time +import datetime +from requests import get +from pyhocon import ConfigTree +from typing import Any, List, Dict, cast, Iterator, Union, Tuple +import re + +from databuilder.extractor.base_bigquery_extractor import BaseBigQueryExtractor, DatasetRef +from databuilder.extractor.bigquery_metadata_extractor import ColumnMetadata +from googleapiclient.errors import HttpError + + +""" +We use this class so that the following attributes can be re-used across all of our BQ subclasses: +- billing project +- filter_dataset_name +- filter_type +- bq_log_table +""" + +DC_TEMPLATE_NAME = 'projects/king-datacommons-prod/locations/europe-west1/tagTemplates/' \ + 'king_datacommons_prod_{part}_metadata_template_v1_0' +DEFAULT_TB_DESCRIPTION = 'This table is missing a description. 
' +LOGGER = logging.getLogger(__name__) +NUM_RETRIES = 3 +EXTRACTION_PROJECTS = ['king-datacommons-prod', 'king-bp-tr-prod'] +COMMONLY_USED_WITH_SQL = """ + WITH + referenced_resources AS ( + SELECT protopayload_auditlog.authenticationInfo.principalEmail AS email, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobConfiguration.query.query AS sql, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedViews AS referencedViews, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedTables AS referencedTables, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id + FROM `{bq_log_table}` + WHERE resource.type='bigquery_resource' AND + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatus.state = 'DONE' AND + protopayload_auditlog.methodName = 'jobservice.jobcompleted' + ), + all_queried_resources AS ( + SELECT v2.projectId, + v2.datasetId, + v2.tableId, + email, + job_id, + sql + FROM referenced_resources v + CROSS JOIN UNNEST(v.referencedViews) AS v2 + + UNION ALL + + SELECT v2.projectId, + v2.datasetId, + v2.tableId, + email, + job_id, + sql + FROM referenced_resources v + CROSS JOIN UNNEST(v.referencedTables) AS v2 + ), + queried_resources AS ( + SELECT CONCAT(projectId,'.', datasetId,'.',tableId) AS table_id, + job_id, + email + FROM all_queried_resources + WHERE projectId in ({extraction_projects}) AND + sql not like '%INFORMATION_SCHEMA%' AND + REGEXP_CONTAINS(sql, CONCAT(datasetId, '[\\\.`]+', tableId, '[`]+')) AND + tableId NOT LIKE '%*' + ), + queried_pairs AS ( + SELECT v1.table_id AS table_id_1, + v2.table_id AS table_id_2, + v1.job_id AS job_id, + v1.email AS email + FROM queried_resources v1 + CROSS JOIN queried_resources v2 + WHERE v1.job_id = v2.job_id AND v1.table_id != v2.table_id + GROUP BY 1,2,3,4 + ), + top_pairs AS ( + SELECT table_id_1, + table_id_2, + COUNT(*) queries, + COUNT(DISTINCT email) users, + COUNT(DISTINCT(IF(email LIKE '%@king.com', email, NULL))) human_users, + MAX(IF(email LIKE '%@king.com', 1, 0)) has_human_queries + FROM queried_pairs + GROUP BY 1,2 + ), + top_pairs_ranked AS ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY table_id_1 ORDER BY has_human_queries DESC, users DESC, queries DESC) r, + FROM top_pairs + WHERE {dataset_clause} + ) + SELECT table_id_1 as table_id, + ARRAY_AGG(table_id_2 ORDER BY r ASC) commonly_used_with + FROM top_pairs_ranked + WHERE r <= 5 + GROUP BY 1 + """ + + +class BqResource: + """ + Coerces dictionary outputted by GCP to our own object for ease of reference and brevity + """ + + def __init__(self, bq_tb: Dict): + self.project = bq_tb['tableReference']['projectId'] + self.dataset = bq_tb['tableReference']['datasetId'] + self.table = bq_tb['tableReference']['tableId'] + self.name = self.project + '.' + self.dataset + '.' + self.table + self.dataset_ref = self.project + '.' 
+ self.dataset + self.type = 'table' if bq_tb['type'] in ('TABLE', 'EXTERNAL_TABLE') else 'view' + self.clustered = True if bq_tb.get('clustering') else False + self.cluster_fields = bq_tb['clustering']['fields'] if bq_tb.get('clustering') else None + self.row_count = int(bq_tb['numRows']) if bq_tb.get('numRows') else None + self.description = bq_tb.get('description') + self.last_modified_timestamp = int(int(bq_tb['lastModifiedTime']) / 1000) if bq_tb.get( + 'lastModifiedTime') else None + self.data_retention_days = int(int(bq_tb['timePartitioning']['expirationMs']) / (1000 * 60 * 60 * 24)) \ + if bq_tb.get('timePartitioning', {}).get('expirationMs') else None + self.schema = bq_tb.get('schema') + self.columns = self.schema['fields'] if self.schema else [] + self.column_names = [col['name'] for col in self.columns] + self.ui_table_description = str.strip(self.description or ' ') if self.description else DEFAULT_TB_DESCRIPTION + self.row_count_scope = 'full table' + if bq_tb.get('timePartitioning'): + self.partitioned = True + self.partition_name = bq_tb['timePartitioning']['field'] + self.partition_type = bq_tb['timePartitioning']['type'] + elif bq_tb.get('rangePartitioning'): + self.partitioned = True + self.partition_name = bq_tb['rangePartitioning']['field'] + self.partition_type = 'Integer' + else: + self.partitioned = False + self.partition_name = None + self.partition_type = None + + self.source_project = self.project + self.source_dataset = self.dataset + self.source_table = self.table + + if bq_tb['type'] == 'VIEW': + self.view_sql = bq_tb['view']['query'] + try: + self.source_project, self.source_dataset, self.source_table = \ + re.findall(r'king-[A-Za-z0-9\_\-\.]*', self.view_sql)[0].split('.') + except (KeyError, ValueError, IndexError): + pass + + +class KingBaseBigQueryExtractor(BaseBigQueryExtractor): + BILLING_PROJECT = 'billing_project' + FILTER_TYPE = 'filter_type' + FILTER_DATASET_NAME = 'filter_dataset_name' + BQ_LOG_TABLE = 'bq_log_table' + QUERY_GB_CAP = 'query_gb_cap' + REFERENCIADOR_URL = 'http://referenciador.int.midasplayer.com' + _DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly', + 'https://www.googleapis.com/auth/cloud-platform'] + + def init(self, conf: ConfigTree): + BaseBigQueryExtractor.init(self, conf) + self.billing_project = conf.get_string(KingBaseBigQueryExtractor.BILLING_PROJECT) + self.bq_log_table = conf.get_string(KingBaseBigQueryExtractor.BQ_LOG_TABLE, '') + self.query_gb_cap = int(conf.get_string(KingBaseBigQueryExtractor.QUERY_GB_CAP, 50)) + self.iter: Iterator[Any] = iter([]) + + self.filter_dataset_name = conf.get_string(KingBaseBigQueryExtractor.FILTER_DATASET_NAME, None) + self.filter_type = conf.get_string(KingBaseBigQueryExtractor.FILTER_TYPE, None) + if self.filter_dataset_name and str.lower(self.filter_type or '') not in ('exclude', 'include', ''): + raise Exception(f'Filter type must be set to "exclude", or "include" or None, got: {self.filter_type}') + self.filter_dataset_name = self.filter_dataset_name.replace(' ', '') if self.filter_dataset_name \ + else self.filter_dataset_name + self.data_catalog_client = None + + def retrieve_data_catalog_tags(self, target: BqResource, template: str) -> list: + """ + Get all tags for this BQ resource + """ + from google.cloud import datacatalog_v1 + from google.cloud.datacatalog_v1 import LookupEntryRequest + from google.api_core.exceptions import PermissionDenied + self.data_catalog_client = datacatalog_v1.DataCatalogClient() + valid_templates: List[str] = ['table', 'column'] + 
+ if template not in valid_templates: + raise Exception(f'Invalid template passed: {template}. Valid values: {str(valid_templates)}') + + source = f'//bigquery.googleapis.com/projects/{target.source_project}/datasets/{target.source_dataset}/' \ + f'tables/{target.source_table}' + try: + entry = self.data_catalog_client.lookup_entry(request=LookupEntryRequest({'linked_resource': source})) + all_tags = self.data_catalog_client.list_tags(parent=entry.name) + return [tag for tag in all_tags if tag.template == DC_TEMPLATE_NAME.format(part=template)] + except PermissionDenied as e: + logging.warning(f'Unable to get tag for ' + f'{target.source_project}:{target.source_dataset}.{target.source_table} - {str(e)}') + return [] + + def _iterate_over_cols(self, + parent: str, + column: Dict[str, str], + cols: List[ColumnMetadata], + total_cols: int, + column_tags: dict) -> int: + col_name = parent + '.' + column['name'] if len(parent) > 0 else column['name'] + column_tag = column_tags.get(col_name) + column_grain_desc = ('Column grain: ' + column_tag + '. ') if column_tag else '' + col_desc = column.get('description') or ' ' + full_column_description = column_grain_desc + col_desc + + if column['type'] == 'RECORD': + col = ColumnMetadata( + name=col_name, + description=full_column_description, + col_type=column['type'], + sort_order=total_cols) + cols.append(col) + total_cols += 1 + for field in column['fields']: + field_casted = cast(Dict[str, str], field) + total_cols = self._iterate_over_cols(col_name, field_casted, cols, total_cols, column_tags) + return total_cols + else: + col = ColumnMetadata( + name=col_name, + description=full_column_description, + col_type=column['type'], + sort_order=total_cols) + cols.append(col) + return total_cols + 1 + + def get_columns(self, bq_resource: BqResource): + tags = self.retrieve_data_catalog_tags(bq_resource, 'column') + column_tags: dict = {} + for t in tags: + column_tags[t.column] = t.fields['column_grain'].string_value + + cols, total_cols = [], 0 + for column in bq_resource.columns if bq_resource.columns else []: + total_cols = self._iterate_over_cols('', column, cols, total_cols + 1, column_tags) + return cols + + def _retrieve_tables(self, dataset) -> Any: + for page in self._page_table_list_results(dataset): + if 'tables' not in page: + continue + + for table in page['tables']: + yield table['tableReference'] + + def _retrieve_datasets(self) -> List[DatasetRef]: + datasets = [] + for page in self._page_dataset_list_results(): + if 'datasets' not in page: + continue + + for dataset in page['datasets']: + dataset_ref = dataset['datasetReference'] + ref = DatasetRef(**dataset_ref) + dataset_list = (self.filter_dataset_name or '').split(',') + if not self.filter_dataset_name or \ + (self.filter_type == 'include' and ref.datasetId in dataset_list) or \ + (self.filter_type == 'exclude' and ref.datasetId not in dataset_list): + logging.debug(f"Extracting {ref.datasetId}") + datasets.append(ref) + return datasets + + def run_bq_query(self, + sql, + use_legacy_sql, + retry_on_error, + wait_seconds_if_failed, + use_query_cache, + retry_limit): + """ + Runs BQ query. 
Retries if retry_limit is specified + """ + query_body = {'query': sql, + 'useLegacySql': use_legacy_sql, + 'jobTimeoutMs': wait_seconds_if_failed * 100, + 'useQueryCache': use_query_cache} + + if not retry_on_error: + return self.bigquery_service.jobs().query(projectId=self.billing_project, body=query_body).execute( + num_retries=NUM_RETRIES) + else: + query_job, retries = None, 0 + while True: + try: + query_job = self.bigquery_service.jobs().query( + projectId=self.billing_project, + body=query_body).execute(num_retries=NUM_RETRIES) + except HttpError as e: + if 400 <= int(e.resp['status']) < 500: + logging.warning(f'{e.resp["status"]} error on query. Will not be retrying {str(e)}') + break + + logging.warning(f'Query failed {retries + 1} times {e.content}. Will retry in ' + f'{wait_seconds_if_failed} seconds. Query: {sql}') + time.sleep(wait_seconds_if_failed) + retries += 1 + + if query_job or retries >= retry_limit: + break + + return query_job + + def get_query_results(self, + sql, + use_legacy_sql=False, + wait_seconds_if_failed=10, + wait_seconds_to_complete=900, + retry_on_error=True, + use_query_cache=True, + retry_limit=3): # TODO handle nested fields + """ + Waits for query to complete (if query does not return immediately) + """ + result = self.run_bq_query(sql=sql, + use_legacy_sql=use_legacy_sql, + retry_on_error=retry_on_error, + wait_seconds_if_failed=wait_seconds_if_failed, + use_query_cache=use_query_cache, + retry_limit=retry_limit) + if not result: + logging.warning(f'Error must have occurred while getting query result...') + return [] + + if not result.get('jobComplete'): + job_id, seconds_elapsed = result['jobReference']['jobId'], 0 + while True: + result = self.bigquery_service.jobs().getQueryResults( + projectId=self.billing_project, + jobId=job_id, + location='EU' + ).execute(num_retries=NUM_RETRIES) + if result.get('jobComplete'): + break + if seconds_elapsed >= wait_seconds_to_complete: + logging.warning( + "The defined timeline for this query has been exceeded, trying to cancel the query...") + self.bigquery_service.jobs().cancel(projectId=self.billing_project, jobId=job_id, location='EU') \ + .execute(num_retries=NUM_RETRIES) + return [] + logging.warning( + f'Query still running... Job ID: {job_id}. {round(seconds_elapsed / 60, 2)} minutes elapsed. 
' + f'Timeout: {round(wait_seconds_to_complete / 60, 2)} ') + seconds_elapsed += 15 + time.sleep(15) + logging.info(f'Query took {round(seconds_elapsed / 60, 2)} minutes') + + # parse query + col_names = [col['name'] for col in result['schema']['fields']] + records = [] + for row in result.get('rows') or []: + records.append(dict(zip(col_names, [v['v'] for v in row['f']]))) + return records + + def get_query_estimated_gb_processed(self, sql: str, use_legacy_sql: bool = False) -> int: + """ + Get GB processed for this SQL + """ + return int(int(self.bigquery_service.jobs().query( + projectId=self.billing_project, + body={'query': sql, 'useLegacySql': use_legacy_sql, 'dryRun': True} + ).execute(num_retries=NUM_RETRIES)['totalBytesProcessed']) / 10000000000) + + def _retrieve_table_detail(self, project: str, dataset: str, table: str) -> BqResource: + """ + Get table detail + """ + return BqResource( + self.bigquery_service.tables().get( + projectId=project, + datasetId=dataset, + tableId=table + ).execute(num_retries=NUM_RETRIES)) + + def _retrieve_row_count(self, + target: BqResource, + referenced_tables: List[BqResource]) \ + -> Tuple[Union[int, None], Union[str, None]]: + """ + Try getting row count of the BQ view, if query GB processed does not exceed the defined cap + """ + partitions = set() + sql = f'SELECT COUNT(*) c FROM {target.name} WHERE TRUE ' + max_dt = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y-%m-%d') + + for tb in referenced_tables or []: + partitions.add(tb.partition_name) if tb.partition_name in target.column_names else None + + for p in partitions: + sql += f' AND {p} = "{max_dt}" ' + try: + gb_processed = self.get_query_estimated_gb_processed(sql) + except HttpError as e: + logging.warning(f'Unable to get estimated gb processed due to error {str(e)}') + return None, None + + if gb_processed <= self.query_gb_cap: + logging.info(f'Getting SQL row count for {target.name}: {sql}') + row_count = int(self.get_query_results(sql)[0]['c']) + row_count_scope = ','.join(partitions) + ': ' + max_dt if len(partitions) > 0 else 'whole table' + if row_count > 0: + return row_count, row_count_scope + else: + logging.warning(f'Unable to get row count for {target.name}. 
The query would have processed ' + f'{gb_processed} which exceeds the defined cap of {self.query_gb_cap}') + return None, None + + def _retrieve_last_updated_ts(self, referenced_tables: List[BqResource]) -> Union[None, int]: + """ + Get the timestamp of the most recently updated resource + """ + latest_ts = 0 + for tb in referenced_tables or []: + if tb.last_modified_timestamp > latest_ts: + latest_ts = tb.last_modified_timestamp + return latest_ts if latest_ts != 0 else None + + def _retrieve_referenced_tables(self, + target, + retry_limit: int = 3, + wait_seconds_if_failed: int = 10) -> dict: + """ + Find out what resources this view references + """ + logging.info(f'Getting referenced tables for {target.name}') + result = self.run_bq_query(sql=f'SELECT * FROM {target.name} LIMIT 0', + use_legacy_sql=False, + retry_on_error=True, + use_query_cache=False, + retry_limit=retry_limit, + wait_seconds_if_failed=wait_seconds_if_failed) + + default_return = {'tables': [], 'is_singular_source': None, 'uses_wildcard': None, 'is_logical_view': None} + + if not result: # best effort basis + logging.warning(f'Could not execute query for referenced tables {target.name} after ' + f'{retry_limit} times.') + return default_return + + try: + referenced_tables = self.bigquery_service.jobs().get( + projectId=self.billing_project, + jobId=result['jobReference']['jobId'] + ).execute(num_retries=NUM_RETRIES)['statistics']['query'].get('referencedTables') or [] + + wildcard_resources = [tb for tb in referenced_tables if '*' in tb['tableId']] + singular_resources = [self._retrieve_table_detail(tb['projectId'], tb['datasetId'], tb['tableId']) + for tb in referenced_tables if '*' not in tb['tableId']] + + return {'tables': singular_resources, + 'is_singular_source': len(singular_resources) == 1 == len(referenced_tables), + 'uses_wildcard': len(wildcard_resources) > 0, + 'is_logical_view': len(referenced_tables) == 0} + + except (TypeError, KeyError, HttpError) as e: # best-effort basis + logging.warning(f'Could not get referenced tables for {target.name} due to error: {e}') + return default_return + + def _retrieve_source_table(self, target: BqResource) -> Union[BqResource, None]: + """ + Return the source object (if singular) or transformed target object (if multiple sources detected) + """ + source_info = self._retrieve_referenced_tables(target) + + if source_info['is_singular_source']: + return source_info['tables'][0] + + if source_info['uses_wildcard'] is False or source_info['is_logical_view'] is True: + target.row_count, target.row_count_scope = self._retrieve_row_count(target, source_info['tables']) + target.last_modified_timestamp = self._retrieve_last_updated_ts(source_info['tables']) + return target + else: + logging.warning(f'{target.name} uses wildcard - aborting extraction of row count and ts') + + def _get_source_table(self, target: BqResource) -> Union[BqResource, None]: + """ + If target is a table, return it. 
Else, return the source table for the view (if singular and non-sharded) + """ + return target if target.type == 'table' else self._retrieve_source_table(target) + + def retrieve_event_classification(self) -> Union[Dict, None]: + """ + Get event tags from referenciador + """ + url, m = f'{self.REFERENCIADOR_URL}/king_constants/latest/EventClassification.tsv', dict() + + try: + for line in [line for line in get(url).text.split('\n') if line != '']: + _, event_name, category_string = line.split('\t') + if all(x not in str.lower(category_string) for x in ['unknown', 'nan']): + m[str.lower(event_name)] = category_string.split(',') + return m + except Exception as e: + logging.warning(f'Could not load event classification from referenciador {str(e)}') + return + + def _retrieve_commonly_used(self) -> Dict: + """ + Get commonly used tables + """ + logging.info(f'Getting commonly used with...') + + current_year = datetime.datetime.now().strftime('%Y') + dataset_clause = f"table_id_1 LIKE '{self.project_id}.%' " + + if self.filter_dataset_name: + dataset_clause += ' AND \n' + filter_expr = ' NOT LIKE ' if str.lower(self.filter_type) == 'exclude' else ' LIKE ' + dataset_clause += ' AND \n'.join([f' table_id_1 {filter_expr} "{self.project_id}.{dataset}.%"' + for dataset in self.filter_dataset_name.split(',')]) + + sql = COMMONLY_USED_WITH_SQL.format( + bq_log_table=self.bq_log_table + '_' + current_year + '*', + extraction_projects=','.join([f'"{p}"' for p in EXTRACTION_PROJECTS]), + dataset_clause=dataset_clause) + + result = {} + + for row in self.get_query_results(sql): + main_table, commonly_joined_with = row['table_id'], [] + for tb_name in row['commonly_used_with']: + project, dataset, table = tb_name['v'].split('.') + frontend_url = '{frontend_url}' + link = f'[{dataset}.{table}]({frontend_url}/table_detail/{project}/bigquery/{dataset}/{table})' + commonly_joined_with.append(link) + result[main_table] = commonly_joined_with + return result + + def _retrieve_watermark(self, source: BqResource) -> Union[None, Dict]: + """ + Get min and max partition + """ + if source.partition_type == 'DAY': + sql = f"""SELECT CONCAT(project_id, '.', dataset_id, '.', table_id) tb_name, + MIN(partition_id) low, + MAX(partition_id) high, + MAX(last_modified_time) ts + FROM [{source.project}:{source.dataset}.{source.table}$__PARTITIONS_SUMMARY__] + WHERE partition_id != '__UNPARTITIONED__' AND partition_id != '__NULL__' + GROUP BY 1 + """ + result = self.get_query_results(sql, use_legacy_sql=True) + if len(result) == 1: + return result[0] + + def extract(self) -> Any: + try: + return next(self.iter) + except StopIteration: + return None + + def get_scope(self) -> str: + return 'extractor.bq_base_extractor' diff --git a/databuilder/databuilder/extractor/king_extractor_bq_stats.py b/databuilder/databuilder/extractor/king_extractor_bq_stats.py new file mode 100644 index 0000000000..a605c361b1 --- /dev/null +++ b/databuilder/databuilder/extractor/king_extractor_bq_stats.py @@ -0,0 +1,164 @@ +import logging +from datetime import datetime +from pyhocon import ConfigTree + +from databuilder.models.table_stats import TableColumnStats + +from databuilder.extractor.king_extractor_bq_base import KingBaseBigQueryExtractor +from databuilder.extractor.king_extractor_bq_base import BqResource + +LOGGER = logging.getLogger(__name__) +STAT_LABELS = ['approximate_distinct_values', 'non_null_values', 'null_values'] +GB_PROCESSED_CAP = 50 +NUM_RETRIES = 3 + + +def get_stat_sql(columns: [str], + target: BqResource, + partition_clause: 
str = None): + nested_records = {} + for c in [c for c in target.columns if c.get('mode') == 'REPEATED']: + nested_records[c['name']] = c['type'] + + def get_final_col(c): + if '.' in c: + col_alias, col_name = c.split('.') + return f'{col_alias}.`{col_name}`' + elif c in nested_records.keys(): + return f'`{c}`' + else: + return f'mainTbAlias.`{c}`' + + sql = 'SELECT\n' + + for c in columns: + col = get_final_col(c) + alias = c.replace('.', '_') + sql += f'\t/* Stats for column {col} */\n' + sql += f"\tAPPROX_COUNT_DISTINCT({col}) as {alias}_approximate_distinct_values,\n" + sql += f"\tCOUNTIF({col} IS NOT NULL) as {alias}_non_null_values,\n" + sql += f"\tCOUNTIF({col} IS NULL) as {alias}_null_values,\n" + + sql += f'FROM `{target.name}` mainTbAlias \n' + + for k, v in nested_records.items() or {}: + sql += f'LEFT JOIN UNNEST({k}) {k} \n' + + sql += f'\nWHERE\n' + + if partition_clause: + sql += partition_clause + else: + for c in columns: + col = get_final_col(c) + sql += f'({col} IS NULL OR {col} IS NOT NULL) AND \n' + sql += 'TRUE ' + return sql + + +def format_stat_display_value(stat_value): + """Separate thousands by comma or return original value on error""" + try: + return format(int(stat_value), ',') + except Exception as e: + return stat_value + + +def format_stat_display_name(col_name, stat_value): + """ + Populate display name of col stat + e.g. Converts stat_name such as 'col_name_non_null_values' and col col_name to 'Non Null Values' + """ + return ' '.join([str.capitalize(s) for s in stat_value.replace(col_name, '').split('_')]) + + +class KingColumnStatExtractor(KingBaseBigQueryExtractor): + + def init(self, conf: ConfigTree) -> None: + KingBaseBigQueryExtractor.init(self, conf) + self.iter = iter(self._iterate_over_tables()) + + def _get_tb_partition_details(self, source: BqResource): + partition_value, start_ts, end_ts = None, None, None + try: + sql = "SELECT MAX(partition_id) max_partition " \ + f" FROM [{source.name}$__PARTITIONS_SUMMARY__] " \ + " WHERE partition_id != '__UNPARTITIONED__' AND partition_id != '__NULL__' " + max_partition = self.get_query_results(use_legacy_sql=True, sql=sql)[0]['max_partition'] + dt = datetime.strptime(max_partition, '%Y%m%d') + ts, partition_value = int(dt.timestamp()), dt.strftime('%Y-%m-%d') + start_ts, end_ts = ts, ts + + except Exception as e: + logging.warning(f'Error getting table partition details for {source.name}. ' + f'Partition value: {partition_value}. ' + f'Partition {source.partition_name}. Error: {str(e)}') + finally: + return partition_value, source.partition_name, start_ts, end_ts + + def _get_view_partition_details(self, target: BqResource): + max_partition, partition_col, start_ts, end_ts = None, None, None, None + try: + max_partition = self.get_query_results(f'SELECT MAX(dt) as m from `{target.name}`')[0]['m'] + partition_col = 'dt' + ts = int(datetime.strptime(max_partition, '%Y-%m-%d').timestamp()) + start_ts, end_ts = ts, ts + except Exception as e: + logging.warning(f'Error getting view partition details for {target.name}. ' + f'Partition value: {max_partition}. ' + f'Partition {target.partition_name}. 
Error: {str(e)}') + finally: + return max_partition, partition_col, start_ts, end_ts + + def _iterate_over_tables(self): + """ + Get references to the customer-facing view and its underlying table + """ + for dataset in self._retrieve_datasets(): + for t in self._retrieve_tables(dataset): + target: BqResource = self._retrieve_table_detail(t['projectId'], t['datasetId'], t['tableId']) + source: BqResource = self._get_source_table(target) + + col_names = [c.name for c in self.get_columns(target) if c.type != 'RECORD'] + partition_value, partition_col, start_ts, end_ts = None, None, 0, 0 + + if source and source.partitioned and source.partition_type == 'DAY': + partition_value, partition_col, start_ts, end_ts = self._get_tb_partition_details(source) + + if not source and 'dt' in col_names: + partition_value, partition_col, start_ts, end_ts = self._get_view_partition_details(target) + + try: + partition_clause = f' {partition_col} = "{partition_value}" ' if partition_value else None + stat_sql = get_stat_sql(col_names, target, partition_clause=partition_clause) + estimated_gb_processed = self.get_query_estimated_gb_processed(stat_sql) + + if estimated_gb_processed <= GB_PROCESSED_CAP: + logging.warning(f'Getting stats for {target.name}... ' + f'The stats query will process {estimated_gb_processed} GB. ' + f'Partition clause: {partition_clause}') + + tb_stats = self.get_query_results(sql=stat_sql, wait_seconds_to_complete=90)[0] + + for col_name in col_names: + for stat_name in [col_name.replace('.', '_') + '_' + s for s in STAT_LABELS]: + yield TableColumnStats( + db='bigquery', + cluster=target.project, + schema=target.dataset, + table_name=target.table, + col_name=col_name, + stat_name=format_stat_display_name(col_name, stat_name), + stat_val=format_stat_display_value(tb_stats[stat_name]), + start_epoch=str(start_ts), + end_epoch=str(end_ts)) + logging.info(f'table_stats:SUCCESS: {target.name}') + else: + logging.warning(f"table_stats:SKIPPED: {target.name} the estimated bytes processed " + f"{estimated_gb_processed} exceeded the limit of {GB_PROCESSED_CAP} GB") + + except Exception as e: + logging.warning(f'table_stats:FAILURE: {target.name}. 
Error: {str(e)}') + + def get_scope(self) -> str: + return 'extractor.king_column_stat_extractor' diff --git a/databuilder/databuilder/extractor/king_extractor_bq_table.py b/databuilder/databuilder/extractor/king_extractor_bq_table.py new file mode 100644 index 0000000000..ab7d634112 --- /dev/null +++ b/databuilder/databuilder/extractor/king_extractor_bq_table.py @@ -0,0 +1,162 @@ +import logging +from typing import Dict, List +import datetime +from time import time + +from databuilder.models.es_last_updated import ESLastUpdated +from databuilder.models.table_metadata import TableMetadata +from databuilder.models.badge import BadgeMetadata, Badge +from databuilder.models.table_last_updated import TableLastUpdated +from databuilder.models.watermark import Watermark + +from pyhocon import ConfigTree + +from databuilder.extractor.king_extractor_bq_base import KingBaseBigQueryExtractor +from databuilder.extractor.king_extractor_bq_base import BqResource + + +LOGGER = logging.getLogger(__name__) +MD_SECTION_SEPARATOR = '\n\\\n' + + +class KingTableExtractor(KingBaseBigQueryExtractor): + FRONTEND_URL = 'frontend_url' + BATCH = 'batch' + + def init(self, conf: ConfigTree) -> None: + KingBaseBigQueryExtractor.init(self, conf) + self.iter = iter(self._iterate_over_tables()) + self.frontend_url = conf.get_string(KingTableExtractor.FRONTEND_URL) + self.batch = conf.get_string(KingTableExtractor.BATCH, None) + self.dataset_tags = {} + + def _get_ui_metadata_description(self, source: BqResource, target: BqResource): + md_space = f'{MD_SECTION_SEPARATOR} \n##### ' + grain, doc_url, slack, row_count_scope, row_count = None, None, None, None, None + try: + dc_tag = self.retrieve_data_catalog_tags(target, 'table') + if len(dc_tag) == 1: + tag = dc_tag[0] + grain = tag.fields['table_grain'].string_value if tag.fields.get('table_grain') else None + doc_url = tag.fields['documentation_url'].string_value if tag.fields.get('documentation_url') else None + slack = tag.fields['slack_channel'].string_value if tag.fields.get('slack_channel') else None + except Exception as e: + logging.error(f'Could not fetch an attribute from data catalog. Error: {str(e)}') + + if source and source.row_count: + row_count_scope, row_count = source.row_count_scope, source.row_count + elif target.row_count: + row_count_scope, row_count = target.row_count_scope, target.row_count + + ui_metadata_description = \ + (f"{md_space}Documentation\n Click [here]({doc_url}) to access further documentation for this resource." 
if doc_url else '') + \ + (f"{md_space}Data Retention\n{source.data_retention_days} days" if source and source.data_retention_days else '') + \ + (f"{md_space}Partition\n{source.partition_name} ({source.partition_type})" if source and source.partitioned else '') + \ + (f"{md_space}Cluster Columns\n{', '.join(source.cluster_fields)}" if source and source.clustered else '') + \ + (f"{md_space}Rows\n{format(row_count, ',d')} ({row_count_scope})" if row_count else '') + \ + (f"{md_space}Table Grain\n{grain}" if grain else '') + \ + (f"{md_space}Slack Channel\n{slack}" if slack else '') + + return ui_metadata_description + + def _iterate_over_tables(self): + """ + Iterates over BigQuery datasets and tables and outputs relevant Amundsen model objects + """ + commonly_used_with: Dict = self._retrieve_commonly_used() + + for dataset in self._retrieve_datasets(): + self._set_custom_dataset_tags(dataset) + + current_index, start_index, stop_index = 0, 0, 0 + if self.batch: + start_index, stop_index = [int(x) for x in self.batch.split('-')] + logging.info(f'Batch parameter passed. Processing tables: {start_index} to {stop_index}') + + for t in self._retrieve_tables(dataset): + current_index += 1 + if not self.batch or (start_index <= current_index <= stop_index): + final_output = [] + logging.info(f'Processing table {current_index}...') + target: BqResource = self._retrieve_table_detail(t['projectId'], t['datasetId'], t['tableId']) + source: BqResource = self._get_source_table(target) + + # output final Amundsen objects + final_output.append(TableMetadata( + database='bigquery', + cluster=target.project, + schema=target.dataset, + name=target.table, + description=target.ui_table_description + self._get_ui_metadata_description(source, target), + columns=self.get_columns(target), + is_view=False, + tags=self.dataset_tags.get(target.dataset_ref, {}).get(target.table) or [])) + + final_output.append(TableMetadata( + database='bigquery', + cluster=target.project, + schema=target.dataset, + name=target.table, + description_source='COMMONLY_USED_WITH', # programmatic description + description=MD_SECTION_SEPARATOR.join([t.format(frontend_url=self.frontend_url) + for t in commonly_used_with.get(target.name, [])]))) + + if (source and source.last_modified_timestamp) or target.last_modified_timestamp: + final_output.append(TableLastUpdated( + last_updated_time_epoch=source.last_modified_timestamp + if source and source.last_modified_timestamp else target.last_modified_timestamp, + db='bigquery', + cluster=target.project, + schema=target.dataset, + table_name=target.table)) + + if source and source.partitioned: + final_output.append(BadgeMetadata( + start_label='Column', + start_key=f"bigquery://{target.project}.{target.dataset}/{target.table}/{source.partition_name}", + badges=[Badge(name='partition', category='column')])) + + for field in source.cluster_fields if source and source.clustered else []: + final_output.append(BadgeMetadata( + start_label='Column', + start_key=f"bigquery://{target.project}.{target.dataset}/{target.table}/{field}", + badges=[Badge(name='cluster', category='column')])) + + if source and source.partition_type == 'DAY': + high_watermark, low_watermark = self._extract_watermark(source, target) + if high_watermark and low_watermark: + final_output.append(high_watermark) + final_output.append(low_watermark) + + for i in final_output: + yield i + + yield ESLastUpdated(timestamp=int(time())) + + def _extract_watermark(self, source: BqResource, target: BqResource) -> List: + """ + If source 
table has been found and partition dates could be retrieved, output Watermark model + """ + watermark_data = self._retrieve_watermark(source) + if watermark_data: + ts = datetime.datetime.utcfromtimestamp(float(watermark_data['ts']) / 1000.).strftime('%Y-%m-%d %H:%M:%S') + watermarks = [] + for w in ['high', 'low']: + watermarks.append( + Watermark(part_type=f'{w}_watermark', + part_name=f'{source.partition_name}={watermark_data[w]}', + create_time=ts, + database='bigquery', + cluster=target.project, + schema=target.dataset, + table_name=target.table)) + return watermarks + else: + return [None, None] + + def _set_custom_dataset_tags(self, dataset): + if dataset.projectId == 'king-datacommons-prod' and dataset.datasetId == 'event': + self.dataset_tags['king-datacommons-prod.event'] = self.retrieve_event_classification() + + def get_scope(self) -> str: + return 'extractor.king_table_extractor' diff --git a/databuilder/databuilder/extractor/king_extractor_bq_usage.py b/databuilder/databuilder/extractor/king_extractor_bq_usage.py new file mode 100644 index 0000000000..3be4cd6754 --- /dev/null +++ b/databuilder/databuilder/extractor/king_extractor_bq_usage.py @@ -0,0 +1,142 @@ +import logging +import datetime +from typing import Dict + +from databuilder.models.user import User +from databuilder.models.table_column_usage import ColumnReader +from pyhocon import ConfigTree + +from databuilder.extractor.king_extractor_bq_base import KingBaseBigQueryExtractor + + +""" +Writes table usage data +""" + +LOGGER = logging.getLogger(__name__) + + +def get_filter_clause(filter_dataset_name, filter_type): + dataset_clause = ' ' + if filter_dataset_name: + filter_expr = ' NOT IN ' if filter_type == 'EXCLUDE' else ' IN ' + dataset_clause += f'datasetId {filter_expr} (' + \ + ','.join([f'"{d}"' for d in filter_dataset_name.split(',')]) + ') ' + + +def get_usage_sql(bq_log_table, project_id, filter_dataset_name=None, filter_type=None) -> str: + dataset_clause = get_filter_clause(filter_dataset_name, filter_type) or '' + + return f""" + WITH referenced_resources AS ( + SELECT protopayload_auditlog.authenticationInfo.principalEmail AS email, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobConfiguration.query.query AS sql, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedViews AS referencedViews, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatistics.referencedTables AS referencedTables, + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobName.jobId AS job_id + FROM `{bq_log_table}` + WHERE resource.type='bigquery_resource' AND + protopayload_auditlog.servicedata_v1_bigquery.jobCompletedEvent.job.jobStatus.state = 'DONE' AND + protopayload_auditlog.methodName = 'jobservice.jobcompleted' + ), + all_queried_resources AS ( + SELECT v2.projectId, + v2.datasetId, + v2.tableId, + email, + job_id, + sql + FROM referenced_resources v + CROSS JOIN UNNEST(v.referencedViews) AS v2 + + UNION ALL + + SELECT v2.projectId, + v2.datasetId, + v2.tableId, + email, + job_id, + sql + FROM referenced_resources v + CROSS JOIN UNNEST(v.referencedTables) AS v2 + ), + queried_resources AS ( + SELECT + projectId as project, + datasetId as dataset, + tableId as table, + email, + COUNT(*) as query_count + FROM all_queried_resources + WHERE projectId = '{project_id}' AND + sql not like '%INFORMATION_SCHEMA%' AND + REGEXP_CONTAINS(sql, CONCAT(datasetId, '[\\\.`]+', tableId, '[`]+')) + {dataset_clause} + GROUP BY 1,2,3,4 + ) 
+ SELECT *, + DENSE_RANK() OVER (PARTITION BY project, dataset, table ORDER BY query_count DESC) user_frequency_rank + FROM queried_resources + WHERE email NOT LIKE '%amundsen%' + """ + + +class KingUsageExtractor(KingBaseBigQueryExtractor): + + def init(self, conf: ConfigTree): + KingBaseBigQueryExtractor.init(self, conf) + self.iter = iter(self._count_usage()) + + def _get_usage_records(self) -> Dict: + """ + Extracts bigquery log data by looking at the principalEmail in the + authenticationInfo block and referencedTables in the jobStatistics. + + :return: Provides a record or None if no more to extract + """ + current_month = datetime.datetime.now().strftime('%Y%m') + sql = get_usage_sql(self.bq_log_table + '_' + current_month + '*', + self.project_id, + self.filter_dataset_name, + self.filter_type) + return self.get_query_results(sql) + + def _count_usage(self) -> [ColumnReader]: + for entry in self._get_usage_records(): + logging.debug(f'Adding user for {entry["dataset"]}.{entry["table"]}') + yield ColumnReader(database='bigquery', + cluster=entry['project'], + schema=entry['dataset'], + table=entry['table'], + user_email=entry['email'], + read_count=int(entry['query_count']), + column='*') + + logging.debug(f'Adding user... Count {int(entry["query_count"])}') + email = entry['email'] + username, domain = email.split('@') + firstname, surname, full_name, github_username, role_name, team_name = ' ', ' ', ' ', ' ', ' ', ' ' + try: + if '@king.com' in email: + firstname, surname = username.split('.') + full_name = firstname.capitalize() + ' ' + surname.capitalize() + github_username = f'https://github.int.midasplayer.com/{firstname}-{surname}' + elif 'serviceaccount' in email: + firstname = username + surname = domain.split('.')[0] + full_name = firstname + ' ' + surname + role_name = firstname + team_name = surname + except (ValueError, IndexError) as e: + pass + + yield User(email=email, + first_name=firstname, + last_name=surname, + full_name=full_name, + github_username=github_username, + role_name=role_name, + team_name=team_name) + + def get_scope(self) -> str: + return 'extractor.king_usage_extractor' diff --git a/databuilder/databuilder/extractor/king_extractor_owner.py b/databuilder/databuilder/extractor/king_extractor_owner.py new file mode 100644 index 0000000000..805ca0819c --- /dev/null +++ b/databuilder/databuilder/extractor/king_extractor_owner.py @@ -0,0 +1,184 @@ +import logging +from typing import Any, Dict, List +from requests import get + +import google.auth +import googleapiclient.discovery + +from databuilder.models.table_owner import TableOwner +from databuilder.models.table_source import TableSource +from databuilder.models.application import Application + +from pyhocon import ConfigTree + +from databuilder.extractor.king_extractor_bq_base import KingBaseBigQueryExtractor +from databuilder.extractor.king_extractor_bq_base import BqResource + +""" +Extracts source, application and owner of tables out of KDN +""" + + +class OwnerExtractor(KingBaseBigQueryExtractor): + KDN_URL = 'kdn_url' + SERVICE_ACCOUNT_NAME = 'service_account_name' + TEAM_TO_DATASET_MAPPING = {} + + def init(self, conf: ConfigTree) -> None: + KingBaseBigQueryExtractor.init(self, conf) + self.iter = iter(self._iterate_over_tables()) + self.kdn_url = conf.get_string(OwnerExtractor.KDN_URL) + self.service_account_name = conf.get_string(OwnerExtractor.SERVICE_ACCOUNT_NAME) + + def _retrieve_auth_token(self): + """ + Get Authentication Identity token + """ + credentials, _ = google.auth.default() + 
service = googleapiclient.discovery.build("iamcredentials", "v1", credentials=credentials, + cache_discovery=False) + + request = service.projects().serviceAccounts().generateIdToken( + name=f"projects/-/serviceAccounts/{self.service_account_name}", + body={"audience": self.kdn_url, 'includeEmail': True}) + + return request.execute()['token'] + + def _retrieve_from_kdn(self, entity) -> []: + """ + Makes an authenticated request to KDN + """ + return get(f'{self.kdn_url}/{entity}', + headers={'Authorization': f'Bearer {self._retrieve_auth_token()}'} + ).json()[entity] + + def _retrieve_kdn_tables(self): + """ + Get KDN tables + """ + return self._retrieve_from_kdn('tables') + + def _retrieve_kdn_operations(self): + """ + Get KDN operations + """ + return self._retrieve_from_kdn('operations') + + def _retrieve_bq_tables(self) -> List[dict]: + """ + Get all tables in BQ + """ + bigquery_tables = [] + for dataset in self._retrieve_datasets(): + for t in self._retrieve_tables(dataset): + bigquery_tables.append(t) + return bigquery_tables + + def _get_table_base(self): + base_tables = [] + for t in self._retrieve_bq_tables(): + project, dataset, table = t['projectId'], t['datasetId'], t['tableId'] + tb = {'gcpProject': project, 'dataset': dataset, 'tableName': table} + target = self._retrieve_table_detail(project, dataset, table) + table_tag = self.retrieve_data_catalog_tags(target, 'table') + + if len(table_tag) == 1: + tag = table_tag[0] + for dc_label, final_label in \ + {'owner_email_address': 'owningTeamEmail', + 'github_url': 'gitHub', + 'etl_url': 'parentJobUrl'}.items(): + field_value = tag.fields.get(dc_label) + if field_value: + tb[final_label] = field_value.string_value + + base_tables.append(tb) + return base_tables + + def _get_kdn_table_map(self) -> Dict: + """ + Get all tables from KDN. 
Merge attributes + """ + operations_dict = dict() + for o in self._retrieve_kdn_operations(): + operations_dict[o['name']] = {'parentJobUrl': o.get('parentJobUrl'), + 'gitHub': o.get('gitHub'), + 'owningTeamEmail': o.get('owningTeamEmail')} + + kdn_tables = [t for t in self._retrieve_kdn_tables() if t['gcpProject'] == self.project_id] + kdn_table_map = {} + for tb in kdn_tables: + parent_name = tb['parentObject'] + + for kdn_attribute in ['parentJobUrl', 'gitHub', 'owningTeamEmail']: + if not tb.get(kdn_attribute): # fall back to parent operation + tb[kdn_attribute] = operations_dict[parent_name].get(kdn_attribute) + + fully_qualified_tb = f"{tb['gcpProject']}.{tb['dataset']}.{tb['tableName']}" + kdn_table_map[fully_qualified_tb] = tb + return kdn_table_map + + def _get_merged_tables(self): + """ + Merge KDN data into retrieved BQ tables + """ + bigquery_tables = self._get_table_base() + kdn_table_map = self._get_kdn_table_map() + merged_tables = [] + + for tb in bigquery_tables: + dataset_ref = f"{tb['gcpProject']}.{tb['dataset']}" + kdn_tb = kdn_table_map.get(f"{dataset_ref}.{tb['tableName']}", {}) + + for attribute, fallback in { + 'owningTeamEmail': kdn_tb.get('owningTeamEmail') or self.TEAM_TO_DATASET_MAPPING.get(dataset_ref), + 'gitHub': kdn_tb.get('gitHub'), + 'parentJobUrl': kdn_tb.get('parentJobUrl') + }.items(): + if not tb.get(attribute): + tb[attribute] = fallback + + merged_tables.append(tb) + + return merged_tables + + def _iterate_over_tables(self) -> Any: + """ + For every table we find, output Amundsen Owner, Source or Application + """ + final_output = [] + for tb in self._get_merged_tables(): + if tb.get('owningTeamEmail'): + logging.debug("Writing TableOwner " + tb['tableName'] + " : " + tb['owningTeamEmail']) + final_output.append(TableOwner(cluster=tb['gcpProject'], + db_name='bigquery', + schema=tb['dataset'], + table_name=tb['tableName'], + owners=[tb['owningTeamEmail']])) + + if tb.get('gitHub'): + logging.debug("Writing TableSource " + tb['tableName'] + " : " + tb['gitHub']) + final_output.append(TableSource(cluster=tb['gcpProject'], + db_name='bigquery', + schema=tb['dataset'], + table_name=tb['tableName'], + source=tb['gitHub'])) + + if tb.get('parentJobUrl'): + logging.debug("Writing parentJobUrl " + tb['tableName'] + " : " + tb['parentJobUrl']) + final_output.append(Application(cluster=tb['gcpProject'], + db_name='bigquery', + schema=tb['dataset'], + table_name=tb['tableName'], + application_url_template=tb['parentJobUrl'], + task_id=tb['dataset'], + dag_id=tb['tableName'])) + + for i in final_output: + yield i + + def get_scope(self) -> str: + """ + Extractor namespace. 
This is used by config + """ + return 'extractor.owner_extractor' diff --git a/databuilder/tests/king/__init__.py b/databuilder/tests/king/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/databuilder/tests/king/conftest.py b/databuilder/tests/king/conftest.py new file mode 100644 index 0000000000..ae65bd4e6a --- /dev/null +++ b/databuilder/tests/king/conftest.py @@ -0,0 +1,529 @@ +from mock import MagicMock, patch +import pytest +from pyhocon import ConfigFactory + +from databuilder.extractor.king_extractor_bq_base import BqResource +from databuilder.extractor.king_extractor_bq_base import KingBaseBigQueryExtractor +from databuilder.extractor.king_extractor_bq_table import KingTableExtractor +from databuilder.extractor.king_extractor_owner import OwnerExtractor + +from databuilder.extractor.base_bigquery_extractor import DatasetRef +from databuilder import Scoped + + +@pytest.fixture() +def base_extractor_1(): + """ + Return base extractor fixture + """ + extractor = KingBaseBigQueryExtractor() + extractor._page_dataset_list_results = MagicMock(name='_page_dataset_list_results') + extractor._page_dataset_list_results.return_value = [{ + 'datasets': + [{'datasetReference': {'datasetId': 'advertising_model', 'projectId': 'king-datacommons-prod'}}, + {'datasetReference': {'datasetId': 'campaign_roi', 'projectId': 'king-datacommons-prod'}}, + {'datasetReference': {'datasetId': 'core_model', 'projectId': 'king-datacommons-prod'}}, + {'datasetReference': {'datasetId': 'random', 'projectId': 'king-datacommons-prod'}} + ]}] + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) + + return extractor + + +def get_extractor_mock(): + """ + Mock functions that read directly BQ + """ + extractor = KingTableExtractor() + extractor._retrieve_datasets = MagicMock(name='_retrieve_datasets') + extractor._retrieve_watermark = MagicMock(name='_retrieve_watermark') + extractor._retrieve_commonly_used = MagicMock(name='_retrieve_commonly_used') + extractor._retrieve_tables = MagicMock(name='_retrieve_tables') + extractor._retrieve_table_detail = MagicMock(name='_retrieve_table_detail') + extractor._retrieve_source_table = MagicMock(name='_retrieve_source_table') + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_1(_): + """ + For testing the scenario where target is a table + """ + extractor = get_extractor_mock() + + bq_output = {'tableReference': {'projectId': 'king-bp-tr-prod', + 'datasetId': 'niveles', + 'tableId': 'aps_moves'}, + 'description': 'This is aps_moves table', + 'type': 'TABLE', + 'numRows': '436768996037', + 'lastModifiedTime': '1616640834900', + 'timePartitioning': {'type': 'DAY', + 'field': 'dt', + 'requirePartitionFilter': True, + 'expirationMs': 2592000000}, + 'clustering': {'fields': ['kingpdjoinkey']}, + 'schema': {'fields': + [{'name': 'flavourid', 'type': 'INTEGER', 'mode': 'NULLABLE', + 'description': 'flavourid description'}, + {'name': 'dt', 'type': 'DATE', 'mode': 'NULLABLE', + 'description': 'The dt is the date field'}, + {'name': 'kingpdjoinkey', 'type': 'STRING', 'mode': 'NULLABLE', + 'description': 'kingpdjoinkey description'}, + ]}} + extractor._retrieve_datasets.return_value = [DatasetRef('dummy_dataset', 'dummy_project')] + extractor._retrieve_watermark.return_value = {'ts': '1616640834900', 'low': '19700101', 'high': 
'20210101'} + extractor._retrieve_table_detail.return_value = BqResource(bq_output) + extractor._retrieve_source_table.return_value = BqResource(bq_output) + extractor._retrieve_commonly_used.return_value = {'king-bp-tr-prod.niveles.aps_moves': + [ + '[core_model.f_purchase]({frontend_url}/table_detail/king-datacommons-prod/bigquery/core_model/f_purchase)', + '[reference_data.date]({frontend_url}/table_detail/king-datacommons-prod/bigquery/reference_data/date)']} + extractor._retrieve_tables.return_value = [{'projectId': 'king-bp-tr-prod', + 'datasetId': 'niveles', + 'tableId': 'aps_moves'}] + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-bp-tr-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com' + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_2(_): + """ + For testing the scenario where target is a view and with a single source table + """ + extractor = get_extractor_mock() + bq_target = {'tableReference': {'projectId': 'king-datacommons-prod', + 'datasetId': 'core_model', + 'tableId': 'd_abtestassignment'}, + 'description': " The table provides a snapshot of information for test case assignments per Title", + 'schema': {'fields': [{'name': 'kingappid', 'type': 'INTEGER', 'mode': 'NULLABLE', + 'description': 'kingappid target'}, + {'name': 'abtestname', 'type': 'STRING', 'mode': 'NULLABLE', + 'description': 'abtestname target description'}, + ]}, + 'numBytes': '0', + 'numLongTermBytes': '0', + 'numRows': '0', + 'creationTime': '1541526128483', + 'lastModifiedTime': '1615017627104', + 'type': 'VIEW', + 'view': {'query': ' SELECT * FROM `king-coredatasets-prod.core_model.d_abtestassignment`'}, + 'location': 'EU'} + + bq_source = {'tableReference': {'projectId': 'king-coredatasets-prod', 'datasetId': 'core_model', + 'tableId': 'd_abtestassignment'}, + 'description': None, + 'schema': { + 'fields': [{'name': 'kingappid', 'type': 'INTEGER', 'description': 'kingappid source description'}, + ]}, + 'numRows': '624863297365', + 'creationTime': '1593702292959', + 'lastModifiedTime': '1616817449510', + 'type': 'TABLE', + 'location': 'EU', + 'clustering': {'fields': ['kingappid', 'abtestname', 'abtestversion']}, + 'rangePartitioning': {'field': 'kingappid', 'range': {'start': '0', 'end': '9999', 'interval': '1'}} + } + + extractor._retrieve_datasets.return_value = [DatasetRef('dummy_dataset', 'dummy_project')] + extractor._retrieve_watermark.return_value = {'ts': '1616640834900', 'low': '19700101', 'high': '20210101'} + extractor._retrieve_table_detail.return_value = BqResource(bq_target) + extractor._retrieve_source_table.return_value = BqResource(bq_source) + extractor._retrieve_commonly_used.return_value = {} + extractor._retrieve_tables.return_value = [{'projectId': 'king-datacommons-prod', + 'datasetId': 'core_model', + 'tableId': 'd_abtestassignment'}] + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-bp-tr-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com' + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() 
+@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_3(_): + """ + For testing the scenario where target is a view but no singular source has been found. + E.g. there are multiple source tables or none at all + """ + extractor = get_extractor_mock() + bq_target = {'tableReference': {'projectId': 'king-target-prod', + 'datasetId': 'target_dataset', + 'tableId': 'target'}, + 'description': "target table view", + 'schema': {'fields': [{'name': 'column_1', 'type': 'INTEGER', 'description': 'column_1 target'}, + {'name': 'column_2', 'type': 'STRING', 'description': 'column_2 target'} + ]}, + 'numBytes': '0', + 'numLongTermBytes': '0', + 'numRows': '0', + 'creationTime': '1541526128483', + 'lastModifiedTime': '1615017627104', + 'type': 'VIEW', + 'view': {'query': ' SELECT * FROM `king-sourceproject-prod.source_dataset.source`'}, + 'location': 'EU'} + + extractor._retrieve_datasets.return_value = [DatasetRef('dummy_dataset', 'dummy_project')] + extractor._retrieve_watermark.return_value = None + extractor._retrieve_table_detail.return_value = BqResource(bq_target) + extractor._retrieve_source_table.return_value = None + extractor._retrieve_commonly_used.return_value = {} + extractor._retrieve_tables.return_value = [{'projectId': 'king-target-prod', + 'datasetId': 'target_dataset', + 'tableId': 'target'}] + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-target-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com' + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_4(_): + """ + This mock returns 10 tables but our extractor is configured to processes only 2 of those - this tests if batches + works correctly + """ + extractor = get_extractor_mock() + tb = {'tableReference': {'projectId': 'king-target-prod', 'datasetId': 'target_dataset', 'tableId': 'target'}, + 'lastModifiedTime': '1615017627104', + 'type': 'VIEW', + 'view': {'query': ' SELECT * FROM `king-random-prod.random.random`'}, + } + extractor._retrieve_datasets.return_value = [DatasetRef('dummy_dataset', 'dummy_project')] + extractor._retrieve_watermark.return_value = None + extractor._retrieve_source_table.return_value = None + extractor._retrieve_table_detail.return_value = BqResource(tb) + extractor._retrieve_commonly_used.return_value = {} + extractor._retrieve_tables.return_value = [{'projectId': 'p', 'datasetId': 'd', 'tableId': 't'}] * 10 + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-target-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com', + 'extractor.king_table_extractor.batch': '0-2' + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_5(_): + """ + Output an event table to test tags + """ + bq_target = {'tableReference': {'projectId': 'king-datacommons-prod', + 'datasetId': 'event', + 'tableId': 'sample_event'}, + 'projectId': 'king-datacommons-prod', + 'datasetId': 'event', + 'tableId': 'sample_event', + 'type': 'VIEW', + 'view': {'query': ' SELECT * FROM `king-random-prod.random.random`'}, 
+ 'lastModifiedTime': '1615017627104' + } + extractor = get_extractor_mock() + extractor._retrieve_datasets.return_value = [DatasetRef('event', 'king-datacommons-prod')] + extractor._retrieve_watermark.return_value = None + extractor._retrieve_source_table.return_value = None + extractor._retrieve_commonly_used.return_value = {} + extractor._retrieve_table_detail.return_value = BqResource(bq_target) + extractor._retrieve_tables.return_value = [bq_target] + extractor.retrieve_event_classification = MagicMock(name='retrieve_event_classification', + return_value={'sample_event': ['tag_1', 'tag_2']}) + + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-datacommons-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com', + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def table_extractor_6(_): + """ + For testing the scenario where target has multiple source tables. + """ + bq_target = {'projectId': 'king-target-prod', 'datasetId': 'target_dataset', 'tableId': 'target', + 'tableReference': {'projectId': 'king-target-prod', 'datasetId': 'target_dataset', + 'tableId': 'target'}, + 'description': "target table view description", + 'lastModifiedTime': '1434345513000', + 'type': 'VIEW', + 'view': {'query': ' SELECT * FROM `king-random-prod.random.random`'}} + + referenced_tables = \ + { + 'is_singular_source': False, + 'uses_wildcard': False, + 'is_logical_view': False, + 'tables': [ + BqResource( + {'tableReference': {'projectId': 'king-source-prod', 'datasetId': 'source_dataset', + 'tableId': 'source_1'}, + 'lastModifiedTime': '1615017627104', 'type': 'TABLE'}), + BqResource( + {'tableReference': {'projectId': 'king-source-prod', 'datasetId': 'source_dataset', + 'tableId': 'source_2'}, + 'lastModifiedTime': '1560575913000', 'type': 'TABLE'}) + ]} + + extractor = KingTableExtractor() + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) + extractor._retrieve_datasets = MagicMock(name='_retrieve_datasets', + return_value=[DatasetRef('dummy_dataset', 'dummy_project')]) + extractor._retrieve_watermark = MagicMock(name='_retrieve_watermark', return_value=None) + extractor._retrieve_commonly_used = MagicMock(name='_retrieve_commonly_used', return_value={}) + extractor._retrieve_tables = MagicMock(name='_retrieve_tables', return_value=[bq_target]) + extractor._retrieve_table_detail = MagicMock(name='_retrieve_table_detail', return_value=BqResource(bq_target)) + extractor._retrieve_referenced_tables = MagicMock(name='_retrieve_referenced_tables', + return_value=referenced_tables) + extractor._retrieve_row_count = MagicMock(name='_retrieve_row_count', return_value=(10, 'full table')) + + extractor.init(Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + { + 'extractor.king_table_extractor.project_id': 'king-target-prod', + 'extractor.king_table_extractor.billing_project': 'king-billing-project-prod', + 'extractor.king_table_extractor.frontend_url': 'https://frontend_url.king.com' + } + ), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture +def usage_extractor_config(): + config_dict = { + 'extractor.king_usage_extractor.project_id': 'king-datacommons-prod', + 'extractor.king_usage_extractor.billing_project': 'king-billing-project-prod' + } + return 
ConfigFactory.from_dict(config_dict) + + +@pytest.fixture +def usage_basic_extraction_input(): + return [ + {'project': 'king-datacommons-prod', + 'dataset': 'core_model', + 'table': 'f_activity_summary', + 'email': 'john.constantine@king.com', + 'query_count': 10}, + {'project': 'king-datacommons-prod', + 'dataset': 'core_model', + 'table': 'f_purchase', + 'email': 'sa@king-project-prod.iam.gserviceaccount.com ', + 'query_count': 10} + ] + + +def owner_extractor_config(): + return ConfigFactory.from_dict({ + 'extractor.owner_extractor.project_id': 'king-datacommons-prod', + 'extractor.owner_extractor.billing_project': 'king-billing-project-prod', + 'extractor.owner_extractor.kdn_url': 'https://example-url.com/bla', + 'extractor.owner_extractor.service_account_name': 'sa@king-project-prod.gserviceaccount.coom'}) + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_full_attributes(_): + extractor = OwnerExtractor() + extractor._get_merged_tables = MagicMock( + name='_get_merged_tables', + return_value=[{'gcpProject': 'king-datacommons-prod', + 'dataset': 'ds_spear_of_destiny', + 'tableName': 'f_last_known_locations', + 'owningTeamEmail': 'the_cool_guys@king.com', + 'gitHub': 'https://github.com/angel-city', + 'parentJobUrl': 'https://airflow-job'}]) + + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_empty_attributes(_): + extractor = OwnerExtractor() + extractor._get_merged_tables = MagicMock( + name='_get_merged_tables', + return_value=[{'name': 'ancient-artifacts', + 'owningTeamEmail': None, + 'gitHub': None, + 'parentJobUrl': None}]) + + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_map_owners(_): + extractor = OwnerExtractor() + ref = {'projectId': 'king-datacommons-prod', 'datasetId': 'ds_spear_of_destiny', 'tableId': 'f_last_known_locations'} + bq_table = {**{'tableReference': ref}, **ref, 'type': 'TABLE'} + extractor._retrieve_bq_tables = MagicMock( + name='_retrieve_bq_tables', + return_value=[bq_table]) + + extractor._retrieve_table_detail = MagicMock( + name='_retrieve_table_detail', + return_value=BqResource(bq_table)) + + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) + + extractor._get_kdn_table_map = MagicMock( + name='_get_kdn_table_map', + return_value={'king-datacommons-prod.ds_spear_of_destiny.f_last_known_locations': {}}) + extractor.TEAM_TO_DATASET_MAPPING = { + 'king-datacommons-prod.ds_spear_of_destiny': 'owner_mapped_from_config@king.com'} + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_fallback_to_operation(_): + extractor = OwnerExtractor() + ref = {'projectId': 'king-datacommons-prod', + 'datasetId': 'ds_spear_of_destiny', + 'tableId': 'f_last_known_locations'} + bq_table = {**{'tableReference': ref}, **ref, 'type': 'TABLE'} + extractor._retrieve_bq_tables = MagicMock( + name='_retrieve_kdn_operations', + return_value=[bq_table]) + + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) 
+ + extractor._retrieve_table_detail = MagicMock( + name='_retrieve_table_detail', + return_value=BqResource(bq_table)) + + extractor._retrieve_kdn_operations = MagicMock( + name='_retrieve_kdn_operations', + return_value=[{'name': 'ancient-artifacts', + 'owningTeamEmail': 'operations email', + 'gitHub': 'operations repo', + 'parentJobUrl': 'operations app link'}]) + + extractor._retrieve_kdn_tables = MagicMock( + name='_retrieve_kdn_tables', + return_value=[{'gcpProject': 'king-datacommons-prod', + 'dataset': 'ds_spear_of_destiny', + 'tableName': 'f_last_known_locations', + 'parentObject': 'ancient-artifacts', + 'owningTeamEmail': None, + 'gitHub': None, + 'parentJobUrl': None}]) + + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_default_to_table(_): + extractor = OwnerExtractor() + ref = {'projectId': 'king-datacommons-prod', 'datasetId': 'ds_spear_of_destiny', + 'tableId': 'f_last_known_locations', 'type': 'TABLE'} + bq_table = {**{'tableReference': ref}, **ref} + + extractor._retrieve_bq_tables = MagicMock( + name='_retrieve_bq_tables', + return_value=[bq_table]) + + extractor._retrieve_table_detail = MagicMock( + name='_retrieve_table_detail', + return_value=BqResource(bq_table)) + + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[]) + + extractor._retrieve_kdn_operations = MagicMock( + name='_retrieve_kdn_operations', + return_value=[{'name': 'ancient-artifacts', + 'owningTeamEmail': None, + 'gitHub': None, + 'parentJobUrl': None}]) + + extractor._retrieve_kdn_tables = MagicMock( + name='_retrieve_kdn_tables', + return_value=[{'gcpProject': 'king-datacommons-prod', + 'dataset': 'ds_spear_of_destiny', + 'tableName': 'f_last_known_locations', + 'parentObject': 'ancient-artifacts', + 'owningTeamEmail': 'table email', + 'gitHub': 'table repo', + 'parentJobUrl': 'table app link'}]) + + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor + + +@pytest.fixture() +@patch('databuilder.extractor.base_bigquery_extractor.build') +def owner_extractor_default_to_data_catalog(_): + extractor = OwnerExtractor() + ref = {'projectId': 'king-datacommons-prod', 'datasetId': 'ds_spear_of_destiny', + 'tableId': 'f_last_known_locations', 'type': 'TABLE'} + bq_table = {**{'tableReference': ref}, **ref} + + from google.cloud.datacatalog_v1 import Tag, TagField + tag = Tag() + data_owner_field, github_field, etl_field = TagField(), TagField(), TagField() + data_owner_field.string_value = 'data catalog email' + github_field.string_value = 'data catalog repo' + etl_field.string_value = 'data catalog etl' + tag.fields = {'owner_email_address': data_owner_field, + 'github_url': github_field, + 'etl_url': etl_field} + + extractor.retrieve_data_catalog_tags = MagicMock(name='retrieve_data_catalog_tags', return_value=[tag]) + + extractor._retrieve_bq_tables = MagicMock( + name='_retrieve_bq_tables', + return_value=[bq_table]) + + extractor._retrieve_table_detail = MagicMock( + name='_retrieve_table_detail', + return_value=BqResource(bq_table)) + + extractor._retrieve_kdn_operations = MagicMock( + name='_retrieve_kdn_operations', + return_value=[{'name': 'ancient-artifacts', + 'owningTeamEmail': None, + 'gitHub': None, + 'parentJobUrl': None}]) + + extractor._retrieve_kdn_tables = MagicMock( + name='_retrieve_kdn_tables', + 
return_value=[{'gcpProject': 'king-datacommons-prod', + 'dataset': 'ds_spear_of_destiny', + 'tableName': 'f_last_known_locations', + 'parentObject': 'ancient-artifacts', + 'owningTeamEmail': 'table email', + 'gitHub': 'table repo', + 'parentJobUrl': 'table app link'}]) + + extractor.init(Scoped.get_scoped_conf(conf=owner_extractor_config(), scope=extractor.get_scope())) + return extractor diff --git a/databuilder/tests/king/test_extractor_bq_base.py b/databuilder/tests/king/test_extractor_bq_base.py new file mode 100644 index 0000000000..4a9331c20f --- /dev/null +++ b/databuilder/tests/king/test_extractor_bq_base.py @@ -0,0 +1,29 @@ +import pytest +from pyhocon import ConfigFactory +from databuilder import Scoped +from mock import Mock, patch + + +@pytest.mark.parametrize('filter_type,filter_dataset_name,expected_result,expected_length', { + ('exclude', 'core_model,campaign_roi', 'advertising_model,random', 2), + ('exclude', 'random', 'advertising_model,campaign_roi,core_model', 3), + ('include', 'core_model', 'core_model', 1), + ('include', 'advertising_model,campaign_roi,random', 'advertising_model,campaign_roi,random', 3), + ('include', 'advertising_model,campaign_roi,random', 'advertising_model, campaign_roi, random', 3), + ('include', 'some_other_random_dataset', '', 0), + ('exclude', 'some_other_random_dataset', 'advertising_model,campaign_roi,core_model,random', 4), + (None, None, 'advertising_model,campaign_roi,core_model,random', 4) +}) +@patch('databuilder.extractor.base_bigquery_extractor.build') +def test_filter(_, base_extractor_1, filter_type, filter_dataset_name, expected_result, expected_length): + base_extractor_1.init( + Scoped.get_scoped_conf(conf=ConfigFactory.from_dict( + {'extractor.bq_base_extractor.project_id': 'king-datacommons-prod', + 'extractor.bq_base_extractor.billing_project': 'king-billing-project-prod', + 'extractor.bq_base_extractor.filter_dataset_name': filter_dataset_name, + 'extractor.bq_base_extractor.filter_type': filter_type}), + scope=base_extractor_1.get_scope())) + expected_result = [] if expected_length == 0 else [str.strip(i) for i in expected_result.split(',')] + actual_result = base_extractor_1._retrieve_datasets() + assert len(actual_result) == expected_length + assert [d.datasetId for d in actual_result] == expected_result diff --git a/databuilder/tests/king/test_extractor_bq_table.py b/databuilder/tests/king/test_extractor_bq_table.py new file mode 100644 index 0000000000..1cb53a4c80 --- /dev/null +++ b/databuilder/tests/king/test_extractor_bq_table.py @@ -0,0 +1,252 @@ +def run_full_extraction(extractor): + extractor_output = [] + while True: + result = extractor.extract() + if not result: + break + extractor_output.append(result) + return extractor_output + + +def test_output_table_target(table_extractor_1): + """ + For a target that is a table - check we output the right types of objects and the right number of them + """ + result = run_full_extraction(table_extractor_1) + assert len(result) == 8 + assert result[0].__class__.__name__ == 'TableMetadata' + assert result[1].__class__.__name__ == 'TableMetadata' + assert result[2].__class__.__name__ == 'TableLastUpdated' + assert result[3].__class__.__name__ == 'BadgeMetadata' + assert result[4].__class__.__name__ == 'BadgeMetadata' + assert result[5].__class__.__name__ == 'Watermark' + assert result[6].__class__.__name__ == 'Watermark' + assert result[7].__class__.__name__ == 'ESLastUpdated' + + +def test_metadata_table_target(table_extractor_1): + """ + A target that is a table - verify 
the output of TableMetadata + """ + extractor_output = run_full_extraction(table_extractor_1) + result = extractor_output[0] + assert result.__class__.__name__ == 'TableMetadata' + assert result.database == 'bigquery' + assert result.cluster == 'king-bp-tr-prod' + assert result.schema == 'niveles' + assert result.name == 'aps_moves' + assert result.is_view is False + assert result.tags == [] + assert len(result.columns) == 3 + assert result.columns[0].name == 'flavourid' + assert result.columns[0].type == 'INTEGER' + assert result.columns[0].description.text == 'flavourid description' + assert result.columns[1].name == 'dt' + assert result.columns[1].type == 'DATE' + assert result.columns[1].description.text == 'The dt is the date field' + assert result.columns[2].name == 'kingpdjoinkey' + assert result.columns[2].type == 'STRING' + assert result.columns[2].description.text == 'kingpdjoinkey description' + assert result.description.text \ + == 'This is aps_moves table\n\\\n \n' \ + '##### Data Retention\n30 days\n\\\n \n' \ + '##### Partition\ndt (DAY)\n\\\n \n' \ + '##### Cluster Columns\nkingpdjoinkey\n\\\n \n' \ + '##### Rows\n436,768,996,037 (full table)' + + +def test_commonly_used_with_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[1] + assert result.__class__.__name__ == 'TableMetadata' + assert result.database == 'bigquery' + assert result.cluster == 'king-bp-tr-prod' + assert result.schema == 'niveles' + assert result.name == 'aps_moves' + assert result.columns == [] + assert result.tags == [] + assert result.description.text \ + == '[core_model.f_purchase]' \ + '(https://frontend_url.king.com/table_detail/king-datacommons-prod/bigquery/core_model/f_purchase)' \ + '\n\\\n' \ + '[reference_data.date]' \ + '(https://frontend_url.king.com/table_detail/king-datacommons-prod/bigquery/reference_data/date)' + + +def test_last_updated_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[2] + assert result.__class__.__name__ == 'TableLastUpdated' + assert result.db == 'bigquery' + assert result.cluster == 'king-bp-tr-prod' + assert result.schema == 'niveles' + assert result.table_name == 'aps_moves' + assert result.last_updated_time == 1616640834 + + +def test_badge_partition_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[3] + assert result.__class__.__name__ == 'BadgeMetadata' + assert result.start_key == 'bigquery://king-bp-tr-prod.niveles/aps_moves/dt' + assert result.start_label == 'Column' + + +def test_badge_cluster_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[4] + assert result.__class__.__name__ == 'BadgeMetadata' + assert result.start_key == 'bigquery://king-bp-tr-prod.niveles/aps_moves/kingpdjoinkey' + assert result.start_label == 'Column' + + +def test_high_watermark_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[5] + assert result.__class__.__name__ == 'Watermark' + assert result.database == 'bigquery' + assert result.cluster == 'king-bp-tr-prod' + assert result.schema == 'niveles' + assert result.table == 'aps_moves' + assert result.parts == [('dt', '20210101')] + assert result.part_type == 'high_watermark' + assert result.create_time == '2021-03-25 02:53:54' + + +def test_low_watermark_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[6] + assert result.__class__.__name__ == 'Watermark' + assert result.database == 'bigquery' + assert result.cluster == 'king-bp-tr-prod' + 
assert result.schema == 'niveles' + assert result.table == 'aps_moves' + assert result.parts == [('dt', '19700101')] + assert result.part_type == 'low_watermark' + assert result.create_time == '2021-03-25 02:53:54' + + +def test_neo4j_updated_table_target(table_extractor_1): + result = run_full_extraction(table_extractor_1)[7] + assert result.__class__.__name__ == 'ESLastUpdated' + assert result.timestamp.__class__.__name__ == 'int' + + +def test_output_target_view_with_singular_source(table_extractor_2): + """ + For a target that is a view which has a singular source - check we output the right types of objects and the right number of them + """ + result = run_full_extraction(table_extractor_2) + assert len(result) == 8 + assert result[0].__class__.__name__ == 'TableMetadata' + assert result[1].__class__.__name__ == 'TableMetadata' + assert result[2].__class__.__name__ == 'TableLastUpdated' + assert result[3].__class__.__name__ == 'BadgeMetadata' + assert result[4].__class__.__name__ == 'BadgeMetadata' + assert result[5].__class__.__name__ == 'BadgeMetadata' + assert result[6].__class__.__name__ == 'BadgeMetadata' + assert result[7].__class__.__name__ == 'ESLastUpdated' + + +def test_metadata_view_with_singular_source(table_extractor_2): + result = run_full_extraction(table_extractor_2)[0] + assert result.cluster == 'king-datacommons-prod' + assert result.schema == 'core_model' + assert result.name == 'd_abtestassignment' + assert len(result.columns) == 2 + assert result.description.text \ + == 'The table provides a snapshot of information for test case assignments per Title\n\\\n \n' \ + '##### Partition\nkingappid (Integer)\n\\\n \n' \ + '##### Cluster Columns\nkingappid, abtestname, abtestversion\n\\\n \n' \ + '##### Rows\n624,863,297,365 (full table)' + + +def test_last_updated_view_with_singular_source(table_extractor_2): + result = run_full_extraction(table_extractor_2)[2] + assert result.table_name == 'd_abtestassignment' + assert result.last_updated_time == 1616817449 + + +def test_badge_partition_view_with_singular_source(table_extractor_2): + result = run_full_extraction(table_extractor_2)[3] + assert result.__class__.__name__ == 'BadgeMetadata' + assert result.start_key == 'bigquery://king-datacommons-prod.core_model/d_abtestassignment/kingappid' + + +def test_badge_cluster_view_with_singular_source(table_extractor_2): + result = run_full_extraction(table_extractor_2) + assert result[4].__class__.__name__ == 'BadgeMetadata' + assert result[5].__class__.__name__ == 'BadgeMetadata' + assert result[6].__class__.__name__ == 'BadgeMetadata' + assert result[4].start_key == 'bigquery://king-datacommons-prod.core_model/d_abtestassignment/kingappid' + assert result[5].start_key == 'bigquery://king-datacommons-prod.core_model/d_abtestassignment/abtestname' + assert result[6].start_key == 'bigquery://king-datacommons-prod.core_model/d_abtestassignment/abtestversion' + + +def test_neo4j_updated_view_with_singular_source(table_extractor_2): + result = run_full_extraction(table_extractor_2)[7] + assert result.__class__.__name__ == 'ESLastUpdated' + assert result.timestamp.__class__.__name__ == 'int' + + +def test_output_no_source(table_extractor_3): + """ + A target with no identified source - check we output the right types of objects and the right number of them + """ + result = run_full_extraction(table_extractor_3) + assert len(result) == 4 + assert result[0].__class__.__name__ == 'TableMetadata' + assert result[1].__class__.__name__ == 'TableMetadata' + assert 
result[2].__class__.__name__ == 'TableLastUpdated'
+    assert result[3].__class__.__name__ == 'ESLastUpdated'
+
+
+def test_metadata_no_source(table_extractor_3):
+    result = run_full_extraction(table_extractor_3)[0]
+    assert result.__class__.__name__ == 'TableMetadata'
+    assert result.cluster == 'king-target-prod'
+    assert result.schema == 'target_dataset'
+    assert result.name == 'target'
+    assert len(result.columns) == 2
+    assert result.description.text == 'target table view'
+
+
+def test_commonly_used_with_no_source(table_extractor_3):
+    result = run_full_extraction(table_extractor_3)[1]
+    assert result.__class__.__name__ == 'TableMetadata'
+    assert result.cluster == 'king-target-prod'
+    assert result.schema == 'target_dataset'
+    assert result.name == 'target'
+    assert result.columns == []
+    assert result.description.text == ''
+
+
+def test_batches(table_extractor_4):
+    """Batch parameter is passed - process only tables according to the batch parameter """
+    result = run_full_extraction(table_extractor_4)
+    assert len(result) == 7
+    assert result[0].__class__.__name__ == 'TableMetadata'
+    assert result[1].__class__.__name__ == 'TableMetadata'
+    assert result[2].__class__.__name__ == 'TableLastUpdated'
+    assert result[3].__class__.__name__ == 'TableMetadata'
+    assert result[4].__class__.__name__ == 'TableMetadata'
+    assert result[5].__class__.__name__ == 'TableLastUpdated'
+    assert result[6].__class__.__name__ == 'ESLastUpdated'
+
+
+def test_event_tag(table_extractor_5):
+    """Test that event tables get tags """
+    result = table_extractor_5.extract()
+    assert table_extractor_5.dataset_tags == {'king-datacommons-prod.event': {'sample_event': ['tag_1', 'tag_2']}}
+    assert result.tags == ['tag_1', 'tag_2']
+    assert result.name == 'sample_event'
+
+
+def test_multiple_sources(table_extractor_6):
+    """
+    When multiple sources are found, test that the extractor outputs the timestamp found in the most recently
+    updated referenced table.
+    Test that the row count returned by the _retrieve_row_count function is added.
+ """ + result = run_full_extraction(table_extractor_6) + assert len(result) == 4 + assert result[0].__class__.__name__ == 'TableMetadata' + assert result[1].__class__.__name__ == 'TableMetadata' + assert result[2].__class__.__name__ == 'TableLastUpdated' + assert result[3].__class__.__name__ == 'ESLastUpdated' + assert result[0].description.text == 'target table view description\n\\\n \n##### Rows\n10 (full table)' + assert result[2].last_updated_time == 1615017627 diff --git a/databuilder/tests/king/test_extractor_bq_usage.py b/databuilder/tests/king/test_extractor_bq_usage.py new file mode 100644 index 0000000000..8a425eab5d --- /dev/null +++ b/databuilder/tests/king/test_extractor_bq_usage.py @@ -0,0 +1,71 @@ +from databuilder.extractor.king_extractor_bq_usage import KingUsageExtractor + +from mock import patch +from databuilder import Scoped + + +@patch('google.auth.default', lambda scopes: ['dummy', 'dummy']) +@patch('databuilder.extractor.king_extractor_bq_usage.KingUsageExtractor._get_usage_records') +@patch('databuilder.extractor.base_bigquery_extractor.build') +def test_basic_extraction(_, _get_usage_records, usage_extractor_config, usage_basic_extraction_input): + """ + Basic extraction + """ + _get_usage_records.return_value = usage_basic_extraction_input + extractor = KingUsageExtractor() + extractor.init(Scoped.get_scoped_conf(conf=usage_extractor_config, scope=extractor.get_scope())) + + column_usage_result, user_result, _, sa_result = extractor.extract(), extractor.extract(), extractor.extract(), extractor.extract() + + assert extractor.get_scope() == 'extractor.king_usage_extractor' + assert extractor.project_id == 'king-datacommons-prod' + assert extractor.billing_project == 'king-billing-project-prod' + + assert column_usage_result.__class__.__name__ == 'ColumnReader' + assert user_result.__class__.__name__ == 'User' + + assert column_usage_result.start_key == 'bigquery://king-datacommons-prod.core_model/f_activity_summary' + assert column_usage_result.user_email == 'john.constantine@king.com' + assert column_usage_result.read_count == 10 + + # assert user_result.__getattribute__('first_name') == 'john' also works + assert user_result.first_name == 'john' + assert user_result.last_name == 'constantine' + assert user_result.full_name == 'John Constantine' + assert user_result.email == 'john.constantine@king.com' + assert user_result.github_username == 'https://github.int.midasplayer.com/john-constantine' + assert user_result.team_name == ' ' + assert user_result.manager_email == '' + assert user_result.employee_type == '' + assert user_result.slack_id == '' + assert user_result.is_active is True + assert user_result.updated_at == 0 + assert user_result.role_name == ' ' + + assert sa_result.first_name == 'sa' + assert sa_result.last_name == 'king-project-prod' + assert sa_result.full_name == 'sa king-project-prod' + assert sa_result.email == 'sa@king-project-prod.iam.gserviceaccount.com ' + assert sa_result.github_username == ' ' + assert sa_result.team_name == 'king-project-prod' + assert sa_result.manager_email == '' + assert sa_result.employee_type == '' + assert sa_result.slack_id == '' + assert sa_result.is_active is True + assert sa_result.updated_at == 0 + assert sa_result.role_name == 'sa' + + +@patch('google.auth.default', lambda scopes: ['dummy', 'dummy']) +@patch('databuilder.extractor.king_extractor_bq_usage.KingUsageExtractor._get_usage_records') +@patch('databuilder.extractor.base_bigquery_extractor.build') +def test_no_entries(_, get_usage_records, 
usage_extractor_config): + get_usage_records.return_value = [] + extractor = KingUsageExtractor() + extractor.init(Scoped.get_scoped_conf(conf=usage_extractor_config, scope=extractor.get_scope())) + + result = extractor.extract() + result_2 = extractor.extract() + + assert result is None + assert result_2 is None diff --git a/databuilder/tests/king/test_extractor_owner.py b/databuilder/tests/king/test_extractor_owner.py new file mode 100644 index 0000000000..48d91f0ec0 --- /dev/null +++ b/databuilder/tests/king/test_extractor_owner.py @@ -0,0 +1,80 @@ +def test_basic_extraction(owner_extractor_full_attributes): + """ + Base case + """ + extractor = owner_extractor_full_attributes + owner, source, pipeline = extractor.extract(), \ + extractor.extract(), \ + extractor.extract() + + assert owner.__class__.__name__ == 'TableOwner' + assert owner.start_key == 'bigquery://king-datacommons-prod.ds_spear_of_destiny/f_last_known_locations' + assert owner.start_label == 'Table' + + assert source.__class__.__name__ == 'TableSource' + assert source.cluster == 'king-datacommons-prod' + assert source.db == 'bigquery' + assert source.schema == 'ds_spear_of_destiny' + assert source.table == 'f_last_known_locations' + assert source.source_type == 'github' + assert source.source == 'https://github.com/angel-city' + + assert pipeline.__class__.__name__ == 'Application' + assert pipeline.cluster == 'king-datacommons-prod' + assert pipeline.database == 'bigquery' + assert pipeline.schema == 'ds_spear_of_destiny' + assert pipeline.table == 'f_last_known_locations' + assert pipeline.dag == 'f_last_known_locations' + assert pipeline.application_url == 'https://airflow-job' + + +def test_extract_empty(owner_extractor_empty_attributes): + """ + Don't output any records if no data found in kdn + """ + extractor = owner_extractor_empty_attributes + owner, source, pipeline = extractor.extract(), extractor.extract(), extractor.extract() + + assert owner is None + assert source is None + assert pipeline is None + + +def test_use_owner_dataset_mapping(owner_extractor_map_owners): + """ + Make sure we get owners from the provided owner_mapping if no owner found in KDN + """ + assert owner_extractor_map_owners.extract().owner_emails == ['owner_mapped_from_config@king.com'] + + +def test_default_to_table(owner_extractor_default_to_table): + """ + If we don't find the data in KDN operation, we source it from KDN table + """ + extractor = owner_extractor_default_to_table + owner, source, pipeline = extractor.extract(), extractor.extract(), extractor.extract() + + assert owner.owner_emails == ['table email'] + assert source.source == 'table repo' + assert pipeline.application_url == 'table app link' + + +def test_fallback_to_operation(owner_extractor_fallback_to_operation): + """ + If we don't find the data in KDN table, fall back to KDN operation + """ + extractor = owner_extractor_fallback_to_operation + owner, source, pipeline = extractor.extract(), extractor.extract(), extractor.extract() + + assert owner.owner_emails == ['operations email'] + assert source.source == 'operations repo' + assert pipeline.application_url == 'operations app link' + + +def test_pick_data_catalog_over_kdn(owner_extractor_default_to_data_catalog): + extractor = owner_extractor_default_to_data_catalog + owner, source, pipeline = extractor.extract(), extractor.extract(), extractor.extract() + + assert owner.owner_emails == ['data catalog email'] + assert source.source == 'data catalog repo' + assert pipeline.application_url == 'data catalog etl' 
From 78118b0c49151f92912da22791548c01ce90560e Mon Sep 17 00:00:00 2001 From: Judy Palimonka Date: Mon, 19 Jul 2021 13:30:22 +0200 Subject: [PATCH 2/3] chore(databuilder): update python package name --- databuilder/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databuilder/setup.py b/databuilder/setup.py index 4f9cd2aff0..6be0fb35a1 100644 --- a/databuilder/setup.py +++ b/databuilder/setup.py @@ -83,7 +83,7 @@ bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds + atlas setup( - name='amundsen-databuilder', + name='amundsen-databuilder-king', version=__version__, description='Amundsen Data builder', url='https://www.github.com/amundsen-io/amundsen/tree/main/databuilder', From 0195ab7773cb7f2a9ee541b3081436db9c599a3f Mon Sep 17 00:00:00 2001 From: Judy Palimonka Date: Mon, 19 Jul 2021 13:31:19 +0200 Subject: [PATCH 3/3] ci: create .drone.yml to start testing databuilder --- .drone.yml | 16 ++++++++++++++++ databuilder/setup.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 .drone.yml diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000000..576f32a5ec --- /dev/null +++ b/.drone.yml @@ -0,0 +1,16 @@ +--- +kind: pipeline +name: default +type: kubernetes + + +environment: + PIP_INDEX_URL: https://artifactory-edge.ess.midasplayer.com/artifactory/api/pypi/pypi-all/simple + +steps: + - name: Databuilder Tests + image: python:3.7-slim + commands: + - cd databuilder + - pip3 install -r requirements-dev.txt + - python3 -bb -m pytest tests/king/test_extractor_owner.py --cov-fail-under=0 diff --git a/databuilder/setup.py b/databuilder/setup.py index 6be0fb35a1..6f0aa072f1 100644 --- a/databuilder/setup.py +++ b/databuilder/setup.py @@ -4,7 +4,7 @@ from setuptools import find_packages, setup -__version__ = '5.2.2' +__version__ = '5.2.3.dev1' requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') with open(requirements_path) as requirements_file: