Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source FB Marketing: fix execute_in_batch when batch is bigger than 50 #10588

Merged
merged 10 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
- id: black
args: ["--config", "pyproject.toml"]
- repo: https://github.com/timothycrosley/isort
rev: 5.6.4
rev: 5.10.1
hooks:
- id: isort
args:
Expand All @@ -38,14 +38,14 @@ repos:
).?$

- repo: https://github.com/csachs/pyproject-flake8
rev: 0.0.1a2
rev: v0.0.1a2.post1
hooks:
- id: pyproject-flake8
args: ["--config", "pyproject.toml"]
additional_dependencies: ["mccabe"]
alias: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 0.930
rev: v0.930
hooks:
- id: mypy
args: ["--config-file", "pyproject.toml"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.35
LABEL io.airbyte.version=0.2.36
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def handle_call_rate_limit(self, response, params):
max_pause_interval = self.pause_interval_minimum

for record in response.json():
# when an inner request failed, its record might not contain headers, so skip it
keu marked this conversation as resolved.
Show resolved Hide resolved
if "headers" not in record:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the header not be present? Could you add a comment to help the reader understand?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment

continue
headers = {header["name"].lower(): header["value"] for header in record["headers"]}
usage, pause_interval = self._parse_call_rate_header(headers)
max_usage = max(max_usage, usage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ def fields(self) -> List[str]:
"""List of fields that we want to query, for now just all properties from stream's schema"""
return list(self.get_json_schema().get("properties", {}).keys())

def _execute_batch(self, batch: FacebookAdsApiBatch) -> FacebookAdsApiBatch:
def _execute_batch(self, batch: FacebookAdsApiBatch) -> None:
"""Execute batch, retry in case of failures"""
while batch:
batch = batch.execute()
if batch:
logger.info("Retry failed requests in batch")
return batch

def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]:
"""Execute list of requests in batches"""
Expand All @@ -71,9 +70,10 @@ def failure(response: FacebookResponse):
for request in pending_requests:
api_batch.add_request(request, success=success, failure=failure)
if len(api_batch) == MAX_BATCH_SIZE:
api_batch = self._execute_batch(api_batch)
self._execute_batch(api_batch)
yield from records
records = []
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()

self._execute_batch(api_batch)
yield from records
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from functools import partial
from typing import Any, Iterable, Mapping

import pytest
from facebook_business import FacebookSession
from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookRequest
from source_facebook_marketing.api import MyFacebookAdsApi
from source_facebook_marketing.streams.base_streams import FBMarketingStream
from source_facebook_marketing.streams.common import MAX_BATCH_SIZE


@pytest.fixture(name="mock_batch_responses")
def mock_batch_responses_fixture(requests_mock):
    """Provide a callable that registers mocked POST responses.

    The returned partial forwards its arguments to ``requests_mock.register_uri``
    with the method fixed to ``"POST"`` and the URL built from
    ``FacebookSession.GRAPH`` and ``FacebookAdsApi.API_VERSION``.
    """
    mocked_url = f"{FacebookSession.GRAPH}/{FacebookAdsApi.API_VERSION}/"
    return partial(requests_mock.register_uri, "POST", mocked_url)


@pytest.fixture(name="batch")
def batch_fixture(api, mocker):
    """Provide a spied ``FacebookAdsApiBatch`` that ``new_batch`` is patched to return.

    ``execute`` and ``add_request`` are wrapped rather than replaced, so the
    real methods still run while their calls can be counted by the tests.
    """
    spied_batch = FacebookAdsApiBatch(api=api.api)
    for method_name in ("execute", "add_request"):
        # Grab the real bound method first so the wrapper delegates to it.
        real_method = getattr(spied_batch, method_name)
        mocker.patch.object(spied_batch, method_name, wraps=real_method)
    mocker.patch.object(MyFacebookAdsApi, "new_batch", return_value=spied_batch)
    return spied_batch


class SomeTestStream(FBMarketingStream):
    """Stream subclass with an empty ``list_objects``, used by the tests in this module."""

    def list_objects(self, params: Mapping[str, Any]) -> Iterable:
        """Yield nothing; ``params`` is ignored."""
        yield from ()


class TestBaseStream:
    """Tests for ``execute_in_batch`` using a spied batch and mocked HTTP responses."""

    def test_execute_in_batch_with_few_requests(self, api, batch, mock_batch_responses):
        """Should execute single batch if number of requests less than MAX_BATCH_SIZE."""
        ok_record = {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}
        mock_batch_responses([{"json": [ok_record] * 3}])

        pending = [FacebookRequest("node", "GET", "endpoint") for _ in range(5)]
        records = list(SomeTestStream(api=api).execute_in_batch(pending))

        assert batch.add_request.call_count == len(pending)
        batch.execute.assert_called_once()
        assert len(records) == 3

    def test_execute_in_batch_with_many_requests(self, api, batch, mock_batch_responses):
        """Should execute as many batches as needed if number of requests bigger than MAX_BATCH_SIZE."""
        records_per_response = 5
        ok_record = {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}
        mock_batch_responses([{"json": [ok_record] * records_per_response}])

        # One request more than fits into a single batch.
        pending = [FacebookRequest("node", "GET", "endpoint") for _ in range(MAX_BATCH_SIZE + 1)]
        records = list(SomeTestStream(api=api).execute_in_batch(pending))

        assert batch.add_request.call_count == len(pending)
        assert batch.execute.call_count == 2
        assert len(records) == records_per_response * 2

    def test_execute_in_batch_with_retries(self, api, batch, mock_batch_responses):
        """Should retry batch execution until succeed"""
        ok_record = {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}
        # Three queued responses; the number of empty records shrinks from two to none.
        queued_responses = [
            {"json": [{}, {}, ok_record]},
            {"json": [{}, ok_record]},
            {"json": [ok_record]},
        ]
        mock_batch_responses(queued_responses)

        pending = [FacebookRequest("node", "GET", "endpoint") for _ in range(3)]
        records = list(SomeTestStream(api=api).execute_in_batch(pending))

        assert batch.add_request.call_count == len(pending)
        assert batch.execute.call_count == 1
        assert len(records) == 3

    def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses):
        """Should fail with exception when any request returns error"""
        error_record = {"body": "{}", "code": 500, "headers": {}}
        ok_record = {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}
        mock_batch_responses([{"json": [error_record, ok_record]}])

        pending = [FacebookRequest("node", "GET", "endpoint") for _ in range(5)]
        stream = SomeTestStream(api=api)

        with pytest.raises(RuntimeError, match="Batch request failed with response:"):
            list(stream.execute_in_batch(pending))

        assert batch.add_request.call_count == len(pending)
        assert batch.execute.call_count == 1
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.36 | 2022-02-24 | [10588](https://github.com/airbytehq/airbyte/pull/10588) | Fix `execute_in_batch` for a large number of requests |
| 0.2.35 | 2022-02-18 | [10348](https://github.com/airbytehq/airbyte/pull/10348) | Add 104 error code to backoff triggers |
| 0.2.34 | 2022-02-17 | [9805](https://github.com/airbytehq/airbyte/pull/9805) | Performance and reliability fixes |
| 0.2.33 | 2021-12-28 | [10180](https://github.com/airbytehq/airbyte/pull/10180) | Add AdAccount and Images streams |
Expand Down