Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model contracts + constraints with nested fields #738

Merged
merged 16 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230601-141255.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support model contracts + constraints on nested columns
time: 2023-06-01T14:12:55.433346-04:00
custom:
Author: MichelleArk
Issue: "673"
153 changes: 152 additions & 1 deletion dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,130 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
return False
return True

@available.parse(lambda *a, **k: {})
@classmethod
def nest_column_data_types(
    cls,
    columns: Dict[str, Dict[str, Any]],
    constraints: Optional[Dict[str, str]] = None,
) -> Dict[str, Dict[str, str]]:
    """Collapse flat, dot-delimited column specs into one entry per root column.

    Args:
        columns: Mapping of flat column name to that column's attribute
            dictionary. Each attribute dictionary must contain "name" and
            "data_type". A "." in a name marks a field nested inside a STRUCT,
            e.g. ``{"a": {"name": "a", "data_type": "string", ...}}``.
        constraints: Optional mapping of flat column name to the rendered
            constraint text for that column. When a column has an entry, the
            constraint is appended to its data type in the output.

    Returns:
        Mapping of root column name to ``{"name": ..., "data_type": ...}``,
        where nested fields are folded into a single ``struct<...>`` data
        type. Attributes other than "name" and "data_type" are dropped.

    Example:
        columns: {
            "a": {"name": "a", "data_type": "string", "description": ...},
            "b.nested": {"name": "b.nested", "data_type": "string"},
            "b.nested2": {"name": "b.nested2", "data_type": "string"}
        }

        returns: {
            "a": {"name": "a", "data_type": "string"},
            "b": {"name": "b", "data_type": "struct<nested string, nested2 string>"}
        }
    """
    rendered_by_column = constraints or {}

    # Pass 1: fold every flat column into a tree keyed by root column name.
    # Leaves are data-type strings; inner nodes are dicts of child fields.
    type_tree: Dict[str, Union[str, Dict]] = {}
    for spec in columns.values():
        flat_name = spec["name"]
        cls._update_nested_column_data_types(
            flat_name,
            spec["data_type"],
            rendered_by_column.get(flat_name),
            type_tree,
        )

    # Pass 2: render each root's subtree into a single data_type string.
    return {
        root_name: {
            "name": root_name,
            "data_type": cls._format_nested_data_type(raw_type),
        }
        for root_name, raw_type in type_tree.items()
    }

@classmethod
def _update_nested_column_data_types(
    cls,
    column_name: str,
    column_data_type: str,
    column_rendered_constraint: Optional[str],
    nested_column_data_types: Dict[str, Union[str, Dict]],
) -> None:
    """Insert one (possibly nested) column into ``nested_column_data_types`` in place.

    The column name is consumed one "."-separated segment at a time: every
    segment except the last becomes (or reuses) a nested dict, and the last
    segment is stored as the data type, followed by a space and the rendered
    constraint when one is given.

    Args:
        column_name: Flat column name; "." separates STRUCT nesting levels.
        column_data_type: Data type of the leaf column.
        column_rendered_constraint: Already-rendered constraint text for the
            leaf column, or None when it has none.
        nested_column_data_types: Tree being built; mutated by this call.

    Examples:
        >>> nested_column_data_types = {}
        >>> BigQueryAdapter._update_nested_column_data_types("a", "string", "not null", nested_column_data_types)
        >>> nested_column_data_types
        {"a": "string not null"}
        >>> BigQueryAdapter._update_nested_column_data_types("b.c", "string", "not null", nested_column_data_types)
        >>> nested_column_data_types
        {"a": "string not null", "b": {"c": "string not null"}}
        >>> BigQueryAdapter._update_nested_column_data_types("b.d", "string", None, nested_column_data_types)
        >>> nested_column_data_types
        {"a": "string not null", "b": {"c": "string not null", "d": "string"}}
    """
    head, dot, tail = column_name.partition(".")

    if not dot:
        # Leaf reached: record the type, with the constraint appended if any.
        if column_rendered_constraint is None:
            nested_column_data_types[head] = column_data_type
        else:
            nested_column_data_types[head] = (
                f"{column_data_type} {column_rendered_constraint}"
            )
        return

    # Still inside a STRUCT path: reuse the subtree for this segment, creating
    # it the first time the segment is seen, then recurse on what is left.
    subtree = nested_column_data_types.setdefault(head, {})
    assert isinstance(subtree, dict)  # keeping mypy happy
    cls._update_nested_column_data_types(
        tail,
        column_data_type,
        column_rendered_constraint,
        subtree,
    )

