Skip to content

Commit

Permalink
improve the BigQuery Relations understanding of the information schema
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbanin committed Oct 15, 2019
1 parent 414716b commit f757a08
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 30 deletions.
28 changes: 2 additions & 26 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import agate


GET_CATALOG_MACRO_NAME = 'get_catalog'


def _stub_relation(*args, **kwargs):
return BigQueryRelation.create(
database='',
Expand Down Expand Up @@ -388,31 +385,10 @@ def load_dataframe(self, database, schema, table_name, agate_table,
with self.connections.exception_handler("LOAD TABLE"):
self.poll_until_job_completes(job, timeout)

def get_catalog(self, manifest):
"""Get the catalog for this manifest by running the get catalog macro.
Returns an agate.Table of catalog information.
"""

information_schemas = []
for database, schema in manifest.get_used_schemas():
information_schemas.append(self.Relation.create(
database=database,
schema=schema,
quote_policy={
'database': True,
'schema': True
}
))

# make it a list so macros can index into it.
kwargs = {'information_schemas': information_schemas}
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
kwargs=kwargs,
release=True)

def _catalog_filter_table(self, table, manifest):
# BigQuery doesn't allow ":" chars in column names -- remap them here.
table = table.rename(column_names={
col.name: col.name.replace('__', ':') for col in table.columns
})

return self._catalog_filter_table(table, manifest)
return super()._catalog_filter_table(table, manifest)
34 changes: 34 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
BaseRelation, ComponentName
)
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar('Self', bound='BigQueryRelation')


@dataclass(frozen=True, eq=False, repr=False)
Expand Down Expand Up @@ -40,3 +44,33 @@ def project(self):
@property
def dataset(self):
return self.schema

def information_schema(self: Self, identifier=None) -> Self:
# BigQuery (usually) addresses information schemas at the dataset
# level. This method overrides the BaseRelation method to return an
# Information Schema relation as project.dataset.information_schem

include_policy = self.include_policy.replace(
database=self.database is not None,
schema=self.schema is not None,
identifier=True
)

# Quote everything on BigQuery -- identifiers are case-sensitive,
# even when quoted.
quote_policy = self.quote_policy.replace(
database=True,
schema=True,
identifier=True,
)

path = self.path.replace(
schema=self.schema,
identifier='INFORMATION_SCHEMA'
)

return self.replace(
quote_policy=quote_policy,
include_policy=include_policy,
path=path,
)
8 changes: 4 additions & 4 deletions plugins/bigquery/dbt/include/bigquery/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
schema_name as table_schema,
location

from `{{ information_schema.database }}`.INFORMATION_SCHEMA.SCHEMATA
from {{ information_schema.include(schema=False) }}.SCHEMATA

),

Expand All @@ -35,7 +35,7 @@
REGEXP_EXTRACT(table_id, '^(.+)[0-9]{8}$') as shard_base_name,
REGEXP_EXTRACT(table_id, '^.+([0-9]{8})$') as shard_name

from {{ information_schema }}.__TABLES__
from {{ information_schema.include(identifier=False) }}.__TABLES__

),

Expand Down Expand Up @@ -92,7 +92,7 @@
is_partitioning_column,
clustering_ordinal_position

from {{ information_schema }}.INFORMATION_SCHEMA.COLUMNS
from {{ information_schema }}.COLUMNS
where ordinal_position is not null

),
Expand All @@ -105,7 +105,7 @@
data_type as column_type,
column_name as base_column_name

from {{ information_schema }}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
from {{ information_schema }}.COLUMN_FIELD_PATHS
where data_type not like 'STRUCT%'

),
Expand Down

0 comments on commit f757a08

Please sign in to comment.