Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: Capture configuration updates from connectors' control messages #19979

Merged
merged 10 commits into from
Dec 6, 2022
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.22
Capture control messages to store and use updated configurations. [#19979](https://github.com/airbytehq/airbyte/pull/19979).

## 0.2.21
Optionally disable discovered catalog caching. [#19806](https://github.com/airbytehq/airbyte/pull/19806).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.version=0.2.22
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import logging
import os
from glob import glob
from logging import Logger
from pathlib import Path
from subprocess import STDOUT, check_output, run
Expand Down Expand Up @@ -55,8 +56,19 @@ def cache_discovered_catalog_fixture(acceptance_test_config: Config) -> bool:

@pytest.fixture(name="connector_config_path")
def connector_config_path_fixture(inputs, base_path) -> Path:
"""Fixture with connector's config path"""
return Path(base_path) / getattr(inputs, "config_path")
"""Fixture with connector's config path. The path to the latest updated configurations will be returned if any."""
original_configuration_path = Path(base_path) / getattr(inputs, "config_path")
updated_configurations_glob = f"{original_configuration_path.parent}/updated_configurations/{original_configuration_path.stem}|**{original_configuration_path.suffix}"
existing_configurations_path_creation_time = [
(config_file_path, os.path.getctime(config_file_path)) for config_file_path in glob(updated_configurations_glob)
]
if existing_configurations_path_creation_time:
existing_configurations_path_creation_time.sort(key=lambda x: x[1])
most_recent_configuration_path = existing_configurations_path_creation_time[-1][0]
else:
most_recent_configuration_path = original_configuration_path
logging.info(f"Using {most_recent_configuration_path} as configuration. It is the most recent version.")
return Path(most_recent_configuration_path)


@pytest.fixture(name="invalid_connector_config_path")
Expand Down Expand Up @@ -127,8 +139,8 @@ def connector_spec_fixture(connector_spec_path) -> ConnectorSpecification:


@pytest.fixture(name="docker_runner")
def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner:
return ConnectorRunner(image_tag, volume=tmp_path)
def docker_runner_fixture(image_tag, tmp_path, connector_config_path) -> ConnectorRunner:
return ConnectorRunner(image_tag, volume=tmp_path, connector_configuration_path=connector_config_path)


@pytest.fixture(name="previous_connector_image_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from typing import Iterable, List, Mapping, Optional

import docker
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType
from airbyte_cdk.models import Type as AirbyteMessageType
from docker.errors import ContainerError, NotFound
from docker.models.containers import Container
from pydantic import ValidationError


class ConnectorRunner:
def __init__(self, image_name: str, volume: Path):
def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None):
self._client = docker.from_env()
try:
self._image = self._client.images.get(image_name)
Expand All @@ -26,6 +27,7 @@ def __init__(self, image_name: str, volume: Path):
print("Pulling completed")
self._runs = 0
self._volume_base = volume
self._connector_configuration_path = connector_configuration_path

@property
def output_folder(self) -> Path:
Expand Down Expand Up @@ -93,6 +95,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte
return output

def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the calling context automatically get this new config? if yes can you clarify how in a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the tests using the ConnectorRunner are using the connector_config fixture (also function scoped) . The connector_config fixture is loading the config from the connector_config_path fixture. The connector_config_path fixture dynamically retrieves the latest configuration written to theupdated_configurations folder. So all tests using connector_config fixture are calling docker_runner methods with the "new" config.

I don't think clarifying the behavior here is a good idea as it's fixture related, not an internal logic of the runner. I wrote something here in the connector_config_path fixture.


self._runs += 1
volumes = self._prepare_volumes(config, state, catalog)
logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}")
Expand All @@ -109,7 +112,15 @@ def run(self, cmd, config=None, state=None, catalog=None, raise_container_error:
for line in self.read(container, command=cmd, with_ext=raise_container_error):
f.write(line.encode())
try:
yield AirbyteMessage.parse_raw(line)
airbyte_message = AirbyteMessage.parse_raw(line)
if (
airbyte_message.type is AirbyteMessageType.CONTROL
and airbyte_message.control.type is OrchestratorType.CONNECTOR_CONFIG
):
self._persist_new_configuration(
airbyte_message.control.connectorConfig.config, int(airbyte_message.control.emitted_at)
)
yield airbyte_message
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
except ValidationError as exc:
logging.warning("Unable to parse connector's output %s, error: %s", line, exc)

Expand Down Expand Up @@ -168,3 +179,37 @@ def env_variables(self):
@property
def entry_point(self):
return self._image.attrs["Config"]["Entrypoint"]

def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]:
"""Store new configuration values to an updated_configurations subdir under the original configuration path.
N.B. The new configuration will not be stored if no configuration path was passed to the ConnectorRunner.
Args:
new_configuration (dict): The updated configuration
configuration_emitted_at (int): Timestamp at which the configuration was emitted (ms)

Returns:
Optional[Path]: The updated configuration path if it was persisted.
"""
if self._connector_configuration_path is None:
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
logging.warning("No configuration path was passed to the ConnectorRunner. The new configuration was not persisted")
return None

with open(self._connector_configuration_path) as old_configuration_file:
old_configuration = json.load(old_configuration_file)

if new_configuration != old_configuration:
file_prefix = self._connector_configuration_path.stem.split("|")[0]
if "/updated_configurations/" not in str(self._connector_configuration_path):
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True)
new_configuration_file_path = Path(
f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}"
)
else:
new_configuration_file_path = Path(
f"{self._connector_configuration_path.parent}/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}"
)