@classmethod
def _format_nested_data_type(
    cls, unformatted_nested_data_type: Union[str, Dict[str, Any]]
) -> str:
    """Render a nested data-type tree as a single data-type string.

    A plain string is returned unchanged. A dict is rendered as
    ``struct<name type, ...>``, where each value is rendered by the same rule,
    so nesting of any depth is supported.

    Examples:
        >>> BigQueryAdapter._format_nested_data_type("string")
        'string'
        >>> BigQueryAdapter._format_nested_data_type({'c': 'string not_null', 'd': 'string'})
        'struct<c string not_null, d string>'
        >>> BigQueryAdapter._format_nested_data_type({'c': 'string not_null', 'd': {'e': 'string'}})
        'struct<c string not_null, d struct<e string>>'
    """
    if not isinstance(unformatted_nested_data_type, str):
        # Inner node: render every field as "<name> <type>" in dict order.
        rendered_fields = ", ".join(
            f"{field_name} {cls._format_nested_data_type(field_type)}"
            for field_name, field_type in unformatted_nested_data_type.items()
        )
        return f"struct<{rendered_fields}>"

    # Leaf: already a finished data-type string.
    return unformatted_nested_data_type

def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
try:
table = self.connections.get_bq_table(
Expand Down Expand Up @@ -526,7 +650,10 @@ def get_column_schema_from_query(self, sql: str) -> List[BigQueryColumn]:
"""
_, iterator = self.connections.raw_execute(sql)
columns = [self.Column.create_from_field(field) for field in iterator.schema]
return columns
flattened_columns = []
for column in columns:
flattened_columns += column.flatten()
return flattened_columns

@available.parse(lambda *a, **k: False)
def get_columns_in_select_sql(self, select_sql: str) -> List[BigQueryColumn]:
Expand Down Expand Up @@ -958,6 +1085,30 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
"serverless": ServerlessDataProcHelper,
}

@available
@classmethod
def render_raw_columns_constraints(cls, raw_columns: Dict[str, Dict[str, Any]]) -> List[str]:
    """Render one "<name> <data_type and constraints>" entry per root column.

    Each column's raw constraints are parsed and rendered, the rendered pieces
    for a column are joined with single spaces, and the columns are then nested
    via ``nest_column_data_types`` so that fields of a STRUCT collapse into a
    single ``struct<...>`` entry carrying their constraints inline.

    Args:
        raw_columns: Mapping of flat column name to the column's attribute
            dictionary. "name" and "data_type" are required; "constraints" is
            an optional list of raw constraint definitions.

    Returns:
        A list of strings, one per root column, in the order produced by
        ``nest_column_data_types``.
    """
    rendered_constraints: Dict[str, str] = {}
    for raw_column in raw_columns.values():
        # "constraints" may be absent or explicitly None; iterating None would
        # raise TypeError, so treat both as "no constraints".
        for con in raw_column.get("constraints") or []:
            constraint = cls._parse_column_constraint(con)
            rendered_constraint = cls.process_parsed_constraint(
                constraint, cls.render_column_constraint
            )

            # Constraints that render to nothing are skipped.
            if rendered_constraint:
                column_name = raw_column["name"]
                if column_name not in rendered_constraints:
                    rendered_constraints[column_name] = rendered_constraint
                else:
                    rendered_constraints[column_name] += f" {rendered_constraint}"

    nested_columns = cls.nest_column_data_types(raw_columns, rendered_constraints)
    return [f"{column['name']} {column['data_type']}" for column in nested_columns.values()]

@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
c = super().render_column_constraint(constraint) # type: ignore
Expand Down
16 changes: 16 additions & 0 deletions dbt/include/bigquery/macros/utils/get_columns_spec_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,19 @@
{% set formatted = column.column.lower() ~ " " ~ data_type %}
{{ return({'name': column.name, 'data_type': data_type, 'formatted': formatted}) }}
{%- endmacro -%}

{% macro bigquery__get_empty_schema_sql(columns) %}
  {#- Fold dotted (nested) column names into their root STRUCT columns, then
      delegate to the default implementation with the nested column set. -#}
  {%- set nested_columns = adapter.nest_column_data_types(columns) -%}
  {{ return(dbt.default__get_empty_schema_sql(nested_columns)) }}
{% endmacro %}

{#
  Wrap `sql` in a subquery and select the model's root columns from it.

  The column list comes from adapter.nest_column_data_types(model['columns']),
  which returns a dict keyed by root column name; looping over that dict yields
  those keys, so a nested field such as "a.b" contributes only its root "a".
  Names are emitted comma-separated, with no comma after the last one.

  NOTE(review): `model` is not a macro argument; it is presumably supplied by
  the surrounding dbt rendering context - confirm.
#}
{% macro bigquery__get_select_subquery(sql) %}
{%- set columns = adapter.nest_column_data_types(model['columns']) -%}
select
{% for column in columns %}
{{ column }}{{ ", " if not loop.last }}
{% endfor %}
from (
{{ sql }}
) as model_subq
{%- endmacro %}
118 changes: 118 additions & 0 deletions tests/functional/adapter/constraints/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Fixtures for contract enforcement on a single-level STRUCT column `a`.

# Model whose STRUCT field `struct_column_being_tested` is the integer literal 1,
# while the schema below declares `a.struct_column_being_tested` as string.
# NOTE(review): the second field is named `another_struct_column`, but the schema
# declares `a.b` - confirm whether the name mismatch is intended in addition to
# the data type mismatch.
my_model_struct_wrong_data_type_sql = """
{{ config(materialized = "table") }}

select
STRUCT(1 AS struct_column_being_tested, "test" AS another_struct_column) as a
"""

# Model whose STRUCT fields (`struct_column_being_tested`, `b`) are both string
# literals, matching the names and data types declared in the schema below.
my_model_struct_correct_data_type_sql = """
{{ config(materialized = "table")}}

select
STRUCT("test" AS struct_column_being_tested, "test" AS b) as a
"""

# Schema declaring two contract-enforced models, `contract_struct_wrong` and
# `contract_struct_correct`, each with the nested string columns
# `a.struct_column_being_tested` and `a.b`.
# NOTE(review): presumably the two SQL fixtures above are written to models with
# these names by the test class - confirm against the caller.
model_struct_data_type_schema_yml = """
version: 2
models:
- name: contract_struct_wrong
config:
contract:
enforced: true
columns:
- name: a.struct_column_being_tested
data_type: string
- name: a.b
data_type: string

- name: contract_struct_correct
config:
contract:
enforced: true
columns:
- name: a.struct_column_being_tested
data_type: string
- name: a.b
data_type: string
"""

# Fixtures for contract enforcement on a two-level STRUCT: `a` contains the
# STRUCT `b` (fields `struct_column_being_tested`, `c`) and the scalar `d`.

# Model whose innermost field `a.b.struct_column_being_tested` is the integer
# literal 1, while the schema below declares it as string.
my_model_double_struct_wrong_data_type_sql = """
{{ config(materialized = "table") }}

select
STRUCT(
STRUCT(1 AS struct_column_being_tested, "test" AS c) as b,
"test" as d
) as a
"""

# Model whose nested fields are all string literals, matching the schema below.
my_model_double_struct_correct_data_type_sql = """
{{ config(materialized = "table") }}

select
STRUCT(
STRUCT("test" AS struct_column_being_tested, "test" AS c) as b,
"test" as d
) as a
"""

# Schema declaring two contract-enforced models, `contract_struct_wrong` and
# `contract_struct_correct`, each with the string columns
# `a.b.struct_column_being_tested`, `a.b.c` and `a.d`.
model_double_struct_data_type_schema_yml = """
version: 2
models:
- name: contract_struct_wrong
config:
contract:
enforced: true
columns:
- name: a.b.struct_column_being_tested
data_type: string
- name: a.b.c
data_type: string
- name: a.d
data_type: string

- name: contract_struct_correct
config:
contract:
enforced: true
columns:
- name: a.b.struct_column_being_tested
data_type: string
- name: a.b.c
data_type: string
- name: a.d
data_type: string
"""


# Fixtures for constraints declared on fields nested inside a STRUCT column.

# Table model selecting a STRUCT named `id` with two string fields,
# `nested_column` and `nested_column2`.
my_model_struct_sql = """
{{
config(
materialized = "table"
)
}}

select STRUCT("test" as nested_column, "test" as nested_column2) as id
"""


# Schema for `my_model` with an enforced contract: `id.nested_column` carries
# not_null and unique constraints (plus `quote` and `description` attributes),
# and `id.nested_column2` carries a unique constraint.
model_struct_schema_yml = """
version: 2
models:
- name: my_model
config:
contract:
enforced: true
columns:
- name: id.nested_column
quote: true
data_type: string
description: hello
constraints:
- type: not_null
- type: unique
- name: id.nested_column2
data_type: string
constraints:
- type: unique
"""
Loading