diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 0f9d5f23bb5a..325d86c0225a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.1.5 +LABEL io.airbyte.version=1.1.6 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml index d8757d600855..b1dc52925d09 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml +++ b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c - dockerImageTag: 1.1.5 + dockerImageTag: 1.1.6 dockerRepository: airbyte/source-facebook-marketing githubIssueLabel: source-facebook-marketing icon: facebook.svg diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 6521828e1a48..6e14cc79323f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -6,6 +6,7 @@ from abc import ABC, abstractmethod from datetime import datetime from functools import partial +from math import ceil from queue import Queue from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional @@ -50,7 +51,7 @@ def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 1 self._api = api self.page_size = page_size if page_size is not None else 100 self._include_deleted = include_deleted if self.enable_deleted else False - self.max_batch_size = max_batch_size if max_batch_size is not None else 50 + self.max_batch_size = self._initial_max_batch_size = max_batch_size if max_batch_size is not None else 50 @cached_property def fields(self) -> List[str]: @@ -72,22 +73,25 @@ def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Itera requests_q.put(r) def success(response: FacebookResponse): + self.max_batch_size = self._initial_max_batch_size records.append(response.json()) + def reduce_batch_size(): + if self.max_batch_size == 1: + raise RuntimeError("Batch request failed with only 1 request in it") + self.max_batch_size = ceil(self.max_batch_size / 2) + logger.warning(f"Caught retryable error: Too much data was requested in batch. Reducing batch size to {self.max_batch_size}") + def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None): # although it is Optional in the signature for compatibility, we need it always assert request, "Missing a request object" resp_body = response.json() - if not isinstance(resp_body, dict) or ( - resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE - and resp_body.get("error", {}).get("message") - != "Please reduce the amount of data you're asking for, then retry your request" - ): - # response body is not a json object or the error code is different + if not isinstance(resp_body, dict): raise RuntimeError(f"Batch request failed with response: {resp_body}") - if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": - logger.warning("Caught retryable error: Too much data was requested in batch. Reducing batch size...") - self.max_batch_size = int(self.max_batch_size / 2) + elif resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request": + reduce_batch_size() + elif resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE: + raise RuntimeError(f"Batch request failed with response: {resp_body}; unknown error code") requests_q.put(request) api_batch: FacebookAdsApiBatch = self._api.api.new_batch() diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py index 20b520194411..c64f1c778ba8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py @@ -128,6 +128,31 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses): assert batch.add_request.call_count == len(requests) assert batch.execute.call_count == 1 + def test_batch_reduce_amount(self, api, batch, mock_batch_responses, caplog): + """Reduce batch size to 1 and finally fail with message""" + + retryable_message = "Please reduce the amount of data you're asking for, then retry your request" + mock_batch_responses( + [ + { + "json": [ + {"body": {"error": {"message": retryable_message}}, "code": 500, "headers": {}}, + ], + } + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint")] + with pytest.raises(RuntimeError, match="Batch request failed with only 1 request in..."): + list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == 7 + assert batch.execute.call_count == 7 + assert stream.max_batch_size == 1 + for index, expected_batch_size in enumerate(["25", "13", "7", "4", "2", "1"]): + assert expected_batch_size in caplog.messages[index] + def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses): """Should retry without exception when any request returns 960 error code""" mock_batch_responses( diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 1f1ca5fe380b..20c4123f2fdd 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -175,6 +175,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.1.6 | 2023-08-18 | [29642](https://github.com/airbytehq/airbyte/pull/29642) | Stop batch requests if only 1 left in a batch | | 1.1.5 | 2023-08-18 | [29610](https://github.com/airbytehq/airbyte/pull/29610) | Automatically reduce batch size | | 1.1.4 | 2023-08-08 | [29412](https://github.com/airbytehq/airbyte/pull/29412) | Add new custom_audience stream | | 1.1.3 | 2023-08-08 | [29208](https://github.com/airbytehq/airbyte/pull/29208) | Add account type validation during check |