Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def _get_logical_date(task_instance):
return date


def _get_dag_run_clear_number(task_instance):
# todo: remove when min airflow version >= 3.0
if AIRFLOW_V_3_0_PLUS:
dagrun = task_instance.get_template_context()["dag_run"]
return dagrun.clear_number
return task_instance.dag_run.clear_number


@require_openlineage_version(provider_min_version="2.3.0")
def generate_openlineage_events_from_dbt_cloud_run(
operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance
Expand Down Expand Up @@ -144,7 +152,7 @@ async def get_artifacts_for_steps(steps, artifacts):
root_parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=task_instance.dag_id,
logical_date=_get_logical_date(task_instance),
clear_number=task_instance.dag_run.clear_number,
clear_number=_get_dag_run_clear_number(task_instance),
)

parent_job = ParentRunMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,16 @@ def _get_logical_date(task_instance):

return date

# todo: move this run_id logic into OpenLineage's listener to avoid differences

def _get_dag_run_clear_number(task_instance):
# todo: remove when min airflow version >= 3.0
if AIRFLOW_V_3_0_PLUS:
dagrun = task_instance.get_template_context()["dag_run"]
return dagrun.clear_number
return task_instance.dag_run.clear_number


# todo: move this run_id logic into OpenLineage's listener to avoid differences
def _get_ol_run_id(task_instance) -> str:
"""
Get OpenLineage run_id from TaskInstance.
Expand Down Expand Up @@ -140,7 +147,7 @@ def _get_ol_dag_run_id(task_instance) -> str:
return OpenLineageAdapter.build_dag_run_id(
dag_id=task_instance.dag_id,
logical_date=_get_logical_date(task_instance),
clear_number=task_instance.dag_run.clear_number,
clear_number=_get_dag_run_clear_number(task_instance),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,17 @@ def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_
query_ids = ["query1", "query2", "query3"]
original_query_ids = copy.deepcopy(query_ids)
logical_date = timezone.datetime(2025, 1, 1)
mock_dagrun = mock.MagicMock(logical_date=logical_date, clear_number=0)
mock_ti = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
map_index=1,
try_number=1,
logical_date=logical_date,
state=TaskInstanceState.FAILED, # This will be query default state if no metadata found
dag_run=mock.MagicMock(logical_date=logical_date, clear_number=0),
dag_run=mock_dagrun,
)
mock_ti.get_template_context.return_value = {"dag_run": mock.MagicMock(logical_date=logical_date)}
mock_ti.get_template_context.return_value = {"dag_run": mock_dagrun}

fake_metadata = {
"query1": {
Expand Down
Loading