Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Facebook Marketing: add unit test #29642

Merged
merged 13 commits into from
Aug 21, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=1.1.5
LABEL io.airbyte.version=1.1.6
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.1.5
dockerImageTag: 1.1.6
dockerRepository: airbyte/source-facebook-marketing
githubIssueLabel: source-facebook-marketing
icon: facebook.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from functools import partial
from math import ceil
from queue import Queue
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional

Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 1
self._api = api
self.page_size = page_size if page_size is not None else 100
self._include_deleted = include_deleted if self.enable_deleted else False
self.max_batch_size = max_batch_size if max_batch_size is not None else 50
self.max_batch_size = self._initial_max_batch_size = max_batch_size if max_batch_size is not None else 50

@cached_property
def fields(self) -> List[str]:
Expand All @@ -72,22 +73,25 @@ def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Itera
requests_q.put(r)

def success(response: FacebookResponse):
self.max_batch_size = self._initial_max_batch_size
records.append(response.json())

def reduce_batch_size():
if self.max_batch_size == 1:
raise RuntimeError("Batch request failed with only 1 request in it")
self.max_batch_size = ceil(self.max_batch_size / 2)
logger.warning(f"Caught retryable error: Too much data was requested in batch. Reducing batch size to {self.max_batch_size}")

def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None):
# although it is Optional in the signature for compatibility, we need it always
assert request, "Missing a request object"
resp_body = response.json()
if not isinstance(resp_body, dict) or (
resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE
and resp_body.get("error", {}).get("message")
!= "Please reduce the amount of data you're asking for, then retry your request"
):
# response body is not a json object or the error code is different
if not isinstance(resp_body, dict):
raise RuntimeError(f"Batch request failed with response: {resp_body}")
if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request":
logger.warning("Caught retryable error: Too much data was requested in batch. Reducing batch size...")
self.max_batch_size = int(self.max_batch_size / 2)
elif resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request":
reduce_batch_size()
elif resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE:
raise RuntimeError(f"Batch request failed with response: {resp_body}; unknown error code")
requests_q.put(request)

api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses):
assert batch.add_request.call_count == len(requests)
assert batch.execute.call_count == 1

def test_batch_reduce_amount(self, api, batch, mock_batch_responses, caplog):
"""Reduce batch size to 1 and finally fail with message"""

retryable_message = "Please reduce the amount of data you're asking for, then retry your request"
mock_batch_responses(
[
{
"json": [
{"body": {"error": {"message": retryable_message}}, "code": 500, "headers": {}},
],
}
]
)

stream = SomeTestStream(api=api)
requests = [FacebookRequest("node", "GET", "endpoint")]
with pytest.raises(RuntimeError, match="Batch request failed with only 1 request in..."):
list(stream.execute_in_batch(requests))

assert batch.add_request.call_count == 7
assert batch.execute.call_count == 7
assert stream.max_batch_size == 1
for index, expected_batch_size in enumerate(["25", "13", "7", "4", "2", "1"]):
assert expected_batch_size in caplog.messages[index]

def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses):
"""Should retry without exception when any request returns 960 error code"""
mock_batch_responses(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.1.6 | 2023-08-18 | [29642](https://github.com/airbytehq/airbyte/pull/29642) | Stop batch requests if only 1 left in a batch |
| 1.1.5 | 2023-08-18 | [29610](https://github.com/airbytehq/airbyte/pull/29610) | Automatically reduce batch size |
| 1.1.4 | 2023-08-08 | [29412](https://github.com/airbytehq/airbyte/pull/29412) | Add new custom_audience stream |
| 1.1.3 | 2023-08-08 | [29208](https://github.com/airbytehq/airbyte/pull/29208) | Add account type validation during check |
Expand Down