Skip to content

APP-5740 : Add support for iterative pagination to User, Group, and Workflow results #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ def get_groups(
def get_all_groups(
self,
limit: int = 20,
) -> List[AtlanGroup]:
) -> GroupResponse:
"""Deprecated - use group.get_all() instead."""
warn(
"This method is deprecated, please use 'group.get_all' instead, which offers identical functionality.",
Expand All @@ -867,7 +867,7 @@ def get_group_by_name(
self,
alias: str,
limit: int = 20,
) -> Optional[List[AtlanGroup]]:
) -> Optional[GroupResponse]:
"""Deprecated - use group.get_by_name() instead."""
warn(
"This method is deprecated, please use 'group.get_by_name' instead, which offers identical functionality.",
Expand Down Expand Up @@ -989,7 +989,7 @@ def get_users(
def get_all_users(
self,
limit: int = 20,
) -> List[AtlanUser]:
) -> UserResponse:
"""Deprecated - use user.get_all() instead."""
warn(
"This method is deprecated, please use 'user.get_all' instead, which offers identical functionality.",
Expand All @@ -1002,7 +1002,7 @@ def get_users_by_email(
self,
email: str,
limit: int = 20,
) -> Optional[List[AtlanUser]]:
) -> Optional[UserResponse]:
"""Deprecated - use user.get_by_email() instead."""
warn(
"This method is deprecated, please use 'user.get_by_email' instead, which offers identical functionality.",
Expand Down
26 changes: 13 additions & 13 deletions pyatlan/client/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,45 +142,45 @@ def get_all(
offset: int = 0,
sort: Optional[str] = "name",
columns: Optional[List[str]] = None,
) -> List[AtlanGroup]:
) -> GroupResponse:
"""
Retrieve all groups defined in Atlan.
Retrieve a GroupResponse object containing a list of all groups defined in Atlan.

:param limit: maximum number of results to be returned
:param offset: starting point for the list of groups when paging
:param sort: property by which to sort the results, by default : name
:param columns: provides columns projection support for groups endpoint
:returns: a list of all the groups in Atlan
:returns: a GroupResponse object with all groups based on the parameters; results are iterable.
"""
if response := self.get(offset=offset, limit=limit, sort=sort, columns=columns):
return response.records # type: ignore
return None # type: ignore
response: GroupResponse = self.get(
offset=offset, limit=limit, sort=sort, columns=columns
)
return response

@validate_arguments
def get_by_name(
self,
alias: str,
limit: int = 20,
offset: int = 0,
) -> Optional[List[AtlanGroup]]:
) -> Optional[GroupResponse]:
"""
Retrieve all groups with a name that contains the provided string.
Retrieves a GroupResponse object containing a list of groups that match the specified string.
(This could include a complete group name, in which case there should be at most
a single item in the returned list, or could be a partial group name to retrieve
all groups with that naming convention.)

:param alias: name (as it appears in the UI) on which to filter the groups
:param limit: maximum number of groups to retrieve
:param offset: starting point for the list of groups when paging
:returns: all groups whose name (in the UI) contains the provided string
:returns: a GroupResponse object containing a list of groups whose UI names include the given string; the results are iterable.
"""
if response := self.get(
response: GroupResponse = self.get(
offset=offset,
limit=limit,
post_filter='{"$and":[{"alias":{"$ilike":"%' + alias + '%"}}]}',
):
return response.records
return None
)
return response

@validate_arguments
def get_members(
Expand Down
51 changes: 25 additions & 26 deletions pyatlan/client/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ def __init__(self, client: ApiCaller):
@validate_arguments
def create(
self, users: List[AtlanUser], return_info: bool = False
) -> Optional[List[AtlanUser]]:
) -> Optional[UserResponse]:
"""
Create one or more new users.

:param users: the details of the new users
:param return_info: whether to return the details of created users, defaults to `False`
:raises AtlanError: on any API communication issue
:returns: the list of details of created users if `return_info` is `True`, otherwise `None`
:returns: a UserResponse object which contains the list of details of created users if `return_info` is `True`, otherwise `None`
"""
from pyatlan.client.atlan import AtlanClient

