-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
low-code: Add last_page_size and last_record to pagination context #36408
Changes from all commits
30ba4d4
97449e6
a7b57dd
ea63f36
d89dcbe
2aaa8d4
4e193d3
7b514e0
2e976b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,15 @@ | |
# | ||
|
||
from dataclasses import InitVar, dataclass | ||
from typing import Any, List, Mapping, Optional, Union | ||
from typing import Any, Dict, List, Mapping, Optional, Union | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder | ||
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder | ||
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean | ||
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy | ||
from airbyte_cdk.sources.declarative.types import Config | ||
from airbyte_cdk.sources.declarative.types import Config, Record | ||
|
||
|
||
@dataclass | ||
|
@@ -34,32 +34,52 @@ class CursorPaginationStrategy(PaginationStrategy): | |
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None | ||
decoder: Decoder = JsonDecoder(parameters={}) | ||
|
||
def __post_init__(self, parameters: Mapping[str, Any]): | ||
def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
if isinstance(self.cursor_value, str): | ||
self.cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters) | ||
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters) | ||
else: | ||
self._cursor_value = self.cursor_value | ||
if isinstance(self.stop_condition, str): | ||
self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters) | ||
else: | ||
self._stop_condition = self.stop_condition | ||
|
||
@property | ||
def initial_token(self) -> Optional[Any]: | ||
return None | ||
|
||
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: | ||
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: | ||
decoded_response = self.decoder.decode(response) | ||
|
||
# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This | ||
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links | ||
headers = response.headers | ||
headers: Dict[str, Any] = dict(response.headers) | ||
headers["link"] = response.links | ||
|
||
if self.stop_condition: | ||
should_stop = self.stop_condition.eval(self.config, response=decoded_response, headers=headers, last_records=last_records) | ||
last_record = last_records[-1] if last_records else None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and this would presumably be removed later in favor of an incoming There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that's right. It'll be passed as a param |
||
|
||
if self._stop_condition: | ||
should_stop = self._stop_condition.eval( | ||
self.config, | ||
response=decoded_response, | ||
headers=headers, | ||
last_records=last_records, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. last_records will be removed in a follow up PR |
||
last_record=last_record, | ||
last_page_size=len(last_records), | ||
) | ||
if should_stop: | ||
return None | ||
token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response, headers=headers) | ||
token = self._cursor_value.eval( | ||
config=self.config, | ||
last_records=last_records, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. last_records will be removed in a follow up PR |
||
response=decoded_response, | ||
headers=headers, | ||
last_record=last_record, | ||
last_page_size=len(last_records), | ||
) | ||
return token if token else None | ||
|
||
def reset(self): | ||
def reset(self) -> None: | ||
# No state to reset | ||
pass | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for mypy