Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage |
"OpenLineage provider method failed to import OpenLineage integration. "
"This should not happen."
)
except Exception:
self.log.warning("OpenLineage provider method failed to extract data from provider.")
except Exception as e:
self.log.warning(
"OpenLineage method failed to extract data from Operator with the following exception: `%s`",
e,
)
self.log.debug("OpenLineage extraction failure details:", exc_info=True)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def extract_metadata(
e,
task_info,
)
self.log.debug("OpenLineage extraction failure details:", exc_info=True)
elif (hook_lineage := self.get_hook_lineage()) is not None:
inputs, outputs = hook_lineage
task_metadata = OperatorLineage(inputs=inputs, outputs=outputs)
Expand Down Expand Up @@ -199,12 +200,9 @@ def extract_inlets_and_outlets(
if d:
task_metadata.outputs.append(d)

@staticmethod
def get_hook_lineage() -> tuple[list[Dataset], list[Dataset]] | None:
def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
try:
from airflow.providers.common.compat.lineage.hook import (
get_hook_lineage_collector,
)
from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector
except ImportError:
return None

Expand All @@ -213,6 +211,7 @@ def get_hook_lineage() -> tuple[list[Dataset], list[Dataset]] | None:
if not get_hook_lineage_collector().has_collected:
return None

self.log.debug("OpenLineage will extract lineage from Hook Lineage Collector.")
return (
[
asset
Expand Down Expand Up @@ -322,5 +321,5 @@ def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
job_facets=task_metadata.job_facets,
)
except AttributeError:
self.log.warning("Extractor returns non-valid metadata: %s", task_metadata)
self.log.warning("OpenLineage extractor returns non-valid metadata: `%s`", task_metadata)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,20 @@ def emit(self, event: RunEvent):
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
except Exception:
self.log.info(
"Successfully emitted OpenLineage `%s` event of id `%s`",
event_type.upper(),
event.run.runId,
)
except Exception as e:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", exc_info=True)
self.log.warning(
"Failed to emit OpenLineage `%s` event of id `%s` with the following exception: `%s`",
event_type.upper(),
event.run.runId,
e,
)
self.log.debug("OpenLineage emission failure details:", exc_info=True)

return redacted_event

Expand Down Expand Up @@ -371,7 +380,7 @@ def dag_started(
# Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them.
# This ensures that any unexpected exceptions are logged for debugging purposes.
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG started event: \n %s", traceback.format_exc())
self.log.warning("Failed to emit OpenLineage DAG started event: \n %s", traceback.format_exc())

def dag_success(
self,
Expand Down Expand Up @@ -409,7 +418,7 @@ def dag_success(
# Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them.
# This ensures that any unexpected exceptions are logged for debugging purposes.
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG success event: \n %s", traceback.format_exc())
self.log.warning("Failed to emit OpenLineage DAG success event: \n %s", traceback.format_exc())

def dag_failed(
self,
Expand Down Expand Up @@ -453,7 +462,7 @@ def dag_failed(
# Catch all exceptions to prevent ProcessPoolExecutor from silently swallowing them.
# This ensures that any unexpected exceptions are logged for debugging purposes.
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG failed event: \n %s", traceback.format_exc())
self.log.warning("Failed to emit OpenLineage DAG failed event: \n %s", traceback.format_exc())

@staticmethod
def _build_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,9 @@ def _fork_execute(self, callable, callable_name: str):
process.wait(conf.execution_timeout())
except psutil.TimeoutExpired:
self.log.warning(
"OpenLineage process %s expired. This should not affect process execution.", pid
"OpenLineage process with pid `%s` expired and will be terminated by listener. "
"This has no impact on actual task execution status.",
pid,
)
self._terminate_with_wait(process)
except BaseException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,9 @@ def wrapper(*args, **kwargs):
return f(*args, **kwargs)
except Exception:
log.warning(
"OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.",
"OpenLineage event emission failed. "
"Exception below is being caught but it's printed for visibility. "
"This has no impact on actual task execution status.",
exc_info=True,
)

Expand Down