From de0718c7262bd48f2525efa184139fefa2682c32 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 2 Apr 2024 10:43:30 -0700 Subject: [PATCH] low-code: Add last_page_size and last_record to pagination context (#36408) --- .../declarative_component_schema.yaml | 33 +++++++-------- .../models/declarative_component_schema.py | 2 +- .../strategies/cursor_pagination_strategy.py | 40 ++++++++++++++----- .../test_model_to_component_factory.py | 6 +-- .../test_cursor_pagination_strategy.py | 28 +++++++++++++ 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 57a7d912271b..f8f3b2400597 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -353,11 +353,12 @@ definitions: interpolation_context: - config - headers - - last_records + - last_page_size + - last_record - response examples: - "{{ headers.link.next.cursor }}" - - "{{ last_records[-1]['key'] }}" + - "{{ last_record['key'] }}" - "{{ response['nextPage'] }}" page_size: title: Page Size @@ -372,7 +373,7 @@ definitions: interpolation_context: - config - headers - - last_records + - last_record - response examples: - "{{ response.data.has_more is false }}" @@ -2306,20 +2307,20 @@ interpolation: x-ratelimit-limit: "600" x-ratelimit-remaining: "598" x-ratelimit-reset: "39" - - title: last_records - description: List of records extracted from the last response received from the API. - type: list + - title: last_record + description: Last record extracted from the response received from the API. + type: object + examples: + - name: "Test List: 19" + id: 0236d6d2 + contact_count: 20 + _metadata: + self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2 + - title: last_page_size + description: Number of records extracted from the last response received from the API. + type: object examples: - - - name: "Test List: 19" - id: 0236d6d2 - contact_count: 20 - _metadata: - self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2 - - name: List for CI tests, number 30 - id: 041ee031 - contact_count: 0 - _metadata: - self: https://api.sendgrid.com/v3/marketing/lists/041ee031 + - 2 - title: next_page_token description: Object describing the token to fetch the next page of records. The object has a single key "next_page_token". type: object diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 4a3388a0ea24..5926052dea3b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -880,7 +880,7 @@ class CursorPagination(BaseModel): description='Value of the cursor defining the next page to fetch.', examples=[ '{{ headers.link.next.cursor }}', - "{{ last_records[-1]['key'] }}", + "{{ last_record['key'] }}", "{{ response['nextPage'] }}", ], title='Cursor Value', diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index bea36fc8e323..af3939a8d490 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Optional, Union +from typing import Any, Dict, List, Mapping, Optional, Union import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder @@ -11,7 +11,7 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, Record @dataclass @@ -34,32 +34,52 @@ class CursorPaginationStrategy(PaginationStrategy): stop_condition: Optional[Union[InterpolatedBoolean, str]] = None decoder: Decoder = JsonDecoder(parameters={}) - def __post_init__(self, parameters: Mapping[str, Any]): + def __post_init__(self, parameters: Mapping[str, Any]) -> None: if isinstance(self.cursor_value, str): - self.cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters) + self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters) + else: + self._cursor_value = self.cursor_value if isinstance(self.stop_condition, str): self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters) + else: + self._stop_condition = self.stop_condition @property def initial_token(self) -> Optional[Any]: return None - def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: decoded_response = self.decoder.decode(response) # The default way that link is presented in requests.Response is a string of various links (last, next, etc). This # is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links - headers = response.headers + headers: Dict[str, Any] = dict(response.headers) headers["link"] = response.links - if self.stop_condition: - should_stop = self.stop_condition.eval(self.config, response=decoded_response, headers=headers, last_records=last_records) + last_record = last_records[-1] if last_records else None + + if self._stop_condition: + should_stop = self._stop_condition.eval( + self.config, + response=decoded_response, + headers=headers, + last_records=last_records, + last_record=last_record, + last_page_size=len(last_records), + ) if should_stop: return None - token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response, headers=headers) + token = self._cursor_value.eval( + config=self.config, + last_records=last_records, + response=decoded_response, + headers=headers, + last_record=last_record, + last_page_size=len(last_records), + ) return token if token else None - def reset(self): + def reset(self) -> None: # No state to reset pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index ee0e2e8ae373..4f6c8db9b197 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -243,8 +243,8 @@ def test_full_config_stream(): assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy) assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, JsonDecoder) - assert stream.retriever.paginator.pagination_strategy.cursor_value.string == "{{ response._metadata.next }}" - assert stream.retriever.paginator.pagination_strategy.cursor_value.default == "{{ response._metadata.next }}" + assert stream.retriever.paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}" + assert stream.retriever.paginator.pagination_strategy._cursor_value.default == "{{ response._metadata.next }}" assert stream.retriever.paginator.pagination_strategy.page_size == 10 assert isinstance(stream.retriever.requester, HttpRequester) @@ -1128,7 +1128,7 @@ def test_create_default_paginator(): assert isinstance(paginator.pagination_strategy, CursorPaginationStrategy) assert paginator.pagination_strategy.page_size == 50 - assert paginator.pagination_strategy.cursor_value.string == "{{ response._metadata.next }}" + assert paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}" assert isinstance(paginator.page_size_option, RequestOption) assert paginator.page_size_option.inject_into == RequestOptionType.request_parameter diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py index ce15b586125b..bb75a0201368 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py @@ -62,3 +62,31 @@ def test_cursor_pagination_strategy(test_name, template_string, stop_condition, token = strategy.next_page_token(response, last_records) assert expected_token == token assert page_size == strategy.get_page_size() + + +def test_last_record_points_to_the_last_item_in_last_records_array(): + last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}] + strategy = CursorPaginationStrategy( + page_size=1, + cursor_value="{{ last_record.id }}", + config={}, + parameters={}, + ) + + response = requests.Response() + next_page_token = strategy.next_page_token(response, last_records) + assert next_page_token == 1 + + +def test_last_record_is_node_if_no_records(): + last_records = [] + strategy = CursorPaginationStrategy( + page_size=1, + cursor_value="{{ last_record.id }}", + config={}, + parameters={}, + ) + + response = requests.Response() + next_page_token = strategy.next_page_token(response, last_records) + assert next_page_token is None