Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom_operator_name added to Airflow 2.7 and compatibility added for Airflow 2.9 and 2.10 #110

Merged
merged 7 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.0.6"
__version__ = "2.1.0"


def get_provider_info():
Expand Down
78 changes: 76 additions & 2 deletions astronomer_starship/compat/starship_compatability.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,20 @@ def pool_attrs(self):
}
return attrs

def task_instance_attrs(self):
attrs = super().task_instance_attrs()
attrs["custom_operator_name"] = {
"attr": "custom_operator_name",
"methods": [("POST", True)],
"test_value": None,
}
return attrs

def task_instances_attrs(self):
attrs = super().task_instances_attrs()
attrs["task_instances"]["test_value"][0]["custom_operator_name"] = None
return attrs


class StarshipAirflow28(StarshipAirflow27):
"""
Expand All @@ -1053,6 +1067,61 @@ def dag_run_attrs(self):
return attrs


class StarshipAirflow29(StarshipAirflow28):
"""
- rendered_map_index in task_instance
- task_display_name in task_instance
"""

def task_instance_attrs(self):
attrs = super().task_instance_attrs()
attrs["rendered_map_index"] = {
"attr": "rendered_map_index",
"methods": [("POST", True)],
"test_value": "rendered_map_index",
}
attrs["task_display_name"] = {
"attr": "task_display_name",
"methods": [("POST", True)],
"test_value": "task_display_name",
}
return attrs

def task_instances_attrs(self):
attrs = super().task_instances_attrs()
attrs["task_instances"]["test_value"][0][
"rendered_map_index"
] = "rendered_map_index"
attrs["task_instances"]["test_value"][0][
"task_display_name"
] = "task_display_name"
return attrs


class StarshipAirflow210(StarshipAirflow28):
"""
- _try_number to try_number in task_instance
- executor in task_instance
"""

# TODO: Identify any other compat issues that exist between 2.8-2.10

def task_instance_attrs(self):
attrs = super().task_instance_attrs()
attrs["try_number"]["attr"] = "try_number"
attrs["executor"] = {
"attr": "executor",
"methods": [("POST", True)],
"test_value": "executor",
}
return attrs

def task_instances_attrs(self):
attrs = super().task_instances_attrs()
attrs["task_instances"]["test_value"][0]["executor"] = "executor"
return attrs


class StarshipCompatabilityLayer:
"""StarshipCompatabilityLayer is a factory class that returns the correct StarshipAirflow class for a version

Expand All @@ -1067,7 +1136,8 @@ class StarshipCompatabilityLayer:
- 2.6 https://github.com/apache/airflow/tree/2.6.3/airflow/models
- 2.7 https://github.com/apache/airflow/tree/2.7.3/airflow/models
- 2.8 https://github.com/apache/airflow/tree/2.8.3/airflow/models
- 2.9
- 2.9 https://github.com/apache/airflow/tree/2.9.3/airflow/models
- 2.10 https://github.com/apache/airflow/tree/2.10.3/airflow/models

>>> isinstance(StarshipCompatabilityLayer("2.8.1"), StarshipAirflow28)
True
Expand Down Expand Up @@ -1096,7 +1166,11 @@ def __new__(cls, airflow_version: "Union[str, None]" = None) -> StarshipAirflow:
)

if int(major) == 2:
if int(minor) >= 8:
if int(minor) == 10:
return StarshipAirflow210()
if int(minor) == 9:
return StarshipAirflow29()
if int(minor) == 8:
return StarshipAirflow28()
if int(minor) == 7:
return StarshipAirflow27()
Expand Down
Loading
Loading