From 665eb32e194b8546bcecebc1571c28a4849bac41 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Wed, 4 Jun 2025 13:18:40 +0200 Subject: [PATCH 1/8] feat: initial draft implementation for `request_body` support in the `PowerBIDatasetRefreshOperator` - not tested yet --- .../airflow/providers/microsoft/azure/hooks/powerbi.py | 6 +++++- .../providers/microsoft/azure/operators/powerbi.py | 5 +++++ .../providers/microsoft/azure/triggers/powerbi.py | 9 +++++++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index c8e9fa76424fd..8575f097227a0 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -189,12 +189,15 @@ async def get_refresh_details_by_refresh_id( return refresh_details - async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str: + async def trigger_dataset_refresh( + self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None + ) -> str: """ Triggers a refresh for the specified dataset from the given group id. :param dataset_id: The dataset id. :param group_id: The workspace id. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. :return: Request id of the dataset refresh request. """ @@ -207,6 +210,7 @@ async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> st "group_id": group_id, "dataset_id": dataset_id, }, + data=request_body, ) request_id = response.get("requestid") diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 9abab3da977a3..1a6d92b6f7a1d 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -72,6 +72,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator): :param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True. :param check_interval: Number of seconds to wait before rechecking the refresh status. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. """ template_fields: Sequence[str] = ( @@ -92,6 +93,8 @@ def __init__( proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, + request_body: dict[str, Any] + | None = None, # TODO: Maybe change to pydantic model / dataclass / typeddict in future? **kwargs, ) -> None: super().__init__(**kwargs) @@ -102,6 +105,7 @@ def __init__( self.conn_id = conn_id self.timeout = timeout self.check_interval = check_interval + self.request_body = request_body @property def proxies(self) -> dict | None: @@ -124,6 +128,7 @@ def execute(self, context: Context): api_version=self.api_version, check_interval=self.check_interval, wait_for_termination=self.wait_for_termination, + request_body=self.request_body, ), method_name=self.get_refresh_status.__name__, ) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py index bcba15a2af1ff..b39bb68e45f71 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -20,7 +20,7 @@ import asyncio import time from collections.abc import AsyncIterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import tenacity @@ -51,9 +51,9 @@ class PowerBITrigger(BaseTrigger): :param dataset_id: The dataset Id to refresh. :param dataset_refresh_id: The dataset refresh Id to poll for the status, if not provided a new refresh will be triggered. :param group_id: The workspace Id where dataset is located. - :param end_time: Time in seconds when trigger should stop polling. :param check_interval: Time in seconds to wait between each poll. :param wait_for_termination: Wait for the dataset refresh to complete or fail. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. """ def __init__( @@ -67,6 +67,8 @@ def __init__( api_version: APIVersion | str | None = None, check_interval: int = 60, wait_for_termination: bool = True, + request_body: dict[str, Any] + | None = None, # TODO: Maybe change to pydantic model / dataclass / typeddict in future? ): super().__init__() self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) @@ -76,6 +78,7 @@ def __init__( self.group_id = group_id self.check_interval = check_interval self.wait_for_termination = wait_for_termination + self.request_body = request_body def serialize(self): """Serialize the trigger instance.""" @@ -91,6 +94,7 @@ def serialize(self): "timeout": self.timeout, "check_interval": self.check_interval, "wait_for_termination": self.wait_for_termination, + "request_body": self.request_body, }, ) @@ -113,6 +117,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: dataset_refresh_id = await self.hook.trigger_dataset_refresh( dataset_id=self.dataset_id, group_id=self.group_id, + request_body=self.request_body, ) if dataset_refresh_id: From 5bc345948abeed5debd52899af2dbc03409d75f0 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Wed, 4 Jun 2025 13:41:58 +0200 Subject: [PATCH 2/8] fix: reference to correct URL in case API changes in future --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 2 +- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 2 +- .../src/airflow/providers/microsoft/azure/triggers/powerbi.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 8575f097227a0..0500e869de6df 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -197,7 +197,7 @@ async def trigger_dataset_refresh( :param dataset_id: The dataset id. :param group_id: The workspace id. - :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. :return: Request id of the dataset refresh request. """ diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 1a6d92b6f7a1d..367c0da1c13a6 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -72,7 +72,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator): :param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True. :param check_interval: Number of seconds to wait before rechecking the refresh status. - :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. """ template_fields: Sequence[str] = ( diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py index b39bb68e45f71..6fd42dbd23f41 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -53,7 +53,7 @@ class PowerBITrigger(BaseTrigger): :param group_id: The workspace Id where dataset is located. :param check_interval: Time in seconds to wait between each poll. :param wait_for_termination: Wait for the dataset refresh to complete or fail. - :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset#datasetrefreshrequest. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. """ def __init__( From 6e97777ea6906a2dc45649b06e85681303835fee Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Thu, 5 Jun 2025 15:09:47 +0200 Subject: [PATCH 3/8] test: update `TestPowerBITrigger` --- .../unit/microsoft/azure/triggers/test_powerbi.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py index 950d707498e61..8a1eda0139345 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py @@ -44,6 +44,18 @@ TIMEOUT = 5 MODULE = "airflow.providers.microsoft.azure" CHECK_INTERVAL = 1 +REQUEST_BODY = { + "type": "full", + "commitMode": "transactional", + "objects": [ + { + "table": "Customer", + "partition": "Robert" + } + ], + "applyRefreshPolicy": "false", + "timeout": "05:00:00" +} API_VERSION = "v1.0" @@ -102,6 +114,7 @@ def test_powerbi_trigger_serialization(self, connection): check_interval=CHECK_INTERVAL, wait_for_termination=True, timeout=TIMEOUT, + request_body=REQUEST_BODY, ) classpath, kwargs = powerbi_trigger.serialize() @@ -116,6 +129,7 @@ def test_powerbi_trigger_serialization(self, connection): "api_version": API_VERSION, "check_interval": CHECK_INTERVAL, "wait_for_termination": True, + "request_body": REQUEST_BODY, } @pytest.mark.asyncio From 00314fd5679f1754505479af49395135095d3ddb Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Thu, 5 Jun 2025 15:59:56 +0200 Subject: [PATCH 4/8] chore: pre-commit checks --- .../tests/unit/microsoft/azure/triggers/test_powerbi.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py index 8a1eda0139345..658b6e3046f3b 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/triggers/test_powerbi.py @@ -47,14 +47,9 @@ REQUEST_BODY = { "type": "full", "commitMode": "transactional", - "objects": [ - { - "table": "Customer", - "partition": "Robert" - } - ], + "objects": [{"table": "Customer", "partition": "Robert"}], "applyRefreshPolicy": "false", - "timeout": "05:00:00" + "timeout": "05:00:00", } API_VERSION = "v1.0" From e2f14ba5b71fb26b3ef1175e43ee158932b7c152 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Thu, 5 Jun 2025 16:07:01 +0200 Subject: [PATCH 5/8] chore: remove TODOs --- .../src/airflow/providers/microsoft/azure/operators/powerbi.py | 3 +-- .../src/airflow/providers/microsoft/azure/triggers/powerbi.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 367c0da1c13a6..444100dc667f7 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -93,8 +93,7 @@ def __init__( proxies: dict | None = None, api_version: APIVersion | str | None = None, check_interval: int = 60, - request_body: dict[str, Any] - | None = None, # TODO: Maybe change to pydantic model / dataclass / typeddict in future? + request_body: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py index 6fd42dbd23f41..042916f796637 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/triggers/powerbi.py @@ -67,8 +67,7 @@ def __init__( api_version: APIVersion | str | None = None, check_interval: int = 60, wait_for_termination: bool = True, - request_body: dict[str, Any] - | None = None, # TODO: Maybe change to pydantic model / dataclass / typeddict in future? + request_body: dict[str, Any] | None = None, ): super().__init__() self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout) From 967ccc5046a80b0c2fc337d0646f7236557d3419 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Thu, 5 Jun 2025 16:30:16 +0200 Subject: [PATCH 6/8] test: add sample request_body to `TestPowerBIDatasetRefreshOperator` --- .../tests/unit/microsoft/azure/operators/test_powerbi.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py index 43794c43880f1..6d1bdddeef90c 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py @@ -39,6 +39,13 @@ TASK_ID = "run_powerbi_operator" GROUP_ID = "group_id" DATASET_ID = "dataset_id" +REQUEST_BODY = { + "type": "full", + "commitMode": "transactional", + "objects": [{"table": "Customer", "partition": "Robert"}], + "applyRefreshPolicy": "false", + "timeout": "05:00:00", +} CONFIG = { "task_id": TASK_ID, "conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, @@ -46,6 +53,7 @@ "dataset_id": DATASET_ID, "check_interval": 1, "timeout": 3, + "request_body": REQUEST_BODY, } NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" From ee9664dce966de803e72a2d0c53cbba7738c1789 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Fri, 6 Jun 2025 09:39:07 +0200 Subject: [PATCH 7/8] test: add example of `request_body` to system tests / examples --- .../microsoft/azure/example_powerbi_dataset_refresh.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py index 5453caff6463b..ac43371e14266 100644 --- a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py +++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py @@ -66,6 +66,12 @@ def create_connection(conn_id_name: str): group_id=GROUP_ID, check_interval=30, timeout=120, + request_body={ + "type": "full", + "retryCount": 3, + "commitMode": "transactional", + "notifyOption": "MailOnFailure", + } ) # [END howto_operator_powerbi_refresh_async] From 82bfc6d7d3e075baa2d6dfd361e4717329fb4347 Mon Sep 17 00:00:00 2001 From: Ramon Vermeulen Date: Fri, 6 Jun 2025 09:59:40 +0200 Subject: [PATCH 8/8] chore: missing trailing comma --- .../system/microsoft/azure/example_powerbi_dataset_refresh.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py index ac43371e14266..b772405da8601 100644 --- a/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py +++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_powerbi_dataset_refresh.py @@ -71,7 +71,7 @@ def create_connection(conn_id_name: str): "retryCount": 3, "commitMode": "transactional", "notifyOption": "MailOnFailure", - } + }, ) # [END howto_operator_powerbi_refresh_async]