diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index a00c7cae27d1d..c0dce3861eca2 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -134,7 +134,11 @@ def register_asset_change( ) ) if not asset_model: - cls.logger().warning("AssetModel %s not found", asset) + msg = f"AssetModel {asset} not found; cannot create asset event." + cls.logger().warning(msg) + # if there is a task_instance, write to task log + if task_instance is not None and hasattr(task_instance, "log"): + task_instance.log.warning(msg) return None if not asset_model.active: diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 46a59198d8018..94b63e72501fd 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -69,6 +69,7 @@ def create_mock_dag(): class TestAssetManager: def test_register_asset_change_asset_doesnt_exist(self, mock_task_instance): + mock_task_instance = mock.Mock() asset = Asset(uri="asset_doesnt_exist", name="not exist") mock_session = mock.Mock(spec=Session) @@ -84,6 +85,7 @@ def test_register_asset_change_asset_doesnt_exist(self, mock_task_instance): # AssetDagRunQueue rows mock_session.add.assert_not_called() mock_session.merge.assert_not_called() + mock_task_instance.log.warning.assert_called() def test_register_asset_change(self, session, dag_maker, mock_task_instance, testing_dag_bundle): asset_manager = AssetManager()