From 9de21f63be60166613dfad62a592e769032ae97f Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Fri, 21 Jun 2024 12:34:35 -0400 Subject: [PATCH] feat(dbt): catch exceptions when invoking adapter to build column lineage (#22641) ## Summary & Motivation On issues like https://github.com/dagster-io/dagster/issues/22358#issuecomment-2182522019, we should not fail the computation. Instead, we should surface the exception as a warning. ## How I Tested These Changes pytest --- .../dagster_dbt/core/resources_v2.py | 34 +++++++++++++------ .../dbt_packages/test_columns_metadata.py | 32 +++++++++++++++++ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index d980a9a12c976..5e748047bc129 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -238,7 +238,8 @@ def to_default_asset_events( f" `{self._event_history_metadata}` for the dbt resource" f" `{dbt_resource_props['original_file_path']}`." " Column schema metadata will not be included in the event.\n\n" - f"Exception: {e}" + f"Exception: {e}", + exc_info=True, ) default_metadata = { @@ -294,7 +295,8 @@ def to_default_asset_events( "An error occurred while building column lineage metadata for the dbt resource" f" `{dbt_resource_props['original_file_path']}`." " Lineage metadata will not be included in the event.\n\n" - f"Exception: {e}" + f"Exception: {e}", + exc_info=True, ) if has_asset_def: @@ -937,12 +939,22 @@ def _fetch_column_metadata( dbt_resource_props = _get_dbt_resource_props_from_event(invocation, event) with adapter.connection_named(f"column_metadata_{dbt_resource_props['unique_id']}"): - relation = adapter.get_relation( - database=dbt_resource_props["database"], - schema=dbt_resource_props["schema"], - identifier=dbt_resource_props["name"], - ) - cols: List[BaseColumn] = adapter.get_columns_in_relation(relation=relation) + try: + relation = adapter.get_relation( + database=dbt_resource_props["database"], + schema=dbt_resource_props["schema"], + identifier=dbt_resource_props["name"], + ) + cols: List[BaseColumn] = adapter.get_columns_in_relation(relation=relation) + except Exception as e: + logger.warning( + "An error occurred while fetching column schema metadata for the dbt resource" + f" `{dbt_resource_props['original_file_path']}`." + " Column metadata will not be included in the event.\n\n" + f"Exception: {e}", + exc_info=True, + ) + return {} column_schema_data = {col.name: {"data_type": col.data_type} for col in cols} if with_column_lineage: @@ -976,7 +988,8 @@ def _fetch_column_metadata( f" `{col_data}` for the dbt resource" f" `{dbt_resource_props['original_file_path']}`." " Column schema metadata will not be included in the event.\n\n" - f"Exception: {e}" + f"Exception: {e}", + exc_info=True, ) lineage_metadata = {} @@ -998,7 +1011,8 @@ def _fetch_column_metadata( "An error occurred while building column lineage metadata for the dbt resource" f" `{dbt_resource_props['original_file_path']}`." " Lineage metadata will not be included in the event.\n\n" - f"Exception: {e}" + f"Exception: {e}", + exc_info=True, ) return { diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py index c5400c3c497f6..7017b9e70c117 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py @@ -93,6 +93,38 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): assert table_schema_by_asset_key == expected_table_schema_by_asset_key +def test_exception_fetch_column_schema_with_adapter( + monkeypatch: pytest.MonkeyPatch, mocker: MockFixture, test_metadata_manifest: Dict[str, Any] +): + monkeypatch.setenv("DBT_LOG_COLUMN_METADATA", "false") + + mock_adapter = mocker.patch( + "dagster_dbt.core.resources_v2.DbtCliInvocation.adapter", + return_value=mocker.MagicMock(), + new_callable=mocker.PropertyMock, + ) + mock_adapter.return_value.get_columns_in_relation.side_effect = Exception("An error occurred") + + @dbt_assets(manifest=test_metadata_manifest) + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from ( + dbt.cli(["build"], context=context) + .stream() + .fetch_column_metadata(with_column_lineage=False) + ) + + result = materialize( + [my_dbt_assets], + resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))}, + ) + + assert result.success + assert all( + not TableMetadataSet.extract(event.materialization.metadata).column_schema + for event in result.get_asset_materialization_events() + ) + + @pytest.mark.parametrize( "use_experimental_fetch_column_schema", [True, False],