with open(new_configuration_file_path, "w") as new_configuration_file:
json.dump(new_configuration, new_configuration_file)
logging.info(f"Stored most recent configuration value to {new_configuration_file_path}")
return new_configuration_file_path
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import json

import pytest
from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteMessage,
AirbyteRecordMessage,
OrchestratorType,
)
from airbyte_cdk.models import Type as AirbyteMessageType
from source_acceptance_test.utils import connector_runner


class TestContainerRunner:
def test_run_call_persist_configuration(self, mocker, tmp_path):
old_configuration_path = tmp_path / "config.json"
new_configuration = {"field_a": "new_value_a"}
mocker.patch.object(connector_runner, "docker")
records_reads = [
AirbyteMessage(
type=AirbyteMessageType.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"foo": "bar"}, emitted_at=1.0)
).json(exclude_unset=False),
AirbyteMessage(
type=AirbyteMessageType.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=1.0,
connectorConfig=AirbyteControlConnectorConfigMessage(config=new_configuration),
),
).json(exclude_unset=False),
]
mocker.patch.object(connector_runner.ConnectorRunner, "read", mocker.Mock(return_value=records_reads))
mocker.patch.object(connector_runner.ConnectorRunner, "_persist_new_configuration")

runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, connector_configuration_path=old_configuration_path)
list(runner.run("dummy_cmd"))
runner._persist_new_configuration.assert_called_once_with(new_configuration, 1)

@pytest.mark.parametrize(
"pass_configuration_path, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration",
[
pytest.param(
True,
{"field_a": "value_a"},
{"field_a": "value_a"},
1,
False,
id="Config unchanged: No new configuration persisted",
),
pytest.param(
True, {"field_a": "value_a"}, {"field_a": "new_value_a"}, 1, True, id="Config changed: New configuration persisted"
),
pytest.param(
False,
{"field_a": "value_a"},
{"field_a": "new_value_a"},
1,
False,
id="Config changed but persistence is disable: New configuration not persisted",
),
],
)
def test_persist_new_configuration(
self,
mocker,
tmp_path,
pass_configuration_path,
old_configuration,
new_configuration,
new_configuration_emitted_at,
expect_new_configuration,
):
if pass_configuration_path:
old_configuration_path = tmp_path / "config.json"
with open(old_configuration_path, "w") as old_configuration_file:
json.dump(old_configuration, old_configuration_file)
else:
old_configuration_path = None
mocker.patch.object(connector_runner, "docker")
runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path)
new_configuration_path = runner._persist_new_configuration(new_configuration, new_configuration_emitted_at)
if not expect_new_configuration:
assert new_configuration_path is None
else:
assert new_configuration_path == tmp_path / "updated_configurations" / f"config|{new_configuration_emitted_at}.json"
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,23 @@ def test_configured_catalog_fixture(mocker, configured_catalog_path):
else:
assert configured_catalog == conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.return_value
conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.assert_called_once_with(mock_discovered_catalog, set())


