From 5f5948aef1d1f7fd2b0ec242ec37e946ee48daee Mon Sep 17 00:00:00 2001 From: alafanechere Date: Thu, 1 Dec 2022 15:51:39 +0100 Subject: [PATCH 1/9] use observed config in bing ads for testing --- .../source-bing-ads/source_bing_ads/client.py | 47 ++++++++----------- .../source-bing-ads/source_bing_ads/source.py | 4 +- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py index e6193ca3ce0f..4fa6c056dc8f 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py @@ -11,6 +11,7 @@ import backoff import pendulum +from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.logger import AirbyteLogger from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant from bingads.service_client import ServiceClient @@ -38,45 +39,35 @@ class Client: def __init__( self, - tenant_id: str, - reports_start_date: str, - developer_token: str = None, - client_id: str = None, - client_secret: str = None, - refresh_token: str = None, - **kwargs: Mapping[str, Any], + config, ) -> None: + self.config = observe_connector_config(config) self.authorization_data: Mapping[str, AuthorizationData] = {} - self.refresh_token = refresh_token - self.developer_token = developer_token - - self.client_id = client_id - self.client_secret = client_secret - - self.authentication = self._get_auth_client(client_id, tenant_id, client_secret) + self.authentication = self._get_auth_client() self.oauth: OAuthTokens = self._get_access_token() - self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc) + self.reports_start_date = pendulum.parse(config["reports_start_date"]).astimezone(tz=timezone.utc) - def _get_auth_client(self, client_id: str, tenant_id: str, client_secret: str = None) -> OAuthWebAuthCodeGrant: - # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 - auth_creds = { - "client_id": client_id, + @property + def auth_creds(self): + return { + "client_id": self.config["client_id"], "redirection_uri": "", # should be empty string - "client_secret": None, - "tenant": tenant_id, + "client_secret": self.config.get("client_secret"), + "tenant": self.config["tenant_id"], } - # the `client_secret` should be provided for `non-public clients` only - # https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken - if client_secret and client_secret != "": - auth_creds["client_secret"] = client_secret - return OAuthWebAuthCodeGrant(**auth_creds) + + def _get_auth_client(self) -> OAuthWebAuthCodeGrant: + return OAuthWebAuthCodeGrant(**self.auth_creds, token_refreshed_callback=self.update_tokens) + + def update_tokens(self, new_oauth_tokens: OAuthTokens): + self.config["refresh_token"] = new_oauth_tokens.refresh_token @lru_cache(maxsize=4) def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData: return AuthorizationData( account_id=account_id, customer_id=customer_id, - developer_token=self.developer_token, + developer_token=self.config["developer_token"], authentication=self.authentication, ) @@ -85,7 +76,7 @@ def _get_access_token(self) -> OAuthTokens: # clear caches to be able to use new access token self.get_service.cache_clear() self._get_auth_data.cache_clear() - return self.authentication.request_oauth_tokens_by_refresh_token(self.refresh_token) + return self.authentication.request_oauth_tokens_by_refresh_token(self.config["refresh_token"]) def is_token_expiring(self) -> bool: """ diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py index b16bdbf600ab..995271fb8e9d 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py @@ -719,7 +719,7 @@ class SourceBingAds(AbstractSource): def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: - client = Client(**config) + client = Client(config) account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)} if account_ids: return True, None @@ -738,7 +738,7 @@ def get_report_streams(self, aggregation_type: str) -> List[Stream]: ] def streams(self, config: Mapping[str, Any]) -> List[Stream]: - client = Client(**config) + client = Client(config) streams = [ Accounts(client, config), AdGroups(client, config), From b37d7882c6750ff4f8b9a9011af33b66ce220f7c Mon Sep 17 00:00:00 2001 From: alafanechere Date: Thu, 1 Dec 2022 15:52:23 +0100 Subject: [PATCH 2/9] update local configurations and use most recent --- .../source_acceptance_test/conftest.py | 18 ++++++++-- .../utils/connector_runner.py | 34 +++++++++++++++++-- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py index 0696b55e134c..4448a21e196c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py @@ -7,6 +7,7 @@ import json import logging import os +from glob import glob from logging import Logger from pathlib import Path from subprocess import STDOUT, check_output, run @@ -56,7 +57,18 @@ def cache_discovered_catalog_fixture(acceptance_test_config: Config) -> bool: @pytest.fixture(name="connector_config_path") def connector_config_path_fixture(inputs, base_path) -> Path: """Fixture with connector's config path""" - return Path(base_path) / getattr(inputs, "config_path") + original_configuration_path = Path(base_path) / getattr(inputs, "config_path") + updated_configurations_glob = f"{original_configuration_path.parent}/updated_configurations/{original_configuration_path.stem}|**{original_configuration_path.suffix}" + existing_configurations_path_creation_time = [ + (config_file_path, os.path.getctime(config_file_path)) for config_file_path in glob(updated_configurations_glob) + ] + if existing_configurations_path_creation_time: + existing_configurations_path_creation_time.sort(key=lambda x: x[1]) + most_recent_configuration_path = existing_configurations_path_creation_time[-1][0] + else: + most_recent_configuration_path = original_configuration_path + logging.info(f"Using {most_recent_configuration_path} as configuration. It is the most recent version.") + return Path(most_recent_configuration_path) @pytest.fixture(name="invalid_connector_config_path") @@ -127,8 +139,8 @@ def connector_spec_fixture(connector_spec_path) -> ConnectorSpecification: @pytest.fixture(name="docker_runner") -def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner: - return ConnectorRunner(image_tag, volume=tmp_path) +def docker_runner_fixture(image_tag, tmp_path, connector_config_path) -> ConnectorRunner: + return ConnectorRunner(image_tag, volume=tmp_path, connector_configuration_path=connector_config_path) @pytest.fixture(name="previous_connector_image_name") diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 0d7aa48045a3..539d15621425 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -9,14 +9,15 @@ from typing import Iterable, List, Mapping, Optional import docker -from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteControlMessage, AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType +from airbyte_cdk.models import Type as AirbyteMessageType from docker.errors import ContainerError, NotFound from docker.models.containers import Container from pydantic import ValidationError class ConnectorRunner: - def __init__(self, image_name: str, volume: Path): + def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None): self._client = docker.from_env() try: self._image = self._client.images.get(image_name) @@ -26,6 +27,7 @@ def __init__(self, image_name: str, volume: Path): print("Pulling completed") self._runs = 0 self._volume_base = volume + self._connector_configuration_path = connector_configuration_path @property def output_folder(self) -> Path: @@ -109,7 +111,10 @@ def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: for line in self.read(container, command=cmd, with_ext=raise_container_error): f.write(line.encode()) try: - yield AirbyteMessage.parse_raw(line) + airbyte_message = AirbyteMessage.parse_raw(line) + if airbyte_message.type is AirbyteMessageType.CONTROL: + self._handle_control_message(airbyte_message.control) + yield airbyte_message except ValidationError as exc: logging.warning("Unable to parse connector's output %s, error: %s", line, exc) @@ -168,3 +173,26 @@ def env_variables(self): @property def entry_point(self): return self._image.attrs["Config"]["Entrypoint"] + + def _handle_control_message(self, airbyte_control_message: AirbyteControlMessage): + if airbyte_control_message.type is OrchestratorType.CONNECTOR_CONFIG: + new_config = airbyte_control_message.connectorConfig.config + + with open(self._connector_configuration_path) as old_config_file: + old_config = json.load(old_config_file) + if new_config != old_config: + + file_prefix = self._connector_configuration_path.stem.split("|")[0] + if "/updated_configurations/" not in str(self._connector_configuration_path): + Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True) + new_config_file_path = Path( + f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{int(airbyte_control_message.emitted_at)}{self._connector_configuration_path.suffix}" + ) + else: + new_config_file_path = Path( + f"{self._connector_configuration_path.parent}/{file_prefix}|{int(airbyte_control_message.emitted_at)}{self._connector_configuration_path.suffix}" + ) + + with open(new_config_file_path, "w") as new_config_file: + json.dump(new_config, new_config_file) + logging.info(f"Stored most recent configuration value to {new_config_file_path}") From aebc8a51193fec4b7d76d05db171744f401eb26a Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 2 Dec 2022 11:36:49 +0100 Subject: [PATCH 3/9] test connector runner changes --- .../utils/connector_runner.py | 55 +++++++----- .../unit_tests/test_container_runner.py | 88 +++++++++++++++++++ 2 files changed, 120 insertions(+), 23 deletions(-) create mode 100644 airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 539d15621425..b5d193bc5a6d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -9,7 +9,7 @@ from typing import Iterable, List, Mapping, Optional import docker -from airbyte_cdk.models import AirbyteControlMessage, AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType from airbyte_cdk.models import Type as AirbyteMessageType from docker.errors import ContainerError, NotFound from docker.models.containers import Container @@ -17,7 +17,7 @@ class ConnectorRunner: - def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None): + def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path], should_persist_new_configurations=True): self._client = docker.from_env() try: self._image = self._client.images.get(image_name) @@ -28,6 +28,7 @@ def __init__(self, image_name: str, volume: Path, connector_configuration_path: self._runs = 0 self._volume_base = volume self._connector_configuration_path = connector_configuration_path + self._should_persist_new_configurations = should_persist_new_configurations @property def output_folder(self) -> Path: @@ -95,6 +96,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte return output def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]: + self._runs += 1 volumes = self._prepare_volumes(config, state, catalog) logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}") @@ -112,8 +114,13 @@ def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: f.write(line.encode()) try: airbyte_message = AirbyteMessage.parse_raw(line) - if airbyte_message.type is AirbyteMessageType.CONTROL: - self._handle_control_message(airbyte_message.control) + if ( + airbyte_message.type is AirbyteMessageType.CONTROL + and airbyte_message.control.type is OrchestratorType.CONNECTOR_CONFIG + ): + self._persist_new_configuration( + airbyte_message.control.connectorConfig.config, int(airbyte_message.control.emitted_at) + ) yield airbyte_message except ValidationError as exc: logging.warning("Unable to parse connector's output %s, error: %s", line, exc) @@ -174,25 +181,27 @@ def env_variables(self): def entry_point(self): return self._image.attrs["Config"]["Entrypoint"] - def _handle_control_message(self, airbyte_control_message: AirbyteControlMessage): - if airbyte_control_message.type is OrchestratorType.CONNECTOR_CONFIG: - new_config = airbyte_control_message.connectorConfig.config + def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]: + if not self._should_persist_new_configurations: + logging.warning("New configuration persistence is disabled. The new configuration was not persisted") + return None - with open(self._connector_configuration_path) as old_config_file: - old_config = json.load(old_config_file) - if new_config != old_config: + with open(self._connector_configuration_path) as old_configuration_file: + old_configuration = json.load(old_configuration_file) - file_prefix = self._connector_configuration_path.stem.split("|")[0] - if "/updated_configurations/" not in str(self._connector_configuration_path): - Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True) - new_config_file_path = Path( - f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{int(airbyte_control_message.emitted_at)}{self._connector_configuration_path.suffix}" - ) - else: - new_config_file_path = Path( - f"{self._connector_configuration_path.parent}/{file_prefix}|{int(airbyte_control_message.emitted_at)}{self._connector_configuration_path.suffix}" - ) + if new_configuration != old_configuration: + file_prefix = self._connector_configuration_path.stem.split("|")[0] + if "/updated_configurations/" not in str(self._connector_configuration_path): + Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True) + new_configuration_file_path = Path( + f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}" + ) + else: + new_configuration_file_path = Path( + f"{self._connector_configuration_path.parent}/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}" + ) - with open(new_config_file_path, "w") as new_config_file: - json.dump(new_config, new_config_file) - logging.info(f"Stored most recent configuration value to {new_config_file_path}") + with open(new_configuration_file_path, "w") as new_configuration_file: + json.dump(new_configuration, new_configuration_file) + logging.info(f"Stored most recent configuration value to {new_configuration_file_path}") + return new_configuration_file_path diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py new file mode 100644 index 000000000000..6959988fd684 --- /dev/null +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py @@ -0,0 +1,88 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import json + +import pytest +from airbyte_cdk.models import ( + AirbyteControlConnectorConfigMessage, + AirbyteControlMessage, + AirbyteMessage, + AirbyteRecordMessage, + OrchestratorType, +) +from airbyte_cdk.models import Type as AirbyteMessageType +from source_acceptance_test.utils import connector_runner + + +class TestContainerRunner: + def test_run_call_persist_configuration(self, mocker, tmp_path): + old_configuration_path = tmp_path / "config.json" + new_configuration = {"field_a": "new_value_a"} + mocker.patch.object(connector_runner, "docker") + records_reads = [ + AirbyteMessage( + type=AirbyteMessageType.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"foo": "bar"}, emitted_at=1.0) + ).json(exclude_unset=False), + AirbyteMessage( + type=AirbyteMessageType.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=1.0, + connectorConfig=AirbyteControlConnectorConfigMessage(config=new_configuration), + ), + ).json(exclude_unset=False), + ] + mocker.patch.object(connector_runner.ConnectorRunner, "read", mocker.Mock(return_value=records_reads)) + mocker.patch.object(connector_runner.ConnectorRunner, "_persist_new_configuration") + + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path) + list(runner.run("dummy_cmd")) + runner._persist_new_configuration.assert_called_once_with(new_configuration, 1) + + @pytest.mark.parametrize( + "should_persist_new_configurations, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration", + [ + pytest.param( + True, + {"field_a": "value_a"}, + {"field_a": "value_a"}, + 1, + False, + id="Config unchanged: No new configuration persisted", + ), + pytest.param( + True, {"field_a": "value_a"}, {"field_a": "new_value_a"}, 1, True, id="Config changed: New configuration persisted" + ), + pytest.param( + False, + {"field_a": "value_a"}, + {"field_a": "new_value_a"}, + 1, + False, + id="Config changed but persistence is disable: New configuration not persisted", + ), + ], + ) + def test_persist_new_configuration( + self, + mocker, + tmp_path, + should_persist_new_configurations, + old_configuration, + new_configuration, + new_configuration_emitted_at, + expect_new_configuration, + ): + old_configuration_path = tmp_path / "config.json" + with open(old_configuration_path, "w") as old_configuration_file: + json.dump(old_configuration, old_configuration_file) + mocker.patch.object(connector_runner, "docker") + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path, should_persist_new_configurations) + new_configuration_path = runner._persist_new_configuration(new_configuration, new_configuration_emitted_at) + if not expect_new_configuration: + assert new_configuration_path is None + else: + assert new_configuration_path == tmp_path / "updated_configurations" / f"config|{new_configuration_emitted_at}.json" From 7a8ecdecbb804a4a20faf975d9a0a88656e8babc Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 2 Dec 2022 12:14:46 +0100 Subject: [PATCH 4/9] test_connector_config_path_fixture --- .../unit_tests/test_global_fixtures.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py index 9b1ecf07ab5f..7420c193bd4c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py @@ -179,3 +179,23 @@ def test_configured_catalog_fixture(mocker, configured_catalog_path): else: assert configured_catalog == conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.return_value conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.assert_called_once_with(mock_discovered_catalog, set()) + + +@pytest.mark.parametrize( + "updated_configurations", [[], ["config|created_last.json"], ["config|created_first.json", "config|created_last.json"]] +) +def test_connector_config_path_fixture(mocker, tmp_path, updated_configurations): + inputs = mocker.Mock(config_path="config.json") + base_path = tmp_path + if updated_configurations: + updated_configurations_dir = tmp_path / "updated_configurations" + updated_configurations_dir.mkdir() + for configuration_file_name in updated_configurations: + updated_configuration_path = updated_configurations_dir / configuration_file_name + updated_configuration_path.touch() + + connector_config_path = conftest.connector_config_path_fixture.__wrapped__(inputs, base_path) + if not updated_configurations: + assert connector_config_path == base_path / "config.json" + else: + assert connector_config_path == base_path / "updated_configurations" / "config|created_last.json" From 234ba6180da404f5143607862683d213fd608f22 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 2 Dec 2022 12:27:22 +0100 Subject: [PATCH 5/9] make connector_configuration_path optional in ContainerRunner --- .../utils/connector_runner.py | 7 +++---- .../unit_tests/test_container_runner.py | 17 ++++++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index b5d193bc5a6d..ef859723d229 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -17,7 +17,7 @@ class ConnectorRunner: - def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path], should_persist_new_configurations=True): + def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None): self._client = docker.from_env() try: self._image = self._client.images.get(image_name) @@ -28,7 +28,6 @@ def __init__(self, image_name: str, volume: Path, connector_configuration_path: self._runs = 0 self._volume_base = volume self._connector_configuration_path = connector_configuration_path - self._should_persist_new_configurations = should_persist_new_configurations @property def output_folder(self) -> Path: @@ -182,8 +181,8 @@ def entry_point(self): return self._image.attrs["Config"]["Entrypoint"] def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]: - if not self._should_persist_new_configurations: - logging.warning("New configuration persistence is disabled. The new configuration was not persisted") + if self._connector_configuration_path is None: + logging.warning("No configuration path was passed to the ConnectorRunner. The new configuration was not persisted") return None with open(self._connector_configuration_path) as old_configuration_file: diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py index 6959988fd684..eee26a3763d0 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py @@ -38,12 +38,12 @@ def test_run_call_persist_configuration(self, mocker, tmp_path): mocker.patch.object(connector_runner.ConnectorRunner, "read", mocker.Mock(return_value=records_reads)) mocker.patch.object(connector_runner.ConnectorRunner, "_persist_new_configuration") - runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path) + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, connector_configuration_path=old_configuration_path) list(runner.run("dummy_cmd")) runner._persist_new_configuration.assert_called_once_with(new_configuration, 1) @pytest.mark.parametrize( - "should_persist_new_configurations, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration", + "pass_configuration_path, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration", [ pytest.param( True, @@ -70,17 +70,20 @@ def test_persist_new_configuration( self, mocker, tmp_path, - should_persist_new_configurations, + pass_configuration_path, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration, ): - old_configuration_path = tmp_path / "config.json" - with open(old_configuration_path, "w") as old_configuration_file: - json.dump(old_configuration, old_configuration_file) + if pass_configuration_path: + old_configuration_path = tmp_path / "config.json" + with open(old_configuration_path, "w") as old_configuration_file: + json.dump(old_configuration, old_configuration_file) + else: + old_configuration_path = None mocker.patch.object(connector_runner, "docker") - runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path, should_persist_new_configurations) + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path) new_configuration_path = runner._persist_new_configuration(new_configuration, new_configuration_emitted_at) if not expect_new_configuration: assert new_configuration_path is None From c251eb95416689cfb034124584949b8172ec5474 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 2 Dec 2022 12:56:26 +0100 Subject: [PATCH 6/9] add docstrings --- .../source_acceptance_test/conftest.py | 2 +- .../source_acceptance_test/utils/connector_runner.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py index 4448a21e196c..60333371b81d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py @@ -56,7 +56,7 @@ def cache_discovered_catalog_fixture(acceptance_test_config: Config) -> bool: @pytest.fixture(name="connector_config_path") def connector_config_path_fixture(inputs, base_path) -> Path: - """Fixture with connector's config path""" + """Fixture with connector's config path. The path to the latest updated configurations will be returned if any.""" original_configuration_path = Path(base_path) / getattr(inputs, "config_path") updated_configurations_glob = f"{original_configuration_path.parent}/updated_configurations/{original_configuration_path.stem}|**{original_configuration_path.suffix}" existing_configurations_path_creation_time = [ diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index ef859723d229..006c3a534013 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -181,6 +181,15 @@ def entry_point(self): return self._image.attrs["Config"]["Entrypoint"] def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]: + """Store new configuration values to an updated_configurations subdir under the original configuration path. + N.B. The new configuration will not be stored if no configuration path was passed to the ConnectorRunner. + Args: + new_configuration (dict): The updated configuration + configuration_emitted_at (int): Timestamp at which the configuration was emitted (ms) + + Returns: + Optional[Path]: The updated configuration path if it was persisted. + """ if self._connector_configuration_path is None: logging.warning("No configuration path was passed to the ConnectorRunner. The new configuration was not persisted") return None From f879c01153559e0e3bfe83e4cce1fa76b1592ded Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 2 Dec 2022 13:19:56 +0100 Subject: [PATCH 7/9] bump version --- airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md | 3 +++ airbyte-integrations/bases/source-acceptance-test/Dockerfile | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 017634fd619b..c678e58b9e87 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.2.22 +Capture control messages to store and use updated configurations. [#19979](https://github.com/airbytehq/airbyte/pull/19979). + ## 0.2.21 Optionally disable discovered catalog caching. [#19806](https://github.com/airbytehq/airbyte/pull/19806). diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index d1f803f6eca8..e7a3eff1bcd4 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.2.21 +LABEL io.airbyte.version=0.2.22 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] From da3917e473e59611e483e51e7ebf3893a6141b7f Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 6 Dec 2022 10:58:22 +0100 Subject: [PATCH 8/9] gitingore updated_configurations --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 1876fcbfcf48..a6e0049c2f68 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ acceptance_tests_logs/ # Secrets secrets +updated_configurations !airbyte-integrations/connector-templates/**/secrets # Test logs From e1255a7824d8cdfffec052ec022df9905735d4f1 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 6 Dec 2022 11:19:26 +0100 Subject: [PATCH 9/9] revert changes on source-bing-ads --- .../source-bing-ads/source_bing_ads/client.py | 47 +++++++++++-------- .../source-bing-ads/source_bing_ads/source.py | 4 +- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py index 4fa6c056dc8f..e6193ca3ce0f 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py @@ -11,7 +11,6 @@ import backoff import pendulum -from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.logger import AirbyteLogger from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant from bingads.service_client import ServiceClient @@ -39,35 +38,45 @@ class Client: def __init__( self, - config, + tenant_id: str, + reports_start_date: str, + developer_token: str = None, + client_id: str = None, + client_secret: str = None, + refresh_token: str = None, + **kwargs: Mapping[str, Any], ) -> None: - self.config = observe_connector_config(config) self.authorization_data: Mapping[str, AuthorizationData] = {} - self.authentication = self._get_auth_client() + self.refresh_token = refresh_token + self.developer_token = developer_token + + self.client_id = client_id + self.client_secret = client_secret + + self.authentication = self._get_auth_client(client_id, tenant_id, client_secret) self.oauth: OAuthTokens = self._get_access_token() - self.reports_start_date = pendulum.parse(config["reports_start_date"]).astimezone(tz=timezone.utc) + self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc) - @property - def auth_creds(self): - return { - "client_id": self.config["client_id"], + def _get_auth_client(self, client_id: str, tenant_id: str, client_secret: str = None) -> OAuthWebAuthCodeGrant: + # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 + auth_creds = { + "client_id": client_id, "redirection_uri": "", # should be empty string - "client_secret": self.config.get("client_secret"), - "tenant": self.config["tenant_id"], + "client_secret": None, + "tenant": tenant_id, } - - def _get_auth_client(self) -> OAuthWebAuthCodeGrant: - return OAuthWebAuthCodeGrant(**self.auth_creds, token_refreshed_callback=self.update_tokens) - - def update_tokens(self, new_oauth_tokens: OAuthTokens): - self.config["refresh_token"] = new_oauth_tokens.refresh_token + # the `client_secret` should be provided for `non-public clients` only + # https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken + if client_secret and client_secret != "": + auth_creds["client_secret"] = client_secret + return OAuthWebAuthCodeGrant(**auth_creds) @lru_cache(maxsize=4) def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData: return AuthorizationData( account_id=account_id, customer_id=customer_id, - developer_token=self.config["developer_token"], + developer_token=self.developer_token, authentication=self.authentication, ) @@ -76,7 +85,7 @@ def _get_access_token(self) -> OAuthTokens: # clear caches to be able to use new access token self.get_service.cache_clear() self._get_auth_data.cache_clear() - return self.authentication.request_oauth_tokens_by_refresh_token(self.config["refresh_token"]) + return self.authentication.request_oauth_tokens_by_refresh_token(self.refresh_token) def is_token_expiring(self) -> bool: """ diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py index 995271fb8e9d..b16bdbf600ab 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py @@ -719,7 +719,7 @@ class SourceBingAds(AbstractSource): def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: - client = Client(config) + client = Client(**config) account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)} if account_ids: return True, None @@ -738,7 +738,7 @@ def get_report_streams(self, aggregation_type: str) -> List[Stream]: ] def streams(self, config: Mapping[str, Any]) -> List[Stream]: - client = Client(config) + client = Client(**config) streams = [ Accounts(client, config), AdGroups(client, config),