From 1aadacccdc4d2092b5da59e01f9094129129912b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 18 Aug 2023 20:29:37 +0200 Subject: [PATCH 01/10] Source Facebook Marketing: automatically reduce batch size --- .../source_facebook_marketing/streams/base_streams.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index c86c4f2a93a3..5b1a6c8bbf0f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -78,9 +78,12 @@ def failure(response: FacebookResponse, request: Optional[FacebookRequest] = Non # although it is Optional in the signature for compatibility, we need it always assert request, "Missing a request object" resp_body = response.json() - if not isinstance(resp_body, dict) or resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE: + if not isinstance(resp_body, dict) or (resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE and resp_body.get("error", {}).get("message") != "Please reduce the amount of data you're asking for, then retry your request"): # response body is not a json object or the error code is different raise RuntimeError(f"Batch request failed with response: {resp_body}") + if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": + logger.warning("Caught retryable error: Too much data was requested in batch. Reducing batch size...") + self.max_batch_size = int(self.max_batch_size / 2) requests_q.put(request) api_batch: FacebookAdsApiBatch = self._api.api.new_batch() From a9510f7d629e9a83e017a56d80e9b15d68962151 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 18 Aug 2023 21:10:56 +0200 Subject: [PATCH 02/10] Source Facebook Marketing: update spec limits --- .../source-facebook-marketing/integration_tests/spec.json | 1 + .../source_facebook_marketing/spec.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json index eb6296d944e2..89dc368db106 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json @@ -374,6 +374,7 @@ "default": 50, "order": 9, "exclusiveMinimum": 0, + "maximum": 50, "type": "integer" }, "action_breakdowns_allow_empty": { diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py index 5ddbffada530..9118b92b703e 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import json import logging from datetime import datetime, timezone from enum import Enum @@ -201,7 +201,7 @@ class Config: default=28, ) - max_batch_size: Optional[PositiveInt] = Field( + max_batch_size: Optional[int] = Field( title="Maximum size of Batched Requests", order=9, description=( @@ -209,6 +209,8 @@ class Config: "Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases." ), default=50, + gt=0, + le=50 ) action_breakdowns_allow_empty: bool = Field( From 6cef102f395df51efc8cdec6a71d67bd6b9b0a7e Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 18 Aug 2023 21:18:25 +0200 Subject: [PATCH 03/10] Source Facebook Marketing: update docs --- .../connectors/source-facebook-marketing/Dockerfile | 2 +- .../connectors/source-facebook-marketing/metadata.yaml | 2 +- docs/integrations/sources/facebook-marketing.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 6fc064eecc9f..0f9d5f23bb5a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.1.4 +LABEL io.airbyte.version=1.1.5 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml index 8e763bb57eeb..d8757d600855 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml +++ b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c - dockerImageTag: 1.1.4 + dockerImageTag: 1.1.5 dockerRepository: airbyte/source-facebook-marketing githubIssueLabel: source-facebook-marketing icon: facebook.svg diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 265213f3f32a..1f1ca5fe380b 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -175,6 +175,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.1.5 | 2023-08-18 | [29610](https://github.com/airbytehq/airbyte/pull/29610) | Automatically reduce batch size | | 1.1.4 | 2023-08-08 | [29412](https://github.com/airbytehq/airbyte/pull/29412) | Add new custom_audience stream | | 1.1.3 | 2023-08-08 | [29208](https://github.com/airbytehq/airbyte/pull/29208) | Add account type validation during check | | 1.1.2 | 2023-08-03 | [29042](https://github.com/airbytehq/airbyte/pull/29042) | Fix broken `advancedAuth` references for `spec` | From 20171ff31b3e825f1af745894e437cfd0042f9ca Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 18 Aug 2023 22:49:18 +0200 Subject: [PATCH 04/10] Source Facebook Marketing: fix formatting --- .../source_facebook_marketing/spec.py | 4 ++-- .../source_facebook_marketing/streams/base_streams.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py index 9118b92b703e..99340caf954d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import json + import logging from datetime import datetime, timezone from enum import Enum @@ -210,7 +210,7 @@ class Config: ), default=50, gt=0, - le=50 + le=50, ) action_breakdowns_allow_empty: bool = Field( diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 5b1a6c8bbf0f..6521828e1a48 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -78,7 +78,11 @@ def failure(response: FacebookResponse, request: Optional[FacebookRequest] = Non # although it is Optional in the signature for compatibility, we need it always assert request, "Missing a request object" resp_body = response.json() - if not isinstance(resp_body, dict) or (resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE and resp_body.get("error", {}).get("message") != "Please reduce the amount of data you're asking for, then retry your request"): + if not isinstance(resp_body, dict) or ( + resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE + and resp_body.get("error", {}).get("message") + != "Please reduce the amount of data you're asking for, then retry your request" + ): # response body is not a json object or the error code is different raise RuntimeError(f"Batch request failed with response: {resp_body}") if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": From 21895c2c085eb56e150a82ae236054858b12be3a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sat, 19 Aug 2023 00:14:07 +0200 Subject: [PATCH 05/10] Source Facebook Marketing: add unit test --- .../unit_tests/test_base_streams.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py index 20b520194411..c64f1c778ba8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py @@ -128,6 +128,31 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses): assert batch.add_request.call_count == len(requests) assert batch.execute.call_count == 1 + def test_batch_reduce_amount(self, api, batch, mock_batch_responses, caplog): + """Reduce batch size to 1 and finally fail with message""" + + retryable_message = "Please reduce the amount of data you're asking for, then retry your request" + mock_batch_responses( + [ + { + "json": [ + {"body": {"error": {"message": retryable_message}}, "code": 500, "headers": {}}, + ], + } + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint")] + with pytest.raises(RuntimeError, match="Batch request failed with only 1 request in..."): + list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == 7 + assert batch.execute.call_count == 7 + assert stream.max_batch_size == 1 + for index, expected_batch_size in enumerate(["25", "13", "7", "4", "2", "1"]): + assert expected_batch_size in caplog.messages[index] + def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses): """Should retry without exception when any request returns 960 error code""" mock_batch_responses( From 268bb4e4b5c056252f301a2c8b160ab0d82ce604 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sat, 19 Aug 2023 09:57:56 +0200 Subject: [PATCH 06/10] Source Facebook Marketing: refactor --- .../streams/base_streams.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 6521828e1a48..6e14cc79323f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -6,6 +6,7 @@ from abc import ABC, abstractmethod from datetime import datetime from functools import partial +from math import ceil from queue import Queue from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional @@ -50,7 +51,7 @@ def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 1 self._api = api self.page_size = page_size if page_size is not None else 100 self._include_deleted = include_deleted if self.enable_deleted else False - self.max_batch_size = max_batch_size if max_batch_size is not None else 50 + self.max_batch_size = self._initial_max_batch_size = max_batch_size if max_batch_size is not None else 50 @cached_property def fields(self) -> List[str]: @@ -72,22 +73,25 @@ def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Itera requests_q.put(r) def success(response: FacebookResponse): + self.max_batch_size = self._initial_max_batch_size records.append(response.json()) + def reduce_batch_size(): + if self.max_batch_size == 1: + raise RuntimeError("Batch request failed with only 1 request in it") + self.max_batch_size = ceil(self.max_batch_size / 2) + logger.warning(f"Caught retryable error: Too much data was requested in batch. Reducing batch size to {self.max_batch_size}") + def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None): # although it is Optional in the signature for compatibility, we need it always assert request, "Missing a request object" resp_body = response.json() - if not isinstance(resp_body, dict) or ( - resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE - and resp_body.get("error", {}).get("message") - != "Please reduce the amount of data you're asking for, then retry your request" - ): - # response body is not a json object or the error code is different + if not isinstance(resp_body, dict): raise RuntimeError(f"Batch request failed with response: {resp_body}") - if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": - logger.warning("Caught retryable error: Too much data was requested in batch. Reducing batch size...") - self.max_batch_size = int(self.max_batch_size / 2) + elif resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": + reduce_batch_size() + elif resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE: + raise RuntimeError(f"Batch request failed with response: {resp_body}; unknown error code") requests_q.put(request) api_batch: FacebookAdsApiBatch = self._api.api.new_batch() From 95d0e936667e9948c756a6c3b9b6b1087cb3413b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sat, 19 Aug 2023 10:04:17 +0200 Subject: [PATCH 07/10] Source Facebook Marketing: update docs --- .../connectors/source-facebook-marketing/Dockerfile | 2 +- .../connectors/source-facebook-marketing/metadata.yaml | 2 +- docs/integrations/sources/facebook-marketing.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 0f9d5f23bb5a..325d86c0225a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.1.5 +LABEL io.airbyte.version=1.1.6 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml index d8757d600855..b1dc52925d09 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml +++ b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c - dockerImageTag: 1.1.5 + dockerImageTag: 1.1.6 dockerRepository: airbyte/source-facebook-marketing githubIssueLabel: source-facebook-marketing icon: facebook.svg diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 1f1ca5fe380b..20c4123f2fdd 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -175,6 +175,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.1.6 | 2023-08-18 | [29642](https://github.com/airbytehq/airbyte/pull/29642) | Stop batch requests if only 1 left in a batch | | 1.1.5 | 2023-08-18 | [29610](https://github.com/airbytehq/airbyte/pull/29610) | Automatically reduce batch size | | 1.1.4 | 2023-08-08 | [29412](https://github.com/airbytehq/airbyte/pull/29412) | Add new custom_audience stream | | 1.1.3 | 2023-08-08 | [29208](https://github.com/airbytehq/airbyte/pull/29208) | Add account type validation during check | From d7ee6ddadc0726aea270d785cfa7f725271d5548 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sat, 19 Aug 2023 19:50:16 +0200 Subject: [PATCH 08/10] retrigger ci From 778b25e372b220a63dd4b7f20724dd3311ad15fb Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sat, 19 Aug 2023 21:50:39 +0200 Subject: [PATCH 09/10] retrigger ci From 44b58ce53fcc5848d43d9310737ba2b095efdec7 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sun, 20 Aug 2023 11:03:32 +0200 Subject: [PATCH 10/10] retrigger ci