Merge pull request amundsen-io#1 from data-foundations/feature/DFND-761
feat(databuilder): add custom extractors
Judy Palimonka authored and GitHub Enterprise committed Jul 19, 2021
2 parents 9db18bc + 0195ab7 commit f49245f
Showing 13 changed files with 2,195 additions and 2 deletions.
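
For context when reviewing: a minimal sketch of how the new table extractor could be wired into a databuilder job. The frontend URL and paths below are hypothetical, the loader keys are the stock FsNeo4jCSVLoader ones, and whatever BigQuery configuration the KingBaseBigQueryExtractor base class requires (its diff is not rendered below) would still need to be added.

from pyhocon import ConfigFactory

from databuilder.extractor.king_extractor_bq_table import KingTableExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.task.task import DefaultTask

job_config = ConfigFactory.from_dict({
    # Extractor keys follow KingTableExtractor.get_scope() plus its FRONTEND_URL / BATCH constants.
    'extractor.king_table_extractor.frontend_url': 'https://amundsen.example.com',  # hypothetical URL
    'extractor.king_table_extractor.batch': '1-100',  # optional: only process the first 100 tables listed
    'loader.filesystem_csv_neo4j.node_dir_path': '/tmp/amundsen/nodes',
    'loader.filesystem_csv_neo4j.relationship_dir_path': '/tmp/amundsen/relationships',
})

job = DefaultJob(conf=job_config,
                 task=DefaultTask(extractor=KingTableExtractor(), loader=FsNeo4jCSVLoader()))
job.launch()
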
16 changes: 16 additions & 0 deletions .drone.yml
@@ -0,0 +1,16 @@
---
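# Drone CI pipeline: run the King databuilder tests on Kubernetes, installing packages from the internal Artifactory PyPI mirror.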
kind: pipeline
name: default
type: kubernetes


environment:
PIP_INDEX_URL: https://artifactory-edge.ess.midasplayer.com/artifactory/api/pypi/pypi-all/simple

steps:
- name: Databuilder Tests
image: python:3.7-slim
commands:
- cd databuilder
- pip3 install -r requirements-dev.txt
- python3 -bb -m pytest tests/king/test_extractor_owner.py --cov-fail-under=0
564 changes: 564 additions & 0 deletions databuilder/databuilder/extractor/king_extractor_bq_base.py

Large diffs are not rendered by default.

164 changes: 164 additions & 0 deletions databuilder/databuilder/extractor/king_extractor_bq_stats.py
@@ -0,0 +1,164 @@
import logging
from datetime import datetime
from typing import List, Optional

from pyhocon import ConfigTree

from databuilder.extractor.king_extractor_bq_base import BqResource, KingBaseBigQueryExtractor
from databuilder.models.table_stats import TableColumnStats

LOGGER = logging.getLogger(__name__)
STAT_LABELS = ['approximate_distinct_values', 'non_null_values', 'null_values']
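# Skip stats collection for a table when the dry-run estimate exceeds this many gigabytes processed.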
GB_PROCESSED_CAP = 50
NUM_RETRIES = 3


def get_stat_sql(columns: List[str],
                 target: BqResource,
                 partition_clause: Optional[str] = None) -> str:
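    # Columns with mode REPEATED are arrays: they get UNNESTed below and are referenced by their own alias.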
    nested_records = {c['name']: c['type'] for c in target.columns if c.get('mode') == 'REPEATED'}

def get_final_col(c):
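        # Dotted names keep their explicit alias, repeated columns use their UNNEST alias,
        # and all other columns are qualified with the main table alias.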
if '.' in c:
col_alias, col_name = c.split('.')
return f'{col_alias}.`{col_name}`'
        elif c in nested_records:
return f'`{c}`'
else:
return f'mainTbAlias.`{c}`'

sql = 'SELECT\n'

for c in columns:
col = get_final_col(c)
alias = c.replace('.', '_')
sql += f'\t/* Stats for column {col} */\n'
sql += f"\tAPPROX_COUNT_DISTINCT({col}) as {alias}_approximate_distinct_values,\n"
sql += f"\tCOUNTIF({col} IS NOT NULL) as {alias}_non_null_values,\n"
sql += f"\tCOUNTIF({col} IS NULL) as {alias}_null_values,\n"

sql += f'FROM `{target.name}` mainTbAlias \n'

    # LEFT JOIN UNNEST each repeated column so its elements can be counted individually.
    for k in nested_records:
        sql += f'LEFT JOIN UNNEST({k}) {k} \n'

    sql += '\nWHERE\n'

if partition_clause:
sql += partition_clause
else:
for c in columns:
col = get_final_col(c)
sql += f'({col} IS NULL OR {col} IS NOT NULL) AND \n'
sql += 'TRUE '
return sql
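# For illustration, get_stat_sql(['dt'], target) for a hypothetical table `proj.dataset.tbl` with no
# repeated columns and no partition clause produces roughly:
#   SELECT
#       /* Stats for column mainTbAlias.`dt` */
#       APPROX_COUNT_DISTINCT(mainTbAlias.`dt`) as dt_approximate_distinct_values,
#       COUNTIF(mainTbAlias.`dt` IS NOT NULL) as dt_non_null_values,
#       COUNTIF(mainTbAlias.`dt` IS NULL) as dt_null_values,
#   FROM `proj.dataset.tbl` mainTbAlias
#   WHERE
#   (mainTbAlias.`dt` IS NULL OR mainTbAlias.`dt` IS NOT NULL) AND
#   TRUE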


def format_stat_display_value(stat_value):
    """Separate thousands with commas, or return the original value if it cannot be parsed as an integer"""
    try:
        return format(int(stat_value), ',')
    except (TypeError, ValueError):
        return stat_value


def format_stat_display_name(col_name, stat_name):
    """
    Populate the display name of a column stat
    e.g. converts a stat_name such as 'col_name_non_null_values' for column 'col_name' into 'Non Null Values'
    """
    return ' '.join([s.capitalize() for s in stat_name.replace(col_name, '').strip('_').split('_')])


class KingColumnStatExtractor(KingBaseBigQueryExtractor):
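    """
    Extracts per-column statistics (approximate distinct, null and non-null counts) for BigQuery
    tables and yields them as Amundsen TableColumnStats records.
    """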

def init(self, conf: ConfigTree) -> None:
KingBaseBigQueryExtractor.init(self, conf)
self.iter = iter(self._iterate_over_tables())

def _get_tb_partition_details(self, source: BqResource):
partition_value, start_ts, end_ts = None, None, None
try:
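            # Legacy SQL's $__PARTITIONS_SUMMARY__ meta-table lists the partition ids of a date-partitioned table.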
sql = "SELECT MAX(partition_id) max_partition " \
f" FROM [{source.name}$__PARTITIONS_SUMMARY__] " \
" WHERE partition_id != '__UNPARTITIONED__' AND partition_id != '__NULL__' "
max_partition = self.get_query_results(use_legacy_sql=True, sql=sql)[0]['max_partition']
dt = datetime.strptime(max_partition, '%Y%m%d')
ts, partition_value = int(dt.timestamp()), dt.strftime('%Y-%m-%d')
start_ts, end_ts = ts, ts

except Exception as e:
logging.warning(f'Error getting table partition details for {source.name}. '
f'Partition value: {partition_value}. '
f'Partition {source.partition_name}. Error: {str(e)}')
finally:
return partition_value, source.partition_name, start_ts, end_ts

def _get_view_partition_details(self, target: BqResource):
max_partition, partition_col, start_ts, end_ts = None, None, None, None
try:
max_partition = self.get_query_results(f'SELECT MAX(dt) as m from `{target.name}`')[0]['m']
partition_col = 'dt'
ts = int(datetime.strptime(max_partition, '%Y-%m-%d').timestamp())
start_ts, end_ts = ts, ts
except Exception as e:
logging.warning(f'Error getting view partition details for {target.name}. '
f'Partition value: {max_partition}. '
f'Partition {target.partition_name}. Error: {str(e)}')
finally:
return max_partition, partition_col, start_ts, end_ts

def _iterate_over_tables(self):
"""
Get references to the customer-facing view and its underlying table
"""
for dataset in self._retrieve_datasets():
for t in self._retrieve_tables(dataset):
target: BqResource = self._retrieve_table_detail(t['projectId'], t['datasetId'], t['tableId'])
source: BqResource = self._get_source_table(target)

col_names = [c.name for c in self.get_columns(target) if c.type != 'RECORD']
partition_value, partition_col, start_ts, end_ts = None, None, 0, 0
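                # Prefer the latest DAY partition of the source table; otherwise fall back to MAX(dt) on the view itself.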

if source and source.partitioned and source.partition_type == 'DAY':
partition_value, partition_col, start_ts, end_ts = self._get_tb_partition_details(source)

if not source and 'dt' in col_names:
partition_value, partition_col, start_ts, end_ts = self._get_view_partition_details(target)

try:
partition_clause = f' {partition_col} = "{partition_value}" ' if partition_value else None
stat_sql = get_stat_sql(col_names, target, partition_clause=partition_clause)
estimated_gb_processed = self.get_query_estimated_gb_processed(stat_sql)

if estimated_gb_processed <= GB_PROCESSED_CAP:
logging.warning(f'Getting stats for {target.name}... '
f'The stats query will process {estimated_gb_processed} GB. '
f'Partition clause: {partition_clause}')

tb_stats = self.get_query_results(sql=stat_sql, wait_seconds_to_complete=90)[0]

for col_name in col_names:
for stat_name in [col_name.replace('.', '_') + '_' + s for s in STAT_LABELS]:
yield TableColumnStats(
db='bigquery',
cluster=target.project,
schema=target.dataset,
table_name=target.table,
col_name=col_name,
stat_name=format_stat_display_name(col_name, stat_name),
stat_val=format_stat_display_value(tb_stats[stat_name]),
start_epoch=str(start_ts),
end_epoch=str(end_ts))
logging.info(f'table_stats:SUCCESS: {target.name}')
else:
logging.warning(f"table_stats:SKIPPED: {target.name} the estimated bytes processed "
f"{estimated_gb_processed} exceeded the limit of {GB_PROCESSED_CAP} GB")

except Exception as e:
logging.warning(f'table_stats:FAILURE: {target.name}. Error: {str(e)}')

def get_scope(self) -> str:
return 'extractor.king_column_stat_extractor'
162 changes: 162 additions & 0 deletions databuilder/databuilder/extractor/king_extractor_bq_table.py
@@ -0,0 +1,162 @@
import datetime
import logging
from time import time
from typing import Dict, List

from pyhocon import ConfigTree

from databuilder.extractor.king_extractor_bq_base import BqResource, KingBaseBigQueryExtractor
from databuilder.models.badge import Badge, BadgeMetadata
from databuilder.models.es_last_updated import ESLastUpdated
from databuilder.models.table_last_updated import TableLastUpdated
from databuilder.models.table_metadata import TableMetadata
from databuilder.models.watermark import Watermark


LOGGER = logging.getLogger(__name__)
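# Markdown separator placed between sections of the generated table description.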
MD_SECTION_SEPARATOR = '\n\\\n'


class KingTableExtractor(KingBaseBigQueryExtractor):
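    """
    Extracts BigQuery table metadata and yields the corresponding Amundsen model objects:
    TableMetadata, TableLastUpdated, BadgeMetadata, Watermark and ESLastUpdated.
    """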
FRONTEND_URL = 'frontend_url'
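    # Optional 'start-stop' index range (e.g. '1-100') restricting which tables are processed in a run.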
BATCH = 'batch'

def init(self, conf: ConfigTree) -> None:
KingBaseBigQueryExtractor.init(self, conf)
self.iter = iter(self._iterate_over_tables())
self.frontend_url = conf.get_string(KingTableExtractor.FRONTEND_URL)
self.batch = conf.get_string(KingTableExtractor.BATCH, None)
self.dataset_tags = {}

def _get_ui_metadata_description(self, source: BqResource, target: BqResource):
md_space = f'{MD_SECTION_SEPARATOR}&nbsp;\n##### '
grain, doc_url, slack, row_count_scope, row_count = None, None, None, None, None
try:
dc_tag = self.retrieve_data_catalog_tags(target, 'table')
if len(dc_tag) == 1:
tag = dc_tag[0]
grain = tag.fields['table_grain'].string_value if tag.fields.get('table_grain') else None
doc_url = tag.fields['documentation_url'].string_value if tag.fields.get('documentation_url') else None
slack = tag.fields['slack_channel'].string_value if tag.fields.get('slack_channel') else None
except Exception as e:
logging.error(f'Could not fetch an attribute from data catalog. Error: {str(e)}')

if source and source.row_count:
row_count_scope, row_count = source.row_count_scope, source.row_count
elif target.row_count:
row_count_scope, row_count = target.row_count_scope, target.row_count

ui_metadata_description = \
(f"{md_space}Documentation\n Click [here]({doc_url}) to access further documentation for this resource." if doc_url else '') + \
(f"{md_space}Data Retention\n{source.data_retention_days} days" if source and source.data_retention_days else '') + \
(f"{md_space}Partition\n{source.partition_name} ({source.partition_type})" if source and source.partitioned else '') + \
(f"{md_space}Cluster Columns\n{', '.join(source.cluster_fields)}" if source and source.clustered else '') + \
(f"{md_space}Rows\n{format(row_count, ',d')} ({row_count_scope})" if row_count else '') + \
(f"{md_space}Table Grain\n{grain}" if grain else '') + \
(f"{md_space}Slack Channel\n{slack}" if slack else '')

return ui_metadata_description

def _iterate_over_tables(self):
"""
Iterates over BigQuery datasets and tables and outputs relevant Amundsen model objects
"""
commonly_used_with: Dict = self._retrieve_commonly_used()

for dataset in self._retrieve_datasets():
self._set_custom_dataset_tags(dataset)

current_index, start_index, stop_index = 0, 0, 0
if self.batch:
start_index, stop_index = [int(x) for x in self.batch.split('-')]
logging.info(f'Batch parameter passed. Processing tables: {start_index} to {stop_index}')

for t in self._retrieve_tables(dataset):
current_index += 1
if not self.batch or (start_index <= current_index <= stop_index):
final_output = []
logging.info(f'Processing table {current_index}...')
target: BqResource = self._retrieve_table_detail(t['projectId'], t['datasetId'], t['tableId'])
source: BqResource = self._get_source_table(target)

# output final Amundsen objects
final_output.append(TableMetadata(
database='bigquery',
cluster=target.project,
schema=target.dataset,
name=target.table,
description=target.ui_table_description + self._get_ui_metadata_description(source, target),
columns=self.get_columns(target),
is_view=False,
tags=self.dataset_tags.get(target.dataset_ref, {}).get(target.table) or []))

final_output.append(TableMetadata(
database='bigquery',
cluster=target.project,
schema=target.dataset,
name=target.table,
description_source='COMMONLY_USED_WITH', # programmatic description
description=MD_SECTION_SEPARATOR.join([t.format(frontend_url=self.frontend_url)
for t in commonly_used_with.get(target.name, [])])))

if (source and source.last_modified_timestamp) or target.last_modified_timestamp:
final_output.append(TableLastUpdated(
last_updated_time_epoch=source.last_modified_timestamp
if source and source.last_modified_timestamp else target.last_modified_timestamp,
db='bigquery',
cluster=target.project,
schema=target.dataset,
table_name=target.table))

if source and source.partitioned:
final_output.append(BadgeMetadata(
start_label='Column',
start_key=f"bigquery://{target.project}.{target.dataset}/{target.table}/{source.partition_name}",
badges=[Badge(name='partition', category='column')]))

                    for field in (source.cluster_fields if source and source.clustered else []):
final_output.append(BadgeMetadata(
start_label='Column',
start_key=f"bigquery://{target.project}.{target.dataset}/{target.table}/{field}",
badges=[Badge(name='cluster', category='column')]))

if source and source.partition_type == 'DAY':
high_watermark, low_watermark = self._extract_watermark(source, target)
if high_watermark and low_watermark:
final_output.append(high_watermark)
final_output.append(low_watermark)

for i in final_output:
yield i

yield ESLastUpdated(timestamp=int(time()))

def _extract_watermark(self, source: BqResource, target: BqResource) -> List:
"""
If source table has been found and partition dates could be retrieved, output Watermark model
"""
watermark_data = self._retrieve_watermark(source)
if watermark_data:
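            # watermark_data['ts'] is a millisecond epoch; format it as the Watermark create_time string.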
ts = datetime.datetime.utcfromtimestamp(float(watermark_data['ts']) / 1000.).strftime('%Y-%m-%d %H:%M:%S')
watermarks = []
for w in ['high', 'low']:
watermarks.append(
Watermark(part_type=f'{w}_watermark',
part_name=f'{source.partition_name}={watermark_data[w]}',
create_time=ts,
database='bigquery',
cluster=target.project,
schema=target.dataset,
table_name=target.table))
return watermarks
else:
return [None, None]

def _set_custom_dataset_tags(self, dataset):
if dataset.projectId == 'king-datacommons-prod' and dataset.datasetId == 'event':
self.dataset_tags['king-datacommons-prod.event'] = self.retrieve_event_classification()

def get_scope(self) -> str:
return 'extractor.king_table_extractor'