From 016fa3578e3fda6fee4040ef341a08918fdc302e Mon Sep 17 00:00:00 2001 From: jsjasonseba Date: Sun, 8 Jun 2025 16:45:38 +0700 Subject: [PATCH 1/3] Fix fallback to use extra if kwargs is set to None --- .../src/airflow/providers/alibaba/cloud/hooks/maxcompute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py index 2a249f123da13..ce5afcb918cde 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py @@ -45,7 +45,7 @@ def fallback_to_default_project_endpoint(func: Callable[..., RT]) -> Callable[.. def inner_wrapper(self, **kwargs) -> RT: required_args = ("project", "endpoint") for arg_name in required_args: - kwargs[arg_name] = kwargs.get(arg_name, getattr(self, arg_name)) + kwargs[arg_name] = getattr(self, arg_name) if kwargs.get(arg_name) is None else kwargs[arg_name] if not kwargs[arg_name]: raise MaxComputeConfigurationException( f'"{arg_name}" must be passed either as ' From 4b2df46fba648b29b46cee790969e62c7711bccd Mon Sep 17 00:00:00 2001 From: jsjasonseba Date: Sun, 8 Jun 2025 16:49:23 +0700 Subject: [PATCH 2/3] Fix incorrect conn_id argument --- .../providers/alibaba/cloud/operators/maxcompute.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py index c8424c028464e..d0b16d1eb6eeb 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py @@ -56,8 +56,8 @@ class MaxComputeSQLOperator(BaseOperator): :param default_schema: The default schema to use. :param quota_name: The quota name to use. Defaults to project default quota if not specified. - :param alibabacloud_conn_id: The connection ID to use. Defaults to - `alibabacloud_default` if not specified. + :param maxcompute_conn_id: The connection ID to use. Defaults to + `maxcompute_default` if not specified. :param cancel_on_kill: Flag which indicates whether to stop running instance or not when task is killed. Default is True. """ @@ -72,7 +72,7 @@ class MaxComputeSQLOperator(BaseOperator): "aliases", "default_schema", "quota_name", - "alibabacloud_conn_id", + "maxcompute_conn_id", ) template_ext: Sequence[str] = (".sql",) template_fields_renderers = {"sql": "sql"} @@ -90,7 +90,7 @@ def __init__( aliases: dict[str, str] | None = None, default_schema: str | None = None, quota_name: str | None = None, - alibabacloud_conn_id: str = "alibabacloud_default", + maxcompute_conn_id: str = "maxcompute_default", cancel_on_kill: bool = True, **kwargs, ) -> None: @@ -104,13 +104,13 @@ def __init__( self.aliases = aliases self.default_schema = default_schema self.quota_name = quota_name - self.alibabacloud_conn_id = alibabacloud_conn_id + self.maxcompute_conn_id = maxcompute_conn_id self.cancel_on_kill = cancel_on_kill self.hook: MaxComputeHook | None = None self.instance: Instance | None = None def execute(self, context: Context) -> str: - self.hook = MaxComputeHook(alibabacloud_conn_id=self.alibabacloud_conn_id) + self.hook = MaxComputeHook(maxcompute_conn_id=self.maxcompute_conn_id) self.instance = self.hook.run_sql( sql=self.sql, From 36d12d55ce7f86edbbfbc2ecfada904aa0e39523 Mon Sep 17 00:00:00 2001 From: jsjasonseba Date: Mon, 9 Jun 2025 08:51:41 +0700 Subject: [PATCH 3/3] Add comment --- .../src/airflow/providers/alibaba/cloud/hooks/maxcompute.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py index ce5afcb918cde..94de067b07d9b 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/hooks/maxcompute.py @@ -45,6 +45,8 @@ def fallback_to_default_project_endpoint(func: Callable[..., RT]) -> Callable[.. def inner_wrapper(self, **kwargs) -> RT: required_args = ("project", "endpoint") for arg_name in required_args: + # Use the value from kwargs if it is provided and value is not None, otherwise use the + # value from the connection extra property. kwargs[arg_name] = getattr(self, arg_name) if kwargs.get(arg_name) is None else kwargs[arg_name] if not kwargs[arg_name]: raise MaxComputeConfigurationException(