(#1576) use the information schema on BigQuery
drewbanin committed Sep 29, 2019
1 parent 509e0e8 commit 1a97915
Showing 3 changed files with 188 additions and 119 deletions.
1 change: 1 addition & 0 deletions core/dbt/task/generate.py
@@ -74,6 +74,7 @@ class TableMetadata(JsonSchemaMixin):
    name: str
    comment: Optional[str]
    owner: Optional[str]
    shards: Optional[str]


@dataclass
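For context, a minimal sketch of how the new shards field sits on the catalog's table metadata. This is not the dbt source: JsonSchemaMixin and the class's other fields are omitted so the example stands alone, and the sample shard value is hypothetical.

# Illustrative sketch only -- not the dbt source.
from dataclasses import dataclass
from typing import Optional

@dataclass
class TableMetadata:
    name: str
    comment: Optional[str]
    owner: Optional[str]
    # New in this commit: set for date-sharded BigQuery tables, None otherwise.
    shards: Optional[str]

# The shard value below is made up for illustration.
events = TableMetadata(name='events_*', comment=None, owner=None,
                       shards='20190101 - 20190929')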
146 changes: 27 additions & 119 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -23,6 +23,9 @@
import agate


GET_CATALOG_MACRO_NAME = 'get_catalog'


def _stub_relation(*args, **kwargs):
    return BigQueryRelation.create(
        database='',
@@ -385,127 +388,32 @@ def load_dataframe(self, database, schema, table_name, agate_table,
        with self.connections.exception_handler("LOAD TABLE"):
            self.poll_until_job_completes(job, timeout)

    ###
    # The get_catalog implementation for bigquery
    ###
    def _flat_columns_in_table(self, table):
        """An iterator over the flattened columns for a given schema and table.
        Resolves child columns as having the name "parent.child".
        """
        for col in self._get_dbt_columns_from_bq_table(table):
            yield from col.flatten()

    @classmethod
    def _get_stats_column_names(cls):
        """Construct a tuple of the column names for stats. Each stat has 4
        columns of data.
        """
        columns = []
        stats = ('num_bytes', 'num_rows', 'location', 'partitioning_type',
                 'clustering_fields')
        stat_components = ('label', 'value', 'description', 'include')
        for stat_id in stats:
            for stat_component in stat_components:
                columns.append('stats:{}:{}'.format(stat_id, stat_component))
        return tuple(columns)

    @classmethod
    def _get_stats_columns(cls, table, relation_type):
        """Given a table, return an iterator of key/value pairs for stats
        column names/values.
        """
        column_names = cls._get_stats_column_names()

        # agate does not handle the array of column names gracefully
        clustering_value = None
        if table.clustering_fields is not None:
            clustering_value = ','.join(table.clustering_fields)
        # cast num_bytes/num_rows to str before they get to agate, or else
        # agate will incorrectly decide they are booleans.
        column_values = (
            'Number of bytes',
            str(table.num_bytes),
            'The number of bytes this table consumes',
            relation_type == 'table',

            'Number of rows',
            str(table.num_rows),
            'The number of rows in this table',
            relation_type == 'table',

            'Location',
            table.location,
            'The geographic location of this table',
            True,

            'Partitioning Type',
            table.partitioning_type,
            'The partitioning type used for this table',
            relation_type == 'table',

            'Clustering Fields',
            clustering_value,
            'The clustering fields for this table',
            relation_type == 'table',
        )
        return zip(column_names, column_values)

    def get_catalog(self, manifest):
        connection = self.connections.get_thread_connection()
        client = connection.handle

        schemas = manifest.get_used_schemas()

        column_names = (
            'table_database',
            'table_schema',
            'table_name',
            'table_type',
            'table_comment',
            # does not exist in bigquery, but included for consistency
            'table_owner',
            'column_name',
            'column_index',
            'column_type',
            'column_comment',
        )
        all_names = column_names + self._get_stats_column_names()
        columns = []

        for database_name, schema_name in schemas:
            relations = self.list_relations(database_name, schema_name)
            for relation in relations:

                # This relation contains a subset of the info we care about.
                # Fetch the full table object here
                table_ref = self.connections.table_ref(
                    database_name,
                    relation.schema,
                    relation.identifier,
                    connection
                )
                table = client.get_table(table_ref)

                flattened = self._flat_columns_in_table(table)
                relation_stats = dict(self._get_stats_columns(table,
                                                              relation.type))

                for index, column in enumerate(flattened, start=1):
                    column_data = (
                        relation.database,
                        relation.schema,
                        relation.name,
                        relation.type,
                        None,
                        None,
                        column.name,
                        index,
                        column.data_type,
                        None,
                    )
                    column_dict = dict(zip(column_names, column_data))
                    column_dict.update(copy.deepcopy(relation_stats))

                    columns.append(column_dict)

        return dbt.clients.agate_helper.table_from_data(columns, all_names)

    def get_catalog(self, manifest):
        """Get the catalog for this manifest by running the get catalog macro.
        Returns an agate.Table of catalog information.
        """
        information_schemas = self._get_cache_schemas(manifest)
        # information schemas are addressed from datasets on BigQuery
        info_schemas = []
        for info_schema_rel, schemas in information_schemas.items():
            for schema in schemas:
                real_info_schema = self.Relation.create(
                    database=info_schema_rel.database,
                    schema=schema
                )
                info_schemas.append(real_info_schema)

        # make it a list so macros can index into it.
        kwargs = {'information_schemas': info_schemas}
        table = self.execute_macro(GET_CATALOG_MACRO_NAME,
                                   kwargs=kwargs,
                                   release=True)

        # BigQuery doesn't allow ":" chars in column names -- remap them here.
        table = table.rename(column_names={
            col.name: col.name.replace('__', ':') for col in table.columns
        })

        results = self._catalog_filter_table(table, manifest)
        return results
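The rename step is the bridge between the new macro and the column names the rest of the catalog code expects: BigQuery won't accept ":" in a column alias, so the macro below emits names like stats__row_count__value and the adapter maps them back. A small standalone sketch of just that step, using agate directly; the sample table and its values are made up and only the rename call mirrors the adapter code above.

# Standalone illustration of the "__" -> ":" rename; the table contents are
# invented for the example.
import agate

raw = agate.Table(
    rows=[('dbt-project', 'analytics', 'events', 1000)],
    column_names=[
        'table_database', 'table_schema', 'table_name',
        'stats__row_count__value',
    ],
)

renamed = raw.rename(column_names={
    col.name: col.name.replace('__', ':') for col in raw.columns
})

print(renamed.column_names)
# ('table_database', 'table_schema', 'table_name', 'stats:row_count:value')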
160 changes: 160 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
@@ -0,0 +1,160 @@

{% macro bigquery__get_catalog(information_schemas) -%}

{%- call statement('catalog', fetch_result=True) -%}
{% for information_schema in information_schemas %}
(
with tables as (
select
project_id as table_database,
dataset_id as table_schema,
table_id as original_table_name,

concat(project_id, '.', dataset_id, '.', table_id) as relation_id,

row_count,
size_bytes as size_bytes,
case
when type = 1 then 'table'
when type = 2 then 'view'
else concat('unknown (', cast(type as string), ')')
end as table_type,

REGEXP_CONTAINS(table_id, '^.+[0-9]{8}$') and type = 1 as is_date_shard,
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name

from {{ information_schema }}.__TABLES__

),

extracted as (

select *,
case
when is_date_shard then shard_base_name
else original_table_name
end as table_name

from tables

),

unsharded_tables as (

select
table_database,
table_schema,
table_name,
table_type,
is_date_shard,

struct(
min(shard_name) as shard_min,
max(shard_name) as shard_max,
count(*) as shard_count
) as table_shards,

sum(size_bytes) as size_bytes,
sum(row_count) as row_count,

max(relation_id) as relation_id

from extracted
group by 1,2,3,4,5

),

info_schema_columns as (

select *,
concat(table_catalog, '.', table_schema, '.', table_name) as relation_id,
table_catalog as table_database

from {{ information_schema }}.INFORMATION_SCHEMA.COLUMNS
where ordinal_position is not null

),

column_stats as (

select
table_database,
table_schema,
table_name,
max(relation_id) as relation_id,
max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned,
max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column,
max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered,
ARRAY_TO_STRING(array_agg(case when clustering_ordinal_position is not null then column_name else null end ignore nulls order by clustering_ordinal_position), ', ') as clustering_columns

from info_schema_columns
group by 1,2,3

),

columns as (

select
table_catalog as table_database,
table_schema,
table_name,
column_name,
relation_id,
ordinal_position as column_index,
data_type as column_type,
cast(null as string) as column_comment

from info_schema_columns

)

select
unsharded_tables.table_database,
unsharded_tables.table_schema,
case
when is_date_shard then concat(unsharded_tables.table_name, '*')
else unsharded_tables.table_name
end as table_name,
unsharded_tables.table_type,
case when is_date_shard then table_shards else null end as table_shards,

columns.column_name,
columns.column_index,
columns.column_type,
columns.column_comment,

'Row Count' as `stats__row_count__label`,
row_count as `stats__row_count__value`,
'Approximate count of rows in this table' as `stats__row_count__description`,
(row_count is not null) as `stats__row_count__include`,

'Approximate Size' as `stats__bytes__label`,
size_bytes as `stats__bytes__value`,
'Approximate size of table as reported by BigQuery' as `stats__bytes__description`,
(size_bytes is not null) as `stats__bytes__include`,

'Partitioning Type' as `stats__partitioning_column__label`,
partition_column as `stats__partitioning_column__value`,
'The partitioning column for this table' as `stats__partitioning_column__description`,
is_partitioned as `stats__partitioning_column__include`,

'Clustering Columns' as `stats__clustering_columns__label`,
clustering_columns as `stats__clustering_columns__value`,
'The clustering columns for this table' as `stats__clustering_columns__description`,
is_clustered as `stats__clustering_columns__include`

-- join using relation_id (an actual relation, not a shard prefix) to make
-- sure that column metadata is picked up through the join. This will only
-- return the column information for the "max" table in a date-sharded table set
from unsharded_tables
left join columns using (relation_id)
left join column_stats using (relation_id)
)

{% if not loop.last %} union all {% endif %}
{% endfor %}
{%- endcall -%}
{{ return(load_result('catalog').table) }}

{% endmacro %}

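The date-shard handling above hinges on the regular expressions near the top of the macro: a table whose name ends in an eight-digit suffix (events_20190929, ga_sessions_20190929, ...) is folded into one logical table, with the shard struct recording the min/max shard and a count. A rough Python rendering of just the name rule, for illustration only; the macro additionally requires type = 1 (a table, not a view), and the helper name here is made up.

# Rough Python equivalent of the REGEXP_CONTAINS / REGEXP_EXTRACT logic in the
# macro above; split_shard is a hypothetical helper, not part of dbt.
import re

SHARD_PATTERN = re.compile(r'^(.+)([0-9]{8})$')

def split_shard(table_id):
    """Return (base_name, shard_date); shard_date is None for unsharded tables."""
    match = SHARD_PATTERN.match(table_id)
    if match:
        # The trailing separator stays on the base name, just as shard_base_name
        # keeps it in the macro; the final select appends '*' to sharded names.
        return match.group(1), match.group(2)
    return table_id, None

print(split_shard('events_20190929'))   # ('events_', '20190929')
print(split_shard('customers'))         # ('customers', None)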