Skip to content

Commit

Permalink
Source Facebook Marketing: add unit test (airbytehq#29642)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem1205 authored and harrytou committed Sep 1, 2023
1 parent 681e1b8 commit 098b1ae
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=1.1.5
LABEL io.airbyte.version=1.1.6
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.1.5
dockerImageTag: 1.1.6
dockerRepository: airbyte/source-facebook-marketing
githubIssueLabel: source-facebook-marketing
icon: facebook.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from functools import partial
from math import ceil
from queue import Queue
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional

Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 1
self._api = api
self.page_size = page_size if page_size is not None else 100
self._include_deleted = include_deleted if self.enable_deleted else False
self.max_batch_size = max_batch_size if max_batch_size is not None else 50
self.max_batch_size = self._initial_max_batch_size = max_batch_size if max_batch_size is not None else 50

@cached_property
def fields(self) -> List[str]:
Expand All @@ -72,22 +73,25 @@ def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Itera
requests_q.put(r)

def success(response: FacebookResponse):
self.max_batch_size = self._initial_max_batch_size
records.append(response.json())

def reduce_batch_size():
if self.max_batch_size == 1:
raise RuntimeError("Batch request failed with only 1 request in it")
self.max_batch_size = ceil(self.max_batch_size / 2)
logger.warning(f"Caught retryable error: Too much data was requested in batch. Reducing batch size to {self.max_batch_size}")

def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None):
# although it is Optional in the signature for compatibility, we need it always
assert request, "Missing a request object"
resp_body = response.json()
if not isinstance(resp_body, dict) or (
resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE
and resp_body.get("error", {}).get("message")
!= "Please reduce the amount of data you're asking for, then retry your request"
):
# response body is not a json object or the error code is different
if not isinstance(resp_body, dict):
raise RuntimeError(f"Batch request failed with response: {resp_body}")
if resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request":
logger.warning("Caught retryable error: Too much data was requested in batch. Reducing batch size...")
self.max_batch_size = int(self.max_batch_size / 2)
elif resp_body.get("error", {}).get("message") == "Please reduce the amount of data you're asking for, then retry your request":
reduce_batch_size()
elif resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE:
raise RuntimeError(f"Batch request failed with response: {resp_body}; unknown error code")
requests_q.put(request)

api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses):
assert batch.add_request.call_count == len(requests)
assert batch.execute.call_count == 1

def test_batch_reduce_amount(self, api, batch, mock_batch_responses, caplog):
"""Reduce batch size to 1 and finally fail with message"""

retryable_message = "Please reduce the amount of data you're asking for, then retry your request"
mock_batch_responses(
[
{
"json": [
{"body": {"error": {"message": retryable_message}}, "code": 500, "headers": {}},
],
}
]
)

stream = SomeTestStream(api=api)
requests = [FacebookRequest("node", "GET", "endpoint")]
with pytest.raises(RuntimeError, match="Batch request failed with only 1 request in..."):
list(stream.execute_in_batch(requests))

assert batch.add_request.call_count == 7
assert batch.execute.call_count == 7
assert stream.max_batch_size == 1
for index, expected_batch_size in enumerate(["25", "13", "7", "4", "2", "1"]):
assert expected_batch_size in caplog.messages[index]

def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses):
"""Should retry without exception when any request returns 960 error code"""
mock_batch_responses(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ The Facebook Marketing connector uses the `lookback_window` parameter to repeate

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.1.6 | 2023-08-18 | [29642](https://github.com/airbytehq/airbyte/pull/29642) | Stop batch requests if only 1 left in a batch |
| 1.1.5 | 2023-08-18 | [29610](https://github.com/airbytehq/airbyte/pull/29610) | Automatically reduce batch size |
| 1.1.4 | 2023-08-08 | [29412](https://github.com/airbytehq/airbyte/pull/29412) | Add new custom_audience stream |
| 1.1.3 | 2023-08-08 | [29208](https://github.com/airbytehq/airbyte/pull/29208) | Add account type validation during check |
Expand Down

0 comments on commit 098b1ae

Please sign in to comment.