@pytest.mark.parametrize(
"updated_configurations", [[], ["config|created_last.json"], ["config|created_first.json", "config|created_last.json"]]
)
def test_connector_config_path_fixture(mocker, tmp_path, updated_configurations):
inputs = mocker.Mock(config_path="config.json")
base_path = tmp_path
if updated_configurations:
updated_configurations_dir = tmp_path / "updated_configurations"
updated_configurations_dir.mkdir()
for configuration_file_name in updated_configurations:
updated_configuration_path = updated_configurations_dir / configuration_file_name
updated_configuration_path.touch()

connector_config_path = conftest.connector_config_path_fixture.__wrapped__(inputs, base_path)
if not updated_configurations:
assert connector_config_path == base_path / "config.json"
else:
assert connector_config_path == base_path / "updated_configurations" / "config|created_last.json"
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import backoff
import pendulum
from airbyte_cdk.config_observation import observe_connector_config
from airbyte_cdk.logger import AirbyteLogger
from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant
from bingads.service_client import ServiceClient
Expand Down Expand Up @@ -38,45 +39,35 @@ class Client:

def __init__(
self,
tenant_id: str,
reports_start_date: str,
developer_token: str = None,
client_id: str = None,
client_secret: str = None,
refresh_token: str = None,
**kwargs: Mapping[str, Any],
config,
) -> None:
self.config = observe_connector_config(config)
self.authorization_data: Mapping[str, AuthorizationData] = {}
self.refresh_token = refresh_token
self.developer_token = developer_token

self.client_id = client_id
self.client_secret = client_secret

self.authentication = self._get_auth_client(client_id, tenant_id, client_secret)
self.authentication = self._get_auth_client()
self.oauth: OAuthTokens = self._get_access_token()
self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc)
self.reports_start_date = pendulum.parse(config["reports_start_date"]).astimezone(tz=timezone.utc)

def _get_auth_client(self, client_id: str, tenant_id: str, client_secret: str = None) -> OAuthWebAuthCodeGrant:
# https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390
auth_creds = {
"client_id": client_id,
@property
def auth_creds(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: signatures

return {
"client_id": self.config["client_id"],
"redirection_uri": "", # should be empty string
"client_secret": None,
"tenant": tenant_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need tenant?

"client_secret": self.config.get("client_secret"),
"tenant": self.config["tenant_id"],
}
# the `client_secret` should be provided for `non-public clients` only
# https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken
if client_secret and client_secret != "":
auth_creds["client_secret"] = client_secret
return OAuthWebAuthCodeGrant(**auth_creds)

def _get_auth_client(self) -> OAuthWebAuthCodeGrant:
return OAuthWebAuthCodeGrant(**self.auth_creds, token_refreshed_callback=self.update_tokens)

def update_tokens(self, new_oauth_tokens: OAuthTokens):
self.config["refresh_token"] = new_oauth_tokens.refresh_token

@lru_cache(maxsize=4)
def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData:
return AuthorizationData(
account_id=account_id,
customer_id=customer_id,
developer_token=self.developer_token,
developer_token=self.config["developer_token"],
authentication=self.authentication,
)

Expand All @@ -85,7 +76,7 @@ def _get_access_token(self) -> OAuthTokens:
# clear caches to be able to use new access token
self.get_service.cache_clear()
self._get_auth_data.cache_clear()
return self.authentication.request_oauth_tokens_by_refresh_token(self.refresh_token)
return self.authentication.request_oauth_tokens_by_refresh_token(self.config["refresh_token"])

def is_token_expiring(self) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ class SourceBingAds(AbstractSource):

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
client = Client(**config)
client = Client(config)
account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)}
if account_ids:
return True, None
Expand All @@ -738,7 +738,7 @@ def get_report_streams(self, aggregation_type: str) -> List[Stream]:
]

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
client = Client(**config)
client = Client(config)
streams = [
Accounts(client, config),
AdGroups(client, config),
Expand Down