Skip to content

Commit

Permalink
Merge pull request #352 from atlanhq/bump-release-2.3.0
Browse files Browse the repository at this point in the history
Bump to release `2.3.0`
  • Loading branch information
Aryamanz29 authored Jun 19, 2024
2 parents 16593a2 + af5feaf commit 21c8b69
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 31 deletions.
18 changes: 18 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
## 2.3.0 (June 19, 2024)

### Breaking changes

- Introduced a new pagination approach in `AssetClient.search()` and `FluentSearch.execute()` called **bulk search** (disabled by default). It minimizes system impact when handling large result sets. The SDK switches to this search operation automatically if results exceed a predefined threshold (i.e: `100,000` results). Alternatively, users can enable bulk search explicitly by setting `bulk=True` in `AssetClient.search()` or `FluentSearch.execute()`. The breaking change is in regards to searches that return more than `100,000` results — either the results will now be sorted differently or an error will be thrown.

- The `AssetClient.search()` and `FluentSearch.execute()` methods will now raise an exception (`InvalidRequestError`) in the following scenarios:

- when bulk search is enabled (`bulk=True`) and any user-specified sorting options are found in the search request.

- when bulk search is disabled (`bulk=False`), the number of results exceeds the predefined threshold (i.e: `100,000` assets), and any user-specified sorting options are found in the search request.

_This is because the bulk search approach ignores user-specified sorting and instead reorders the results based on the creation timestamps of assets to handle large numbers of assets efficiently._

### QOL improvements

- Pinned `urllib3>=1.26.0,<3` and moved `networkx` to the dev requirements to avoid potential version mismatches.

## 2.2.4 (June 11, 2024)

### New features
Expand Down
53 changes: 43 additions & 10 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,46 @@ def _prepare_sorts_for_bulk_search(sorts: List[SortItem]):
# if not already sorted by creation time first
return IndexSearchResults.sort_by_timestamp_first(sorts)

def _get_bulk_search_log_message(self, bulk):
return (
(
"Bulk search option is enabled. "
if bulk
else "Result size (%s) exceeds threshold (%s). "
)
+ "Ignoring requests for offset-based paging and using timestamp-based paging instead."
)

