Skip to content

Commit

Permalink
feat(dbt): catch exceptions when invoking adapter to build column lin…
Browse files Browse the repository at this point in the history
…eage (#22641)

## Summary & Motivation
On issues like
#22358 (comment),
we should not fail the computation. Instead, we should surface the
exception as a warning.

## How I Tested These Changes
pytest
  • Loading branch information
rexledesma authored Jun 21, 2024
1 parent 2a9de8b commit 46d7bd2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ def to_default_asset_events(
f" `{self._event_history_metadata}` for the dbt resource"
f" `{dbt_resource_props['original_file_path']}`."
" Column schema metadata will not be included in the event.\n\n"
f"Exception: {e}"
f"Exception: {e}",
exc_info=True,
)

default_metadata = {
Expand Down Expand Up @@ -294,7 +295,8 @@ def to_default_asset_events(
"An error occurred while building column lineage metadata for the dbt resource"
f" `{dbt_resource_props['original_file_path']}`."
" Lineage metadata will not be included in the event.\n\n"
f"Exception: {e}"
f"Exception: {e}",
exc_info=True,
)

if has_asset_def:
Expand Down Expand Up @@ -937,12 +939,22 @@ def _fetch_column_metadata(
dbt_resource_props = _get_dbt_resource_props_from_event(invocation, event)

with adapter.connection_named(f"column_metadata_{dbt_resource_props['unique_id']}"):
relation = adapter.get_relation(
database=dbt_resource_props["database"],
schema=dbt_resource_props["schema"],
identifier=dbt_resource_props["name"],
)
cols: List[BaseColumn] = adapter.get_columns_in_relation(relation=relation)
try:
relation = adapter.get_relation(
database=dbt_resource_props["database"],
schema=dbt_resource_props["schema"],
identifier=dbt_resource_props["name"],
)
cols: List[BaseColumn] = adapter.get_columns_in_relation(relation=relation)
except Exception as e:
logger.warning(
"An error occurred while fetching column schema metadata for the dbt resource"
f" `{dbt_resource_props['original_file_path']}`."
" Column metadata will not be included in the event.\n\n"
f"Exception: {e}",
exc_info=True,
)
return {}
column_schema_data = {col.name: {"data_type": col.data_type} for col in cols}

if with_column_lineage:
Expand Down Expand Up @@ -976,7 +988,8 @@ def _fetch_column_metadata(
f" `{col_data}` for the dbt resource"
f" `{dbt_resource_props['original_file_path']}`."
" Column schema metadata will not be included in the event.\n\n"
f"Exception: {e}"
f"Exception: {e}",
exc_info=True,
)

lineage_metadata = {}
Expand All @@ -998,7 +1011,8 @@ def _fetch_column_metadata(
"An error occurred while building column lineage metadata for the dbt resource"
f" `{dbt_resource_props['original_file_path']}`."
" Lineage metadata will not be included in the event.\n\n"
f"Exception: {e}"
f"Exception: {e}",
exc_info=True,
)

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,38 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
assert table_schema_by_asset_key == expected_table_schema_by_asset_key


def test_exception_fetch_column_schema_with_adapter(
monkeypatch: pytest.MonkeyPatch, mocker: MockFixture, test_metadata_manifest: Dict[str, Any]
):
monkeypatch.setenv("DBT_LOG_COLUMN_METADATA", "false")

mock_adapter = mocker.patch(
"dagster_dbt.core.resources_v2.DbtCliInvocation.adapter",
return_value=mocker.MagicMock(),
new_callable=mocker.PropertyMock,
)
mock_adapter.return_value.get_columns_in_relation.side_effect = Exception("An error occurred")

@dbt_assets(manifest=test_metadata_manifest)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from (
dbt.cli(["build"], context=context)
.stream()
.fetch_column_metadata(with_column_lineage=False)
)

result = materialize(
[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))},
)

assert result.success
assert all(
not TableMetadataSet.extract(event.materialization.metadata).column_schema
for event in result.get_asset_materialization_events()
)


@pytest.mark.parametrize(
"use_experimental_fetch_column_schema",
[True, False],
Expand Down

0 comments on commit 46d7bd2

Please sign in to comment.