Skip to content

Commit

Permalink
utilize more information to deterministically generate OpenLineage ru…
Browse files Browse the repository at this point in the history
…n_id (#43936)

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski authored Nov 25, 2024
1 parent 074c822 commit 05f935d
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ async def get_artifacts_for_steps(steps, artifacts):
task_id=operator.task_id,
logical_date=_get_logical_date(task_instance),
try_number=_get_try_number(task_instance),
map_index=task_instance.map_index,
)

parent_job = ParentRunMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ def build_task_instance_run_id(
task_id: str,
try_number: int,
logical_date: datetime,
map_index: int,
):
return str(
generate_static_uuid(
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
)
)

Expand Down
11 changes: 11 additions & 0 deletions providers/src/airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
get_user_provided_run_facets,
is_operator_disabled,
is_selective_lineage_enabled,
is_ti_rescheduled_already,
print_warning,
)
from airflow.settings import configure_orm
Expand Down Expand Up @@ -134,6 +135,11 @@ def on_running():
# we return here because Airflow 2.3 needs task from deferred state
if task_instance.next_method is not None:
return

if is_ti_rescheduled_already(task_instance):
self.log.debug("Skipping this instance of rescheduled task - START event was emitted already")
return

parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
logical_date=dagrun.logical_date,
Expand All @@ -143,11 +149,13 @@ def on_running():
logical_date = task_instance.logical_date
else:
logical_date = task_instance.execution_date

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -231,6 +239,7 @@ def on_success():
task_id=task.task_id,
try_number=_get_try_number_success(task_instance),
logical_date=logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -329,11 +338,13 @@ def on_failure():
logical_date = task_instance.logical_date
else:
logical_date = task_instance.execution_date

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
map_index=task_instance.map_index,
)
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def lineage_run_id(task_instance: TaskInstance):
task_id=task_instance.task_id,
try_number=task_instance.try_number,
logical_date=logical_date,
map_index=task_instance.map_index,
)


Expand Down
34 changes: 30 additions & 4 deletions providers/src/airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
from deprecated import deprecated
from openlineage.client.utils import RedactMixin
from packaging.version import Version
from sqlalchemy import exists

from airflow import __version__ as AIRFLOW_VERSION
from airflow.exceptions import (
AirflowProviderDeprecationWarning,
)

# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, TaskReschedule
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
Expand All @@ -53,6 +54,7 @@
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
from airflow.sensors.base import BaseSensorOperator
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.context import AirflowContextDeprecationWarning
from airflow.utils.log.secrets_masker import (
Expand All @@ -62,6 +64,7 @@
should_hide_value_for_key,
)
from airflow.utils.module_loading import import_string
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
Expand Down Expand Up @@ -184,6 +187,28 @@ def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bo
raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")


@provide_session
def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
if not isinstance(ti.task, BaseSensorOperator):
return False

if not ti.task.reschedule:
return False

return (
session.query(
exists().where(
TaskReschedule.dag_id == ti.dag_id,
TaskReschedule.task_id == ti.task_id,
TaskReschedule.run_id == ti.run_id,
TaskReschedule.map_index == ti.map_index,
TaskReschedule.try_number == ti.try_number,
)
).scalar()
is True
)


class InfoJsonEncodable(dict):
"""
Airflow objects might not be json-encodable overall.
Expand Down Expand Up @@ -217,6 +242,7 @@ def __init__(self, obj):
self,
**{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
)
del self.obj

@staticmethod
def _cast_basic_types(value):
Expand Down Expand Up @@ -677,11 +703,11 @@ def decorator(f):
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
except Exception:
log.warning(
"Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. If you see that, task has completed successfully despite not getting OL events."
"OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.",
exc_info=True,
)
log.warning(e)

return wrapper

Expand Down
23 changes: 23 additions & 0 deletions providers/tests/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ def test_build_task_instance_run_id_is_valid_uuid():
task_id="task_id",
try_number=1,
logical_date=datetime.datetime.now(),
map_index=-1,
)
uuid_result = uuid.UUID(result)
assert uuid_result
Expand All @@ -912,28 +913,50 @@ def test_build_task_instance_run_id_same_input_gives_same_result():
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=-1,
)
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=-1,
)
assert result1 == result2


def test_build_task_instance_run_id_different_map_index_gives_different_result():
result1 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=1,
)
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
map_index=2,
)
assert result1 != result2


def test_build_task_instance_run_id_different_inputs_gives_different_results():
result1 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
logical_date=datetime.datetime.now(),
map_index=-1,
)
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag2",
task_id="task2",
try_number=2,
logical_date=datetime.datetime.now(),
map_index=-1,
)
assert result1 != result2

Expand Down
1 change: 0 additions & 1 deletion providers/tests/openlineage/plugins/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def get_sorted_events(event_dir: str) -> list[str]:

def has_value_in_events(events, chain, value):
x = [get_from_nullable_chain(event, chain) for event in events]
log.error(x)
y = [z == value for z in x]
return any(y)

Expand Down
Loading

0 comments on commit 05f935d

Please sign in to comment.