# TODO: Try adding @validate_arguments to this method once
# the issue below is fixed or when we switch to pydantic v2
# https://github.com/atlanhq/atlan-python/pull/88#discussion_r1260892704
def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults:
"""
Search for assets using the provided criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.
:param criteria: detailing the search query, parameters, and so on to run
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:returns: the results of the search
:raises InvalidRequestError:
- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.
:raises AtlanError: on any API communication issue
:returns: the results of the search
"""
if bulk:
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
LOGGER.debug(self._get_bulk_search_log_message(bulk))
raw_json = self._client._call_api(
INDEX_SEARCH,
request_obj=criteria,
Expand All @@ -190,10 +212,22 @@ def search(self, criteria: IndexSearchRequest, bulk=False) -> IndexSearchResults
aggregations = self._get_aggregations(raw_json)
count = raw_json.get("approximateCount", 0)

if not bulk and count > IndexSearchResults._MASS_EXTRACT_THRESHOLD:
raise ErrorCode.ENABLE_BULK_FOR_MASS_EXTRACTION.exception_with_parameters(
IndexSearchResults._MASS_EXTRACT_THRESHOLD
if (
count > IndexSearchResults._MASS_EXTRACT_THRESHOLD
and not IndexSearchResults.presorted_by_timestamp(criteria.dsl.sort)
):
# If there is any user-specified sorting present in the search request
if criteria.dsl.sort and len(criteria.dsl.sort) > 1:
raise ErrorCode.UNABLE_TO_RUN_BULK_WITH_SORTS.exception_with_parameters()
# Re-fetch the first page results with updated timestamp sorting
# for bulk search if count > _MASS_EXTRACT_THRESHOLD (100,000 assets)
criteria.dsl.sort = self._prepare_sorts_for_bulk_search(criteria.dsl.sort)
LOGGER.debug(
self._get_bulk_search_log_message(bulk),
count,
IndexSearchResults._MASS_EXTRACT_THRESHOLD,
)
return self.search(criteria)

return IndexSearchResults(
client=self._client,
Expand Down Expand Up @@ -1848,14 +1882,13 @@ def _get_next_page(self):
query = self._criteria.dsl.query
self._criteria.dsl.size = self._size
self._criteria.dsl.from_ = self._start
is_bulk_search = (
self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
)

if self._bulk:
LOGGER.debug(
"Bulk search option is enabled. Ignoring requests for default offset-based "
"paging and switching to a creation timestamp-based paging approach."
)
if is_bulk_search:
self._prepare_query_for_timestamp_paging(query)
if raw_json := super()._get_next_page_json(self._bulk):
if raw_json := super()._get_next_page_json(is_bulk_search):
self._count = raw_json.get("approximateCount", 0)
return True
return False
Expand Down
7 changes: 0 additions & 7 deletions pyatlan/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,6 @@ class ErrorCode(Enum):
"Please ensure that no sorting options are included in your search request when performing a bulk search.",
InvalidRequestError,
)
ENABLE_BULK_FOR_MASS_EXTRACTION = (
400,
"ATLAN-PYTHON-400-063",
"Number of results exceeds the predefined threshold {}. Please execute the search again with `bulk=True`.",
"Please note that this will reorder the results based on creation timestamp to efficiently handle a large number of results.", # noqa
InvalidRequestError,
)
AUTHENTICATION_PASSTHROUGH = (
401,
"ATLAN-PYTHON-401-000",
Expand Down
11 changes: 11 additions & 0 deletions pyatlan/model/fluent_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,22 @@ def count(self, client: AtlanClient) -> int:
def execute(self, client: AtlanClient, bulk: bool = False) -> IndexSearchResults:
"""
Run the fluent search to retrieve assets that match the supplied criteria.
`Note:` if the number of results exceeds the predefined threshold
(100,000 assets) this will be automatically converted into a `bulk` search.
:param client: client through which to retrieve the assets.
:param bulk: whether to run the search to retrieve assets that match the supplied criteria,
for large numbers of results (> `100,000`), defaults to `False`. Note: this will reorder the results
(based on creation timestamp) in order to iterate through a large number (more than `100,000`) results.
:raises InvalidRequestError:
- if bulk search is enabled (`bulk=True`) and any
user-specified sorting options are found in the search request.
- if bulk search is disabled (`bulk=False`) and the number of results
exceeds the predefined threshold (i.e: `100,000` assets)
and any user-specified sorting options are found in the search request.
:raises AtlanError: on any API communication issue
:returns: an iterable list of assets that match the supplied criteria, lazily-fetched
"""
return client.asset.search(criteria=self.to_request(), bulk=bulk)
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/model/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ def validate_order(cls, v, values):

class DSL(AtlanObject):
from_: int = Field(default=0, alias="from")
size: int = Field(default=100)
size: int = Field(default=300)
aggregations: Dict[str, Aggregation] = Field(default_factory=dict)
track_total_hits: Optional[bool] = Field(default=True, alias="track_total_hits")
post_filter: Optional[Query] = Field(default=None, alias="post_filter")
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.4
2.3.0
24 changes: 23 additions & 1 deletion tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pytest

from pyatlan.client.asset import LOGGER
from pyatlan.client.asset import LOGGER, IndexSearchResults
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, Table
from pyatlan.model.assets.atlas_glossary_term import AtlasGlossaryTerm
Expand Down Expand Up @@ -249,6 +249,28 @@ def test_search_pagination(mock_logger, client: AtlanClient):
assert "Bulk search option is enabled." in mock_logger.call_args_list[0][0][0]
mock_logger.reset_mock()

# Test search(): when the number of results exceeds the predefined threshold,
# the SDK automatically switches to a `bulk` search option using timestamp-based pagination.
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", 1):
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.asset.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, expected_sorts, size, TOTAL_ASSETS)
assert mock_logger.call_count < TOTAL_ASSETS
assert (
"Result size (%s) exceeds threshold (%s)."
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()


def test_search_iter(client: AtlanClient):
size = 15
Expand Down
49 changes: 42 additions & 7 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,8 @@ def test_index_search_pagination(
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception suggesting the user switch to bulk search
# when the number of results exceeds the predefined threshold
# Test search(): when the number of results exceeds the predefined threshold
# it will automatically convert to a `bulk` search.
TEST_THRESHOLD = 1
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
Expand All @@ -1477,17 +1477,52 @@ def test_index_search_pagination(
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
).to_request()
results = client.search(criteria=request)
expected_sorts = [
Asset.CREATE_TIME.order(SortOrder.ASCENDING),
Asset.GUID.order(SortOrder.ASCENDING),
]
_assert_search_results(results, index_search_paging_json, expected_sorts)
assert mock_api_caller._call_api.call_count == 3
assert mock_logger.call_count == 1
assert (
"Result size (%s) exceeds threshold (%s)"
in mock_logger.call_args_list[0][0][0]
)
mock_logger.reset_mock()
mock_api_caller.reset_mock()

# Test search(bulk=False): Raise an exception when the number of results exceeds
# the predefined threshold and there are any user-defined sorting options present
with patch.object(IndexSearchResults, "_MASS_EXTRACT_THRESHOLD", TEST_THRESHOLD):
mock_api_caller._call_api.side_effect = [
index_search_paging_json,
]
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(CompoundQuery.asset_type(AtlasGlossaryTerm))
.page_size(2)
# With some sort options
.sort(Asset.NAME.order(SortOrder.ASCENDING))
).to_request()

with pytest.raises(
InvalidRequestError,
match=(
"ATLAN-PYTHON-400-063 Number of results exceeds the predefined threshold "
f"{TEST_THRESHOLD}. Please execute the search again with `bulk=True`."
"ATLAN-PYTHON-400-063 Unable to execute "
"bulk search with user-defined sorting options. "
"Suggestion: Please ensure that no sorting options are "
"included in your search request when performing a bulk search."
),
):
results = client.search(criteria=request)
client.search(criteria=request)
assert mock_api_caller._call_api.call_count == 1
mock_api_caller.reset_mock()
mock_api_caller.reset_mock()

# Test search(): Raise an exception when
# bulk search is attempted with any user-defined sorting options
# Test search(bulk=True): Raise an exception when bulk search is enabled
# and there are any user-defined sorting options present
request = (
FluentSearch()
.where(CompoundQuery.active_assets())
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_search_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def test_dsl():
)
assert (
dsl.json(by_alias=True, exclude_none=True)
== '{"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
== '{"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": '
'{"__typeName.keyword": {"value": "Schema"}}}, "sort": []}'
Expand All @@ -283,7 +283,7 @@ def test_index_search_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"__guid": {"order": "asc"}}]}, "relationAttributes": [], '
Expand All @@ -300,7 +300,7 @@ def test_audit_search_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"entityId": {"order": "asc"}}]}}'
Expand All @@ -316,7 +316,7 @@ def test_search_log_request():
assert (
request.json(by_alias=True, exclude_none=True)
== '{"attributes": ["schemaName", "databaseName"],'
' "dsl": {"from": 0, "size": 100, "aggregations": {}, "track_total_hits": true, '
' "dsl": {"from": 0, "size": 300, "aggregations": {}, "track_total_hits": true, '
'"post_filter": {"term": {"databaseName.keyword": '
'{"value": "ATLAN_SAMPLE_DATA"}}}, "query": {"term": {"__typeName.keyword": {"value": "Schema"}}}, '
'"sort": [{"entityGuidsAll": {"order": "asc"}}]}}'
Expand Down

0 comments on commit 21c8b69

Please sign in to comment.