Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/airflow-plugin): bumping up the openlineage-airflow version #10457

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ jobs:
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.9.0 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.10.txt'
extra_pip_extras: plugin-v2
fail-fast: false
steps:
- name: Set up JDK 17
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_long_description():
# We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the
# underlying package.
"openlineage-airflow>=1.2.0,<=1.7.0",
"openlineage-airflow>=1.2.0,<=1.12.0",
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ def run_datajob(
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if datajob is None:
assert ti.task is not None
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)
Expand Down Expand Up @@ -509,6 +510,7 @@ def complete_datajob(
:return: DataProcessInstance
"""
if datajob is None:
assert ti.task is not None
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)
Expand All @@ -530,6 +532,7 @@ def complete_datajob(
f"Result should be either success or failure and it was {ti.state}"
)

assert datajob is not None
dpi = DataProcessInstance.from_datajob(
datajob=datajob,
id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def on_task_instance_running(
# The type ignore is to placate mypy on Airflow 2.1.x.
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
task = task_instance.task
assert task is not None
dag: "DAG" = task.dag # type: ignore[assignment]

self._task_holder.set_task(task_instance)
Expand Down Expand Up @@ -447,6 +448,7 @@ def on_task_instance_finish(
) -> None:
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
task = self._task_holder.get_task(task_instance) or task_instance.task
assert task is not None
dag: "DAG" = task.dag # type: ignore[assignment]
dushayntAW marked this conversation as resolved.
Show resolved Hide resolved

datajob = AirflowGenerator.generate_datajob(
Expand Down
Loading