Skip to content

Commit

Permalink
Merge pull request #1795 from fishtown-analytics/fix/bq-catalog-query
Browse files Browse the repository at this point in the history
(#1576) use the information schema on BigQuery
  • Loading branch information
drewbanin authored Oct 15, 2019
2 parents 73d0308 + f757a08 commit 00a22e1
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 143 deletions.
131 changes: 6 additions & 125 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import copy

import dbt.deprecations
import dbt.exceptions
import dbt.flags as flags
Expand Down Expand Up @@ -387,127 +385,10 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

###
# The get_catalog implementation for bigquery
###
def _flat_columns_in_table(self, table):
    """Yield every column of *table*, flattened.

    Nested (RECORD) columns are expanded so that a child column is
    reported under the dotted name "parent.child".
    """
    for column in self._get_dbt_columns_from_bq_table(table):
        for flattened in column.flatten():
            yield flattened

@classmethod
def _get_stats_column_names(cls):
"""Construct a tuple of the column names for stats. Each stat has 4
columns of data.
"""
columns = []
stats = ('num_bytes', 'num_rows', 'location', 'partitioning_type',
'clustering_fields')
stat_components = ('label', 'value', 'description', 'include')
for stat_id in stats:
for stat_component in stat_components:
columns.append('stats:{}:{}'.format(stat_id, stat_component))
return tuple(columns)

@classmethod
def _get_stats_columns(cls, table, relation_type):
"""Given a table, return an iterator of key/value pairs for stats
column names/values.
"""
column_names = cls._get_stats_column_names()

# agate does not handle the array of column names gracefully
clustering_value = None
if table.clustering_fields is not None:
clustering_value = ','.join(table.clustering_fields)
# cast num_bytes/num_rows to str before they get to agate, or else
# agate will incorrectly decide they are booleans.
column_values = (
'Number of bytes',
str(table.num_bytes),
'The number of bytes this table consumes',
relation_type == 'table',

'Number of rows',
str(table.num_rows),
'The number of rows in this table',
relation_type == 'table',

'Location',
table.location,
'The geographic location of this table',
True,

'Partitioning Type',
table.partitioning_type,
'The partitioning type used for this table',
relation_type == 'table',

'Clustering Fields',
clustering_value,
'The clustering fields for this table',
relation_type == 'table',
)
return zip(column_names, column_values)

def get_catalog(self, manifest):
connection = self.connections.get_thread_connection()
client = connection.handle

schemas = manifest.get_used_schemas()

column_names = (
'table_database',
'table_schema',
'table_name',
'table_type',
'table_comment',
# does not exist in bigquery, but included for consistency
'table_owner',
'column_name',
'column_index',
'column_type',
'column_comment',
)
all_names = column_names + self._get_stats_column_names()
columns = []
def _catalog_filter_table(self, table, manifest):
# BigQuery doesn't allow ":" chars in column names -- remap them here.
table = table.rename(column_names={
col.name: col.name.replace('__', ':') for col in table.columns
})

for database_name, schema_name in schemas:
relations = self.list_relations(database_name, schema_name)
for relation in relations:

# This relation contains a subset of the info we care about.
# Fetch the full table object here
table_ref = self.connections.table_ref(
database_name,
relation.schema,
relation.identifier,
connection
)
table = client.get_table(table_ref)

flattened = self._flat_columns_in_table(table)
relation_stats = dict(self._get_stats_columns(table,
relation.type))

for index, column in enumerate(flattened, start=1):
column_data = (
relation.database,
relation.schema,
relation.name,
relation.type,
None,
None,
column.name,
index,
column.data_type,
None,
)
column_dict = dict(zip(column_names, column_data))
column_dict.update(copy.deepcopy(relation_stats))

columns.append(column_dict)

return dbt.clients.agate_helper.table_from_data(columns, all_names)
return super()._catalog_filter_table(table, manifest)
34 changes: 34 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
BaseRelation, ComponentName
)
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar('Self', bound='BigQueryRelation')


@dataclass(frozen=True, eq=False, repr=False)
Expand Down Expand Up @@ -40,3 +44,33 @@ def project(self):
@property
def dataset(self):
    # BigQuery's "dataset" is the same component dbt core calls the
    # schema; expose it under the native name for convenience.
    return self.schema

def information_schema(self: Self, identifier=None) -> Self:
    """Return the INFORMATION_SCHEMA relation for this relation.

    BigQuery (usually) addresses information schemas at the dataset
    level, so this overrides the BaseRelation method to produce a
    relation rendered as ``project.dataset.INFORMATION_SCHEMA``.
    """
    # Identifiers are case-sensitive on BigQuery even when quoted, so
    # quote every component unconditionally.
    new_quote_policy = self.quote_policy.replace(
        database=True,
        schema=True,
        identifier=True,
    )

    # Only render the database/schema components that are actually set;
    # the identifier (INFORMATION_SCHEMA) is always rendered.
    new_include_policy = self.include_policy.replace(
        database=self.database is not None,
        schema=self.schema is not None,
        identifier=True,
    )

    new_path = self.path.replace(
        schema=self.schema,
        identifier='INFORMATION_SCHEMA',
    )

    return self.replace(
        quote_policy=new_quote_policy,
        include_policy=new_include_policy,
        path=new_path,
    )
Loading

0 comments on commit 00a22e1

Please sign in to comment.