diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 99a524df6d1a3..c49880237b8a7 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -517,10 +517,14 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None: run_facets = {**get_airflow_dag_run_facet(dag_run)} + date = dag_run.logical_date + if AIRFLOW_V_3_0_PLUS and date is None: + date = dag_run.run_after + self.submit_callable( self.adapter.dag_started, dag_id=dag_run.dag_id, - logical_date=dag_run.logical_date, + logical_date=date, start_date=dag_run.start_date, nominal_start_time=data_interval_start, nominal_end_time=data_interval_end, @@ -556,12 +560,16 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: else: task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + date = dag_run.logical_date + if AIRFLOW_V_3_0_PLUS and date is None: + date = dag_run.run_after + self.submit_callable( self.adapter.dag_success, dag_id=dag_run.dag_id, run_id=dag_run.run_id, end_date=dag_run.end_date, - logical_date=dag_run.logical_date, + logical_date=date, clear_number=dag_run.clear_number, task_ids=task_ids, dag_run_state=dag_run.get_state(), @@ -590,12 +598,17 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: task_ids = DagRun._get_partial_task_ids(dag_run.dag) else: task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + + date = dag_run.logical_date + if AIRFLOW_V_3_0_PLUS and date is None: + date = dag_run.run_after + self.submit_callable( self.adapter.dag_failed, dag_id=dag_run.dag_id, run_id=dag_run.run_id, end_date=dag_run.end_date, - logical_date=dag_run.logical_date, + logical_date=date, clear_number=dag_run.clear_number, dag_run_state=dag_run.get_state(), task_ids=task_ids,