
working on bigquery
drewbanin committed Mar 29, 2018
1 parent 4b0737b commit af9b3bb
Showing 2 changed files with 76 additions and 10 deletions.
27 changes: 19 additions & 8 deletions dbt/adapters/bigquery.py
@@ -11,6 +11,7 @@
from dbt.adapters.postgres import PostgresAdapter
from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.schema import BigQueryColumn

import google.auth
import google.oauth2
@@ -29,7 +30,8 @@ class BigQueryAdapter(PostgresAdapter):
"drop",
"execute",
"quote_schema_and_table",
"make_date_partitioned_table"
"make_date_partitioned_table",
"get_columns_in_table"
]

SCOPE = ('https://www.googleapis.com/auth/bigquery',
@@ -378,14 +380,23 @@ def get_existing_schemas(cls, profile, model_name=None):
@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
model_name=None):
raise dbt.exceptions.NotImplementedException(
'`get_columns_in_table` is not implemented for this adapter!')

@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
model_name=None):
raise dbt.exceptions.NotImplementedException(
'`get_columns_in_table` is not implemented for this adapter!')
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset_ref = client.dataset(schema_name)
table_ref = dataset_ref.table(table_name)
table = client.get_table(table_ref)

columns = []
for col in table.schema:
name = col.name
data_type = col.field_type

column = BigQueryColumn(col.name, col.field_type, col.fields)
columns.append(column)

return columns

@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
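For context, the lookup that the new get_columns_in_table performs can be sketched on its own against the google-cloud-bigquery client; the dataset and table names below are placeholders, not part of this commit:

# Standalone sketch of the same schema lookup, outside of dbt.
# Assumes default application credentials for google-cloud-bigquery.
from google.cloud import bigquery

client = bigquery.Client()
dataset_ref = client.dataset('my_dataset')     # placeholder dataset name
table_ref = dataset_ref.table('my_table')      # placeholder table name
table = client.get_table(table_ref)            # single API call

for field in table.schema:
    # each entry is a SchemaField; RECORD columns carry nested sub-fields
    print(field.name, field.field_type, len(field.fields))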
59 changes: 57 additions & 2 deletions dbt/schema.py
@@ -2,7 +2,7 @@


class Column(object):
def __init__(self, column, dtype, char_size, numeric_size):
def __init__(self, column, dtype, char_size=None, numeric_size=None):
self.column = column
self.dtype = dtype
self.char_size = char_size
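Making char_size and numeric_size optional lets a Column (and the BigQueryColumn subclass added below, which passes only a name and type to super) be built without size information; a minimal sketch, assuming dbt is importable:

from dbt.schema import Column

col = Column('order_id', 'integer')    # no char_size / numeric_size needed
print(col.quoted, col.data_type)       # e.g. "order_id" integer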
@@ -18,7 +18,6 @@ def quoted(self):

@property
def data_type(self):
print("ASKING FOR DATA TYPE. IT IS '{}'".format(self.dtype))
if self.is_string():
return Column.string_type(self.string_size())
elif self.is_numeric():
@@ -62,3 +61,59 @@ def numeric_type(cls, dtype, size):

def __repr__(self):
return "<Column {} ({})>".format(self.name, self.data_type)


class BigQueryColumn(Column):
def __init__(self, column, dtype, fields):
super(BigQueryColumn, self).__init__(column, dtype)

self.fields = self.wrap_subfields(fields)

@classmethod
def wrap_subfields(cls, fields):
return [BigQueryColumn.create(field) for field in fields]

@classmethod
def create(cls, field):
return BigQueryColumn(field.name, field.field_type, field.fields)

@classmethod
def _flatten_recursive(cls, col, prefix=None):
if prefix is None:
prefix = []

if len(col.fields) == 0:
prefixed_name = ".".join(prefix + [col.column])
new_col = BigQueryColumn(prefixed_name, col.dtype, col.fields)
return [new_col]

new_fields = []
for field in col.fields:
new_prefix = prefix + [col.column]
new_fields.extend(cls._flatten_recursive(field, new_prefix))

return new_fields

def flatten(self):
return self._flatten_recursive(self)

@property
def quoted(self):
return '`{}`'.format(self.column)

@property
def data_type(self):
return self.dtype

def is_string(self):
return self.dtype.lower() == 'string'

def is_numeric(self):
return False

def can_expand_to(self, other_column):
"""returns True if both columns are strings"""
return self.is_string() and other_column.is_string()

def __repr__(self):
return "<BigQueryColumn {} ({})>".format(self.name, self.data_type)
