Skip to content

Commit

Permalink
🐛bug(source-linkedin-ads): Fix unit tests running local (#48565)
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 authored Nov 21, 2024
1 parent 4f36d2b commit 95f95f9
Showing 1 changed file with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Dict, List

import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
Expand Down Expand Up @@ -57,8 +58,39 @@
class TestAllStreams:
_instance: SourceLinkedinAds = SourceLinkedinAds()

@staticmethod
def _mock_initialize_cache_for_parent_streams(stream_configs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
parent_streams = set()

def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
for parent_config in parent_configs:
parent_streams.add(parent_config["stream"]["name"])
parent_config["stream"]["retriever"]["requester"]["use_cache"] = False

for stream_config in stream_configs:
if stream_config.get("incremental_sync", {}).get("parent_stream"):
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]["use_cache"] = False

elif stream_config.get("retriever", {}).get("partition_router", {}):
partition_router = stream_config["retriever"]["partition_router"]

if isinstance(partition_router, dict) and partition_router.get("parent_stream_configs"):
update_with_cache_parent_configs(partition_router["parent_stream_configs"])
elif isinstance(partition_router, list):
for router in partition_router:
if router.get("parent_stream_configs"):
update_with_cache_parent_configs(router["parent_stream_configs"])

for stream_config in stream_configs:
if stream_config["name"] in parent_streams:
stream_config["retriever"]["requester"]["use_cache"] = False

return stream_configs

@pytest.mark.parametrize("error_code", [429, 500, 503])
def test_should_retry_on_error(self, error_code, requests_mock, mocker):
mocker.patch.object(ManifestDeclarativeSource, "_initialize_cache_for_parent_streams", side_effect=self._mock_initialize_cache_for_parent_streams)
mocker.patch("time.sleep", lambda x: None)
stream = find_stream("accounts", TEST_CONFIG)
requests_mock.register_uri(
Expand Down Expand Up @@ -122,6 +154,8 @@ def test_path(self, stream_name, expected):
),
)
def test_check_connection(self, requests_mock, status_code, is_connection_successful, error_msg, mocker):
mocker.patch.object(ManifestDeclarativeSource, "_initialize_cache_for_parent_streams",
side_effect=self._mock_initialize_cache_for_parent_streams)
mocker.patch("time.sleep", lambda x: None)
json = {"elements": [{"data": []}] * 500} if 200 >= status_code < 300 else {}
requests_mock.register_uri(
Expand Down

0 comments on commit 95f95f9

Please sign in to comment.