diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py index 435a365e80429..0868ba6c16e83 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py +++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py @@ -152,6 +152,10 @@ def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage | "OpenLineage provider method failed to import OpenLineage integration. " "This should not happen." ) - except Exception: - self.log.warning("OpenLineage provider method failed to extract data from provider.") + except Exception as e: + self.log.warning( + "OpenLineage method failed to extract data from Operator with the following exception: `%s`", + e, + ) + self.log.debug("OpenLineage extraction failure details:", exc_info=True) return None diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py index 964616382f095..1a5232ae35d74 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py +++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py @@ -144,6 +144,7 @@ def extract_metadata( e, task_info, ) + self.log.debug("OpenLineage extraction failure details:", exc_info=True) elif (hook_lineage := self.get_hook_lineage()) is not None: inputs, outputs = hook_lineage task_metadata = OperatorLineage(inputs=inputs, outputs=outputs) @@ -199,12 +200,9 @@ def extract_inlets_and_outlets( if d: task_metadata.outputs.append(d) - @staticmethod - def get_hook_lineage() -> tuple[list[Dataset], list[Dataset]] | None: + def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None: try: - from airflow.providers.common.compat.lineage.hook import ( - get_hook_lineage_collector, - ) + from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector except ImportError: return None @@ -213,6 +211,7 @@ def get_hook_lineage() -> tuple[list[Dataset], list[Dataset]] | None: if not get_hook_lineage_collector().has_collected: return None + self.log.debug("OpenLineage will extract lineage from Hook Lineage Collector.") return ( [ asset @@ -322,5 +321,5 @@ def validate_task_metadata(self, task_metadata) -> OperatorLineage | None: job_facets=task_metadata.job_facets, ) except AttributeError: - self.log.warning("Extractor returns non-valid metadata: %s", task_metadata) + self.log.warning("OpenLineage extractor returns non-valid metadata: `%s`", task_metadata) return None diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index a7c7d5e1f9f8b..83dc218186c51 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -159,11 +159,20 @@ def emit(self, event: RunEvent): stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}")) stack.enter_context(Stats.timer("ol.emit.attempts")) self._client.emit(redacted_event) - self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId) - except Exception: + self.log.info( + "Successfully emitted OpenLineage `%s` event of id `%s`", + event_type.upper(), + event.run.runId, + ) + except Exception as e: Stats.incr("ol.emit.failed") - self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId) - self.log.debug("OpenLineage emission failure: %s", exc_info=True) + self.log.warning( + "Failed to emit OpenLineage `%s` event of id `%s` with the following exception: `%s`", + event_type.upper(), + event.run.runId, + e, + ) + self.log.debug("OpenLineage emission failure details:", exc_info=True) return redacted_event @@ -371,7 +380,7 @@ def dag_started( # Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them. # This ensures that any unexpected exceptions are logged for debugging purposes. # This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing. - self.log.warning("Failed to emit DAG started event: \n %s", traceback.format_exc()) + self.log.warning("Failed to emit OpenLineage DAG started event: \n %s", traceback.format_exc()) def dag_success( self, @@ -409,7 +418,7 @@ def dag_success( # Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them. # This ensures that any unexpected exceptions are logged for debugging purposes. # This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing. - self.log.warning("Failed to emit DAG success event: \n %s", traceback.format_exc()) + self.log.warning("Failed to emit OpenLineage DAG success event: \n %s", traceback.format_exc()) def dag_failed( self, @@ -453,7 +462,7 @@ def dag_failed( # Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them. # This ensures that any unexpected exceptions are logged for debugging purposes. # This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing. - self.log.warning("Failed to emit DAG failed event: \n %s", traceback.format_exc()) + self.log.warning("Failed to emit OpenLineage DAG failed event: \n %s", traceback.format_exc()) @staticmethod def _build_run( diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 1860da211b264..c62aaabbf135d 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -482,7 +482,9 @@ def _fork_execute(self, callable, callable_name: str): process.wait(conf.execution_timeout()) except psutil.TimeoutExpired: self.log.warning( - "OpenLineage process %s expired. This should not affect process execution.", pid + "OpenLineage process with pid `%s` expired and will be terminated by listener. " + "This has no impact on actual task execution status.", + pid, ) self._terminate_with_wait(process) except BaseException: diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index e4cc5c964adc1..f2c289bb906f0 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -757,7 +757,9 @@ def wrapper(*args, **kwargs): return f(*args, **kwargs) except Exception: log.warning( - "OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.", + "OpenLineage event emission failed. " + "Exception below is being caught but it's printed for visibility. " + "This has no impact on actual task execution status.", exc_info=True, )