Expand Down Expand Up @@ -201,27 +201,27 @@ def get_all(
limit: int = 20,
offset: int = 0,
sort: Optional[str] = "username",
) -> List[AtlanUser]:
) -> UserResponse:
"""
Retrieve all users defined in Atlan.
Retrieve a UserResponse object containing a list of all users defined in Atlan.

:param limit: maximum number of users to retrieve
:param offset: starting point for the list of users when paging
:param sort: property by which to sort the results, by default : `username`
:returns: a list of all the users in Atlan
:returns: a UserResponse object with all users based on the parameters; results are iterable.
"""
response: UserResponse = self.get(offset=offset, limit=limit, sort=sort)
return [user for user in response]
return response

@validate_arguments
def get_by_email(
self,
email: str,
limit: int = 20,
offset: int = 0,
) -> Optional[List[AtlanUser]]:
) -> Optional[UserResponse]:
"""
Retrieves all users with email addresses that contain the provided email.
Retrieves a UserResponse object containing a list of users with email addresses that contain the provided email.
(This could include a complete email address, in which case there should be at
most a single item in the returned list, or could be a partial email address
such as "@example.com" to retrieve all users with that domain in their email
Expand All @@ -230,35 +230,35 @@ def get_by_email(
:param email: on which to filter the users
:param limit: maximum number of users to retrieve
:param offset: starting point for the list of users when paging
:returns: all users whose email addresses contain the provided string
:returns: a UserResponse object containing a list of users whose email addresses contain the provided string
"""
if response := self.get(
response: UserResponse = self.get(
offset=offset,
limit=limit,
post_filter='{"email":{"$ilike":"%' + email + '%"}}',
):
return response.records
return None
)
return response

@validate_arguments
def get_by_emails(
self,
emails: List[str],
limit: int = 20,
offset: int = 0,
) -> Optional[List[AtlanUser]]:
) -> Optional[UserResponse]:
"""
Retrieves all users with email addresses that match the provided list of emails.
Retrieves a UserResponse object containing a list of users with email addresses that match the provided list of emails.

:param emails: list of email addresses to filter the users
:param limit: maximum number of users to retrieve
:param offset: starting point for the list of users when paginating
:returns: list of users whose email addresses match the provided list
:returns: a UserResponse object containing a list of users whose email addresses match the provided list
"""
email_filter = '{"email":{"$in":' + dumps(emails or [""]) + "}}"
if response := self.get(offset=offset, limit=limit, post_filter=email_filter):
return response.records
return None
response: UserResponse = self.get(
offset=offset, limit=limit, post_filter=email_filter
)
return response

@validate_arguments
def get_by_username(self, username: str) -> Optional[AtlanUser]:
Expand All @@ -281,21 +281,20 @@ def get_by_username(self, username: str) -> Optional[AtlanUser]:
@validate_arguments
def get_by_usernames(
self, usernames: List[str], limit: int = 5, offset: int = 0
) -> Optional[List[AtlanUser]]:
) -> Optional[UserResponse]:
"""
Retrieves users based on their usernames.
Retrieves a UserResponse object containing a list of users based on their usernames.

:param usernames: the list of usernames by which to find the users
:param limit: maximum number of users to retrieve
:param offset: starting point for the list of users when paginating
:returns: the users with the specified usernames
:returns: a UserResponse object containing list of users with the specified usernames
"""
username_filter = '{"username":{"$in":' + dumps(usernames or [""]) + "}}"
if response := self.get(
response: UserResponse = self.get(
offset=offset, limit=limit, post_filter=username_filter
):
return response.records
return None
)
return response

@validate_arguments
def add_to_groups(
Expand Down
23 changes: 16 additions & 7 deletions pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,16 @@ def find_runs_by_status_and_time_range(
finished_at: Optional[str] = None,
from_: int = 0,
size: int = 100,
) -> List[WorkflowSearchResult]:
) -> WorkflowSearchResponse:
"""
Find workflow runs based on their status and time range.
Retrieves a WorkflowSearchResponse object containing workflow runs based on their status and time range.

:param status: list of the workflow statuses to filter
:param started_at: (optional) lower bound on 'status.startedAt' (e.g 'now-2h')
:param finished_at: (optional) lower bound on 'status.finishedAt' (e.g 'now-1h')
:param from_: (optional) starting index of the search results (default: `0`).
:param size: (optional) maximum number of search results to return (default: `100`).
:returns: list of workflows matching the filters
:returns: a WorkflowSearchResponse object containing a list of workflows matching the filters
:raises ValidationError: if inputs are invalid
:raises AtlanError: on any API communication issue
"""
Expand Down Expand Up @@ -205,7 +205,7 @@ def find_runs_by_status_and_time_range(
run_lookup_results = self._find_runs(
query=run_lookup_query, from_=from_, size=size
)
return run_lookup_results.hits and run_lookup_results.hits.hits or []
return run_lookup_results

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
Expand Down Expand Up @@ -280,7 +280,16 @@ def _find_runs(
WORKFLOW_INDEX_RUN_SEARCH,
request_obj=request,
)
return WorkflowSearchResponse(**raw_json)
return WorkflowSearchResponse(
client=self._client,
endpoint=WORKFLOW_INDEX_RUN_SEARCH,
criteria=query,
start=request.from_,
size=request.size,
took=raw_json.get("took"),
hits=raw_json.get("hits"),
shards=raw_json.get("_shards"),
)

def _add_schedule(
self,
Expand Down Expand Up @@ -517,7 +526,7 @@ def get_runs(
workflow_phase: AtlanWorkflowPhase,
from_: int = 0,
size: int = 100,
) -> Optional[List[WorkflowSearchResult]]:
) -> Optional[WorkflowSearchResponse]:
"""
Retrieves all workflow runs.

Expand All @@ -542,7 +551,7 @@ def get_runs(
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
)
response = self._find_runs(query, from_=from_, size=size)
return results if (results := response.hits and response.hits.hits) else None
return response

@validate_arguments
def stop(
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/model/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def _get_next_page(self):
self._criteria.offset = self._start
self._criteria.limit = self._size
raw_json = self._client._call_api(
api=self._endpoint,
api=self._endpoint.format_path_with_params(),
query_params=self._criteria.query_params,
)
if not raw_json.get("records"):
Expand Down
73 changes: 65 additions & 8 deletions pyatlan/model/workflow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Generator, List, Optional

from pydantic.v1 import Field
from pydantic.v1 import Field, PrivateAttr, ValidationError, parse_obj_as

from pyatlan.client.common import ApiCaller
from pyatlan.errors import ErrorCode
from pyatlan.model.core import AtlanObject
from pyatlan.model.enums import AtlanWorkflowPhase, SortOrder
from pyatlan.model.search import Query, SortItem
from pyatlan.utils import API


class PackageParameter(AtlanObject):
Expand Down Expand Up @@ -132,12 +135,6 @@ class WorkflowSearchHits(AtlanObject):
hits: Optional[List[WorkflowSearchResult]] = Field(default=None)


class WorkflowSearchResponse(AtlanObject):
took: Optional[int] = Field(default=None)
hits: Optional[WorkflowSearchHits] = Field(default=None)
shards: Optional[Dict[str, Any]] = Field(alias="_shards", default=None)


class ReRunRequest(AtlanObject):
namespace: Optional[str] = Field(default="default")
resource_kind: Optional[str] = Field(default="WorkflowTemplate")
Expand Down Expand Up @@ -207,6 +204,7 @@ class WorkflowSearchRequest(AtlanObject):
)
],
)
source: Optional[WorkflowSearchResultDetail] = Field(default=None, alias="_source")

class Config:
json_encoders = {Query: lambda v: v.to_dict(), SortItem: lambda v: v.to_dict()}
Expand All @@ -216,3 +214,62 @@ def __init__(__pydantic_self__, **data: Any) -> None:
__pydantic_self__.__fields_set__.update(
["from_", "size", "track_total_hits", "sort"]
)


class WorkflowSearchResponse(AtlanObject):
    """
    Page of workflow search results that can fetch the following pages itself.

    Besides the response payload (``took``, ``hits``, ``shards``) the object
    keeps, as private attributes, what it needs to re-issue the search:
    the API caller, the endpoint, the original query and the current
    ``start`` / ``size`` window. Iterating over the object yields every
    result of the current page and then keeps requesting further pages
    until one comes back empty.
    """

    _size: int = PrivateAttr()
    _start: int = PrivateAttr()
    _endpoint: API = PrivateAttr()
    _client: ApiCaller = PrivateAttr()
    _criteria: WorkflowSearchRequest = PrivateAttr()
    took: Optional[int] = Field(default=None)
    hits: Optional[WorkflowSearchHits] = Field(default=None)
    shards: Optional[Dict[str, Any]] = Field(alias="_shards", default=None)

    def __init__(self, **data: Any):
        """
        Build the response from the payload fields plus the paging context.

        :param data: model fields, together with the keys ``endpoint``,
            ``client``, ``criteria``, ``size`` and ``start`` which are stored
            as private attributes (each is ``None`` when the key is absent)
        """
        super().__init__(**data)
        self._endpoint = data.get("endpoint")  # type: ignore[assignment]
        self._client = data.get("client")  # type: ignore[assignment]
        self._criteria = data.get("criteria")  # type: ignore[assignment]
        self._size = data.get("size")  # type: ignore[assignment]
        self._start = data.get("start")  # type: ignore[assignment]

    @property
    def count(self):
        """
        :returns: the ``value`` entry of ``hits.total``, or ``0`` when either
            ``hits`` or ``hits.total`` is missing
        """
        return self.hits.total.get("value", 0) if self.hits and self.hits.total else 0

    def current_page(self) -> Optional[List[WorkflowSearchResult]]:
        """
        :returns: the results of the page currently held by this object, or
            ``None`` when the response carried no ``hits`` section
        """
        # `hits` is Optional: a payload without a "hits" key must not raise here.
        return self.hits.hits if self.hits else None

    def next_page(self, start=None, size=None) -> bool:
        """
        Advance to another page of results, replacing the current page in place.

        :param start: index of the first result to fetch; when omitted the
            window simply moves forward by the current page size
        :param size: new page size to use from now on; when omitted (or falsy)
            the current page size is kept
        :returns: ``True`` if a non-empty page was loaded, ``False`` otherwise
        """
        # Compare against None rather than relying on truthiness so that an
        # explicit `start=0` (rewind to the first page) is honoured.
        self._start = self._start + self._size if start is None else start
        if size:
            self._size = size
        # Nothing on the current page (or no hits section at all) means the
        # previous request already exhausted the results.
        if not (self.hits and self.hits.hits):
            return False
        return self._get_next_page()

    def _get_next_page(self):
        """
        Re-run the stored query for the current ``start`` / ``size`` window.

        Only called from ``next_page`` after it has verified that ``hits``
        is populated.

        :returns: ``True`` if the new page contains results, ``False`` if empty
        :raises AtlanError: if the returned hits cannot be parsed
        """
        request = WorkflowSearchRequest(
            query=self._criteria, from_=self._start, size=self._size
        )
        raw_json = self._client._call_api(
            api=self._endpoint,
            request_obj=request,
        )
        # `or {}` also covers a payload whose "hits" key is present but null.
        raw_hits = (raw_json.get("hits") or {}).get("hits")
        if not raw_hits:
            if self.hits:
                self.hits.hits = []
            return False
        try:
            self.hits.hits = parse_obj_as(  # type: ignore[union-attr]
                List[WorkflowSearchResult], raw_hits
            )
        except ValidationError as err:
            raise ErrorCode.JSON_ERROR.exception_with_parameters(
                raw_json, 200, str(err)
            ) from err
        return True

    def __iter__(self) -> Generator[WorkflowSearchResult, None, None]:  # type: ignore[override]
        """
        Yield every result of the current page, then of each following page,
        stopping as soon as ``next_page`` reports that no more results exist.
        """
        while True:
            yield from self.current_page() or []
            if not self.next_page():
                break
Loading
Loading