diff --git a/airbyte-integrations/connectors/source-linkedin-ads/metadata.yaml b/airbyte-integrations/connectors/source-linkedin-ads/metadata.yaml index 52e098a5d7d9..30950b7cf5e2 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/metadata.yaml +++ b/airbyte-integrations/connectors/source-linkedin-ads/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 137ece28-5434-455c-8f34-69dc3782f451 - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 dockerRepository: airbyte/source-linkedin-ads documentationUrl: https://docs.airbyte.com/integrations/sources/linkedin-ads githubIssueLabel: source-linkedin-ads diff --git a/airbyte-integrations/connectors/source-linkedin-ads/pyproject.toml b/airbyte-integrations/connectors/source-linkedin-ads/pyproject.toml index 83d635ed38eb..2f9acf037b0f 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/pyproject.toml +++ b/airbyte-integrations/connectors/source-linkedin-ads/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "1.0.0" +version = "1.0.1" name = "source-linkedin-ads" description = "Source implementation for Linkedin Ads." 
authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/analytics_streams.py b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/analytics_streams.py index e0835840630e..bf9e8eaf423a 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/analytics_streams.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/analytics_streams.py @@ -213,7 +213,18 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, (See Restrictions: https://learn.microsoft.com/en-us/linkedin/marketing/integrations/ads-reporting/ads-reporting?view=li-lms-2023-09&tabs=http#restrictions) """ parsed_response = response.json() - if len(parsed_response.get("elements")) < self.records_limit: + is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit + + # Note: The API might return fewer records than requested within the limits during pagination. + # This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164 + paging_params = parsed_response.get("paging", {}) + is_end_of_records = ( + paging_params["total"] - paging_params["start"] <= self.records_limit + if all(param in paging_params for param in ("total", "start")) + else True + ) + + if is_elements_less_than_limit and is_end_of_records: return None raise Exception( f"Limit {self.records_limit} elements exceeded. 
" diff --git a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/streams.py b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/streams.py index 5151d52a961d..e208df685d9f 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/streams.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/source_linkedin_ads/streams.py @@ -70,9 +70,20 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, https://docs.microsoft.com/en-us/linkedin/shared/api-guide/concepts/pagination?context=linkedin/marketing/context """ parsed_response = response.json() - if len(parsed_response.get("elements")) < self.records_limit: + is_elements_less_than_limit = len(parsed_response.get("elements")) < self.records_limit + + # Note: The API might return fewer records than requested within the limits during pagination. + # This behavior is documented at: https://github.com/airbytehq/airbyte/issues/34164 + paging_params = parsed_response.get("paging", {}) + is_end_of_records = ( + paging_params["total"] - paging_params["start"] <= self.records_limit + if all(param in paging_params for param in ("total", "start")) + else True + ) + + if is_elements_less_than_limit and is_end_of_records: return None - return {"start": parsed_response.get("paging").get("start") + self.records_limit} + return {"start": paging_params.get("start") + self.records_limit} def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None diff --git a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/test_source.py b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/test_source.py index 56097007da27..09fe4da46afb 100644 --- a/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-linkedin-ads/unit_tests/test_source.py @@ -161,7 +161,7 @@ def 
test_accounts(self): "response_json, expected", ( ({"elements": []}, None), - ({"elements": [{"data": []}] * 500, "paging": {"start": 0}}, {"start": 500}), + ({"elements": [{"data": []}] * 500, "paging": {"start": 0, "total": 600}}, {"start": 500}), ), ) def test_next_page_token(self, requests_mock, response_json, expected): diff --git a/docs/integrations/sources/linkedin-ads.md b/docs/integrations/sources/linkedin-ads.md index a22f098beb20..eaf6c456ed24 100644 --- a/docs/integrations/sources/linkedin-ads.md +++ b/docs/integrations/sources/linkedin-ads.md @@ -171,6 +171,7 @@ After 5 unsuccessful attempts - the connector will stop the sync operation. In s | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 1.0.1 | 2024-03-28 | [34152](https://github.com/airbytehq/airbyte/pull/34152) | Continue pagination when the API returns fewer records than requested | | 1.0.0 | 2024-04-10 | [36927](https://github.com/airbytehq/airbyte/pull/36927) | Update primary key for Analytics Streams | | 0.8.0 | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` | | 0.7.0 | 2024-02-20 | [35465](https://github.com/airbytehq/airbyte/pull/35465) | Per-error reporting and continue sync on stream failures |