Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,15 @@ async def get_refresh_details_by_refresh_id(

return refresh_details

async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str:
async def trigger_dataset_refresh(
self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None
) -> str:
"""
Triggers a refresh for the specified dataset from the given group id.

:param dataset_id: The dataset id.
:param group_id: The workspace id.
:param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.

:return: Request id of the dataset refresh request.
"""
Expand All @@ -207,6 +210,7 @@ async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> st
"group_id": group_id,
"dataset_id": dataset_id,
},
data=request_body,
)

request_id = response.get("requestid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
:param timeout: Time in seconds to wait for a dataset to reach a terminal status for asynchronous waits. Used only if ``wait_for_termination`` is True.
:param check_interval: Number of seconds to wait before rechecking the
refresh status.
:param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.
"""

template_fields: Sequence[str] = (
Expand All @@ -92,6 +93,7 @@ def __init__(
proxies: dict | None = None,
api_version: APIVersion | str | None = None,
check_interval: int = 60,
request_body: dict[str, Any] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -102,6 +104,7 @@ def __init__(
self.conn_id = conn_id
self.timeout = timeout
self.check_interval = check_interval
self.request_body = request_body

@property
def proxies(self) -> dict | None:
Expand All @@ -124,6 +127,7 @@ def execute(self, context: Context):
api_version=self.api_version,
check_interval=self.check_interval,
wait_for_termination=self.wait_for_termination,
request_body=self.request_body,
),
method_name=self.get_refresh_status.__name__,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import asyncio
import time
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import tenacity

Expand Down Expand Up @@ -51,9 +51,9 @@ class PowerBITrigger(BaseTrigger):
:param dataset_id: The dataset Id to refresh.
:param dataset_refresh_id: The dataset refresh Id to poll for the status, if not provided a new refresh will be triggered.
:param group_id: The workspace Id where dataset is located.
:param end_time: Time in seconds when trigger should stop polling.
:param check_interval: Time in seconds to wait between each poll.
:param wait_for_termination: Wait for the dataset refresh to complete or fail.
:param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.
"""

def __init__(
Expand All @@ -67,6 +67,7 @@ def __init__(
api_version: APIVersion | str | None = None,
check_interval: int = 60,
wait_for_termination: bool = True,
request_body: dict[str, Any] | None = None,
):
super().__init__()
self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
Expand All @@ -76,6 +77,7 @@ def __init__(
self.group_id = group_id
self.check_interval = check_interval
self.wait_for_termination = wait_for_termination
self.request_body = request_body

def serialize(self):
"""Serialize the trigger instance."""
Expand All @@ -91,6 +93,7 @@ def serialize(self):
"timeout": self.timeout,
"check_interval": self.check_interval,
"wait_for_termination": self.wait_for_termination,
"request_body": self.request_body,
},
)

Expand All @@ -113,6 +116,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
dataset_refresh_id = await self.hook.trigger_dataset_refresh(
dataset_id=self.dataset_id,
group_id=self.group_id,
request_body=self.request_body,
)

if dataset_refresh_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def create_connection(conn_id_name: str):
group_id=GROUP_ID,
check_interval=30,
timeout=120,
request_body={
"type": "full",
"retryCount": 3,
"commitMode": "transactional",
"notifyOption": "MailOnFailure",
},
)
# [END howto_operator_powerbi_refresh_async]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,21 @@
TASK_ID = "run_powerbi_operator"
GROUP_ID = "group_id"
DATASET_ID = "dataset_id"
REQUEST_BODY = {
"type": "full",
"commitMode": "transactional",
"objects": [{"table": "Customer", "partition": "Robert"}],
"applyRefreshPolicy": "false",
"timeout": "05:00:00",
}
CONFIG = {
"task_id": TASK_ID,
"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET,
"group_id": GROUP_ID,
"dataset_id": DATASET_ID,
"check_interval": 1,
"timeout": 3,
"request_body": REQUEST_BODY,
}
NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@
TIMEOUT = 5
MODULE = "airflow.providers.microsoft.azure"
CHECK_INTERVAL = 1
REQUEST_BODY = {
"type": "full",
"commitMode": "transactional",
"objects": [{"table": "Customer", "partition": "Robert"}],
"applyRefreshPolicy": "false",
"timeout": "05:00:00",
}
API_VERSION = "v1.0"


Expand Down Expand Up @@ -102,6 +109,7 @@ def test_powerbi_trigger_serialization(self, connection):
check_interval=CHECK_INTERVAL,
wait_for_termination=True,
timeout=TIMEOUT,
request_body=REQUEST_BODY,
)

classpath, kwargs = powerbi_trigger.serialize()
Expand All @@ -116,6 +124,7 @@ def test_powerbi_trigger_serialization(self, connection):
"api_version": API_VERSION,
"check_interval": CHECK_INTERVAL,
"wait_for_termination": True,
"request_body": REQUEST_BODY,
}

@pytest.mark.asyncio
Expand Down