From f829ac50adc67b46262276f74590607c6bb25445 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Wed, 8 Mar 2023 22:23:11 +0000 Subject: [PATCH 01/10] New connector_builder module for handling requests from the Connector Builder. Also implements `resolve_manifest` handler --- .../python/connector_builder/README.md | 32 +++ .../python/connector_builder/__init__.py | 3 + .../connector_builder_source.py | 41 ++++ .../source_declarative_manifest/main.py | 44 +++- .../unit_tests/connector_builder/__init__.py | 3 + .../test_connector_builder_source.py | 205 ++++++++++++++++++ .../test_source_declarative_manifest.py | 128 +++++++---- 7 files changed, 404 insertions(+), 52 deletions(-) create mode 100644 airbyte-cdk/python/connector_builder/README.md create mode 100644 airbyte-cdk/python/connector_builder/__init__.py create mode 100644 airbyte-cdk/python/connector_builder/connector_builder_source.py create mode 100644 airbyte-cdk/python/unit_tests/connector_builder/__init__.py create mode 100644 airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py diff --git a/airbyte-cdk/python/connector_builder/README.md b/airbyte-cdk/python/connector_builder/README.md new file mode 100644 index 000000000000..6c444267e52f --- /dev/null +++ b/airbyte-cdk/python/connector_builder/README.md @@ -0,0 +1,32 @@ +# Connector Builder Backend + +This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/). + +## Local development + +### Locally running the Connector Builder backend + +``` +python main.py read --config secrets/config.json +``` + +Note: Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderSource (`stream_read`, `list_streams`, or `resolve_manifest`). + +### Locally running the docker image + +#### Build + +First, make sure you build the latest Docker image: +``` +./gradlew airbyte-cdk:python:airbyteDocker +``` + +The docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile. + +#### Run + +Then run any of the connector commands as follows: + +``` +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-declarative-manifest:dev read --config /secrets/config.json +``` diff --git a/airbyte-cdk/python/connector_builder/__init__.py b/airbyte-cdk/python/connector_builder/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/connector_builder/connector_builder_source.py b/airbyte-cdk/python/connector_builder/connector_builder_source.py new file mode 100644 index 000000000000..c530feb23a36 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/connector_builder_source.py @@ -0,0 +1,41 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from datetime import datetime +from typing import Any, Mapping, Union + +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +class ConnectorBuilderSource: + def __init__(self, source: ManifestDeclarativeSource): + self.source = source + + def list_streams(self) -> AirbyteRecordMessage: + raise NotImplementedError + + def stream_read(self, command_config) -> AirbyteRecordMessage: + raise NotImplementedError + + @staticmethod + def _emitted_at(): + return int(datetime.now().timestamp()) * 1000 + + def resolve_manifest(self) -> Union[AirbyteMessage, AirbyteRecordMessage]: + try: + return AirbyteRecordMessage( + data={"manifest": self.source.resolved_manifest}, + emitted_at=self._emitted_at(), + stream="", + ) + except Exception as exc: + error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") + return error.as_airbyte_message() + + def handle_request(self, config: Mapping[str, Any]) -> Union[AirbyteMessage, AirbyteRecordMessage]: + command = config.get("__command") + if command == "resolve_manifest": + return self.resolve_manifest() + raise ValueError(f"Unrecognized command {command}.") diff --git a/airbyte-cdk/python/source_declarative_manifest/main.py b/airbyte-cdk/python/source_declarative_manifest/main.py index 2c1bdcb2b782..6cf9f08f1050 100644 --- a/airbyte-cdk/python/source_declarative_manifest/main.py +++ b/airbyte-cdk/python/source_declarative_manifest/main.py @@ -3,27 +3,55 @@ # +import argparse import sys -from typing import List +from typing import Any, List, Mapping, Tuple from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch +from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from connector_builder.connector_builder_source import ConnectorBuilderSource -def create_manifest(args: List[str]): - parsed_args = AirbyteEntrypoint.parse_args(args) - if parsed_args.command == "spec": +def create_source(config: Mapping[str, Any]) -> DeclarativeSource: + manifest = config.get("__injected_declarative_manifest") + return ManifestDeclarativeSource(manifest) + + +def get_config_from_args(args: List[str]) -> Mapping[str, Any]: + command, config_filepath = preparse(args) + if command == "spec": raise ValueError("spec command is not supported for injected declarative manifest") - config = BaseConnector.read_config(parsed_args.config) + config = BaseConnector.read_config(config_filepath) + if "__injected_declarative_manifest" not in config: raise ValueError( f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" ) - return ManifestDeclarativeSource(config.get("__injected_declarative_manifest")) + + return config + + +def preparse(args: List[str]) -> Tuple[str, str]: + parser = argparse.ArgumentParser() + parser.add_argument("command", type=str, help="Airbyte Protocol command") + parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") + parsed, _ = parser.parse_known_args(args) + return parsed.command, parsed.config + + +def handle_request(args: List[str]): + config = get_config_from_args(args) + source = create_source(config) + if "__command" in config: + ConnectorBuilderSource(source).handle_request(config) + else: + # Verify that the correct args are present for the production codepaths. + AirbyteEntrypoint.parse_args(args) + launch(source, sys.argv[1:]) if __name__ == "__main__": - source = create_manifest(sys.argv[1:]) - launch(source, sys.argv[1:]) + handle_request(sys.argv[1:]) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/__init__.py b/airbyte-cdk/python/unit_tests/connector_builder/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/connector_builder/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py new file mode 100644 index 000000000000..c9315e979b52 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py @@ -0,0 +1,205 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import copy + +import pytest +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException +from connector_builder.connector_builder_source import ConnectorBuilderSource + +_stream_name = "stream_with_custom_requester" +_stream_primary_key = "id" +_stream_url_base = "https://api.sendgrid.com" +_stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} + +MANIFEST = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": "10"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$parameters": _stream_options, + "schema_loader": {"$ref": "#/definitions/schema_loader"}, + "retriever": "#/definitions/retriever", + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, +} + + +CONFIG = { + "__injected_declarative_manifest": MANIFEST, + "__command": "resolve_manifest", +} + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("asdf", id="test_arbitrary_command_error"), + pytest.param(None, id="test_command_is_none_error"), + pytest.param("", id="test_command_is_empty_error"), + ], +) +def test_invalid_command(command): + source = ConnectorBuilderSource(ManifestDeclarativeSource(MANIFEST)) + config = copy.deepcopy(CONFIG) + config["__command"] = command + with pytest.raises(ValueError): + source.handle_request(config) + + +def test_resolve_manifest(): + source = ConnectorBuilderSource(ManifestDeclarativeSource(MANIFEST)) + resolved_manifest = source.handle_request(CONFIG) + + expected_resolved_manifest = { + "type": "DeclarativeSource", + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": "10"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "schema_loader": { + "type": "JsonFileSchemaLoader", + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "retriever": { + "type": "SimpleRetriever", + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "page_token_option": { + "type": "RequestPath", + "inject_into": "path", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "requester": { + "type": "HttpRequester", + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "request_parameters": {"page_size": "10"}, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["result"], + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + assert resolved_manifest.data["manifest"] == expected_resolved_manifest + + +def test_resolve_manifest_error_returns_error_response(): + class MockManifestDeclarativeSource: + @property + def resolved_manifest(self): + raise ValueError + + source = ConnectorBuilderSource(MockManifestDeclarativeSource()) + response = source.handle_request(CONFIG) + assert "Error resolving manifest" in response.trace.error.message + + +def test_resolve_manifest_cannot_instantiate_source(): + manifest = copy.deepcopy(MANIFEST) + manifest["streams"][0]["retriever"] = "#/definitions/retrieverasdf" + with pytest.raises(UndefinedReferenceException) as actual_exception: + ConnectorBuilderSource(ManifestDeclarativeSource(manifest)) + assert "Undefined reference #/definitions/retriever" in actual_exception.value.args[0] diff --git a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py index 63757dda03f5..e40374cfc1ac 100644 --- a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py +++ b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py @@ -4,44 +4,22 @@ import copy import json +from unittest import mock import pytest +import source_declarative_manifest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from source_declarative_manifest.main import create_manifest +from source_declarative_manifest.main import handle_request CONFIG = { "__injected_declarative_manifest": { "version": "0.1.0", "definitions": { - "selector": { - "extractor": { - "field_path": [] - } - }, - "requester": { - "url_base": "https://test.com/api", - "http_method": "GET" - }, - "retriever": { - "record_selector": { - "$ref": "#/definitions/selector" - }, - "requester": { - "$ref": "#/definitions/requester" - } - }, - "base_stream": { - "retriever": { - "$ref": "#/definitions/retriever" - } - }, - "data_stream": { - "$ref": "#/definitions/base_stream", - "$parameters": { - "name": "data", - "path": "/data" - } - }, + "selector": {"extractor": {"field_path": []}}, + "requester": {"url_base": "https://test.com/api", "http_method": "GET"}, + "retriever": {"record_selector": {"$ref": "#/definitions/selector"}, "requester": {"$ref": "#/definitions/requester"}}, + "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, + "data_stream": {"$ref": "#/definitions/base_stream", "$parameters": {"name": "data", "path": "/data"}}, }, "streams": [ "#/definitions/data_stream", @@ -59,40 +37,102 @@ "title": "Test Spec", "type": "object", "additionalProperties": True, - "properties": {} - } - } + "properties": {}, + }, + }, } } +CATALOG = {} + @pytest.fixture def valid_config_file(tmp_path): - config_file = tmp_path / "config.json" - config_file.write_text(json.dumps(CONFIG)) - return config_file + return _write_to_tmp_path(tmp_path, CONFIG, "config") + + +@pytest.fixture +def catalog_file(tmp_path): + return _write_to_tmp_path(tmp_path, CATALOG, "catalog") @pytest.fixture def config_file_without_injection(tmp_path): config = copy.deepcopy(CONFIG) del config["__injected_declarative_manifest"] + return _write_to_tmp_path(tmp_path, config, "config") - config_file = tmp_path / "config.json" + +@pytest.fixture +def config_file_with_command(tmp_path): + config = copy.deepcopy(CONFIG) + config["__command"] = "command" + return _write_to_tmp_path(tmp_path, config, "config") + + +def _write_to_tmp_path(tmp_path, config, filename): + config_file = tmp_path / f"{filename}.json" config_file.write_text(json.dumps(config)) return config_file -def test_on_spec_command_then_raise_value_error(): +def test_on_spec_command_then_raise_value_error(valid_config_file): with pytest.raises(ValueError): - create_manifest(["spec"]) + handle_request(["spec", "--config", str(valid_config_file)]) -def test_given_no_injected_declarative_manifest_then_raise_value_error(config_file_without_injection): +@pytest.mark.parametrize( + "command", + [ + pytest.param("check", id="test_check_command_error"), + pytest.param("discover", id="test_discover_command_error"), + pytest.param("read", id="test_read_command_error"), + pytest.param("asdf", id="test_arbitrary_command_error"), + ], +) +def test_given_no_injected_declarative_manifest_then_raise_value_error(command, config_file_without_injection): with pytest.raises(ValueError): - create_manifest(["check", "--config", str(config_file_without_injection)]) + handle_request([command, "--config", str(config_file_without_injection)]) + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("check", id="test_check_command_error"), + pytest.param("discover", id="test_discover_command_error"), + pytest.param("read", id="test_read_command_error"), + pytest.param("asdf", id="test_arbitrary_command_error"), + ], +) +def test_missing_config_raises_value_error(command): + with pytest.raises(SystemExit): + handle_request([command]) + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("check", id="test_check_command"), + pytest.param("discover", id="test_discover_command"), + pytest.param("read", id="test_read_command"), + ], +) +def test_given_injected_declarative_manifest_then_launch_with_declarative_manifest(command, valid_config_file, catalog_file): + with mock.patch("source_declarative_manifest.main.launch") as patch: + if command == "read": + handle_request([command, "--config", str(valid_config_file), "--catalog", str(catalog_file)]) + else: + handle_request([command, "--config", str(valid_config_file)]) + source, _ = patch.call_args[0] + assert isinstance(source, ManifestDeclarativeSource) + + +def test_given_injected_declarative_manifest_then_launch_with_declarative_manifest_missing_arg(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["read", "--config", str(valid_config_file)]) -def test_given_injected_declarative_manifest_then_return_declarative_manifest(valid_config_file): - source = create_manifest(["check", "--config", str(valid_config_file)]) - assert isinstance(source, ManifestDeclarativeSource) +def test_given_command_then_use_connector_builder_source(config_file_with_command): + with mock.patch.object(source_declarative_manifest.main.ConnectorBuilderSource, "handle_request") as patch: + handle_request(["read", "--config", str(config_file_with_command)]) + assert patch.call_count == 1 From 13a9a14fa3f4e7e3c196157d50b5e51737a3c2b7 Mon Sep 17 00:00:00 2001 From: clnoll Date: Wed, 8 Mar 2023 22:39:44 +0000 Subject: [PATCH 02/10] Automated Commit - Formatting Changes --- airbyte-cdk/python/connector_builder/connector_builder_source.py | 1 + .../connector_builder/test_connector_builder_source.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airbyte-cdk/python/connector_builder/connector_builder_source.py b/airbyte-cdk/python/connector_builder/connector_builder_source.py index c530feb23a36..8ca0bcc0492d 100644 --- a/airbyte-cdk/python/connector_builder/connector_builder_source.py +++ b/airbyte-cdk/python/connector_builder/connector_builder_source.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + from datetime import datetime from typing import Any, Mapping, Union diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py index c9315e979b52..dd84547ca439 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + import copy import pytest From 1c85330f17bdb24e472c74b664c6aafd9f5c1b46 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 9 Mar 2023 12:25:18 +0000 Subject: [PATCH 03/10] Rename ConnectorBuilderSource to ConnectorBuilderHandler --- airbyte-cdk/python/connector_builder/README.md | 2 +- ...er_source.py => connector_builder_handler.py} | 2 +- .../python/source_declarative_manifest/main.py | 4 ++-- ...urce.py => test_connector_builder_handler.py} | 16 ++++++++-------- .../test_source_declarative_manifest.py | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) rename airbyte-cdk/python/connector_builder/{connector_builder_source.py => connector_builder_handler.py} (97%) rename airbyte-cdk/python/unit_tests/connector_builder/{test_connector_builder_source.py => test_connector_builder_handler.py} (94%) diff --git a/airbyte-cdk/python/connector_builder/README.md b/airbyte-cdk/python/connector_builder/README.md index 6c444267e52f..ac2db315bc3e 100644 --- a/airbyte-cdk/python/connector_builder/README.md +++ b/airbyte-cdk/python/connector_builder/README.md @@ -10,7 +10,7 @@ This is the backend for requests from the [Connector Builder](https://docs.airby python main.py read --config secrets/config.json ``` -Note: Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderSource (`stream_read`, `list_streams`, or `resolve_manifest`). +Note: Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). ### Locally running the docker image diff --git a/airbyte-cdk/python/connector_builder/connector_builder_source.py b/airbyte-cdk/python/connector_builder/connector_builder_handler.py similarity index 97% rename from airbyte-cdk/python/connector_builder/connector_builder_source.py rename to airbyte-cdk/python/connector_builder/connector_builder_handler.py index 8ca0bcc0492d..4a2f5953b3eb 100644 --- a/airbyte-cdk/python/connector_builder/connector_builder_source.py +++ b/airbyte-cdk/python/connector_builder/connector_builder_handler.py @@ -10,7 +10,7 @@ from airbyte_cdk.utils.traced_exception import AirbyteTracedException -class ConnectorBuilderSource: +class ConnectorBuilderHandler: def __init__(self, source: ManifestDeclarativeSource): self.source = source diff --git a/airbyte-cdk/python/source_declarative_manifest/main.py b/airbyte-cdk/python/source_declarative_manifest/main.py index 6cf9f08f1050..890fc37931a5 100644 --- a/airbyte-cdk/python/source_declarative_manifest/main.py +++ b/airbyte-cdk/python/source_declarative_manifest/main.py @@ -11,7 +11,7 @@ from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from connector_builder.connector_builder_source import ConnectorBuilderSource +from connector_builder.connector_builder_handler import ConnectorBuilderHandler def create_source(config: Mapping[str, Any]) -> DeclarativeSource: @@ -46,7 +46,7 @@ def handle_request(args: List[str]): config = get_config_from_args(args) source = create_source(config) if "__command" in config: - ConnectorBuilderSource(source).handle_request(config) + ConnectorBuilderHandler(source).handle_request(config) else: # Verify that the correct args are present for the production codepaths. AirbyteEntrypoint.parse_args(args) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py similarity index 94% rename from airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py rename to airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index dd84547ca439..e2713d6068c5 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_source.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -7,7 +7,7 @@ import pytest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException -from connector_builder.connector_builder_source import ConnectorBuilderSource +from connector_builder.connector_builder_handler import ConnectorBuilderHandler _stream_name = "stream_with_custom_requester" _stream_primary_key = "id" @@ -61,16 +61,16 @@ ], ) def test_invalid_command(command): - source = ConnectorBuilderSource(ManifestDeclarativeSource(MANIFEST)) + handler = ConnectorBuilderHandler(ManifestDeclarativeSource(MANIFEST)) config = copy.deepcopy(CONFIG) config["__command"] = command with pytest.raises(ValueError): - source.handle_request(config) + handler.handle_request(config) def test_resolve_manifest(): - source = ConnectorBuilderSource(ManifestDeclarativeSource(MANIFEST)) - resolved_manifest = source.handle_request(CONFIG) + handler = ConnectorBuilderHandler(ManifestDeclarativeSource(MANIFEST)) + resolved_manifest = handler.handle_request(CONFIG) expected_resolved_manifest = { "type": "DeclarativeSource", @@ -193,8 +193,8 @@ class MockManifestDeclarativeSource: def resolved_manifest(self): raise ValueError - source = ConnectorBuilderSource(MockManifestDeclarativeSource()) - response = source.handle_request(CONFIG) + handler = ConnectorBuilderHandler(MockManifestDeclarativeSource()) + response = handler.handle_request(CONFIG) assert "Error resolving manifest" in response.trace.error.message @@ -202,5 +202,5 @@ def test_resolve_manifest_cannot_instantiate_source(): manifest = copy.deepcopy(MANIFEST) manifest["streams"][0]["retriever"] = "#/definitions/retrieverasdf" with pytest.raises(UndefinedReferenceException) as actual_exception: - ConnectorBuilderSource(ManifestDeclarativeSource(manifest)) + ConnectorBuilderHandler(ManifestDeclarativeSource(manifest)) assert "Undefined reference #/definitions/retriever" in actual_exception.value.args[0] diff --git a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py index e40374cfc1ac..788495dd04fb 100644 --- a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py +++ b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py @@ -132,7 +132,7 @@ def test_given_injected_declarative_manifest_then_launch_with_declarative_manife handle_request(["read", "--config", str(valid_config_file)]) -def test_given_command_then_use_connector_builder_source(config_file_with_command): - with mock.patch.object(source_declarative_manifest.main.ConnectorBuilderSource, "handle_request") as patch: +def test_given_command_then_use_connector_builder_handler(config_file_with_command): + with mock.patch.object(source_declarative_manifest.main.ConnectorBuilderHandler, "handle_request") as patch: handle_request(["read", "--config", str(config_file_with_command)]) assert patch.call_count == 1 From 0dccb4ee018ec0d6d6a32acf2bf8e6b1087b479e Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 9 Mar 2023 12:34:55 +0000 Subject: [PATCH 04/10] Update source_declarative_manifest README --- airbyte-cdk/python/source_declarative_manifest/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-cdk/python/source_declarative_manifest/README.md b/airbyte-cdk/python/source_declarative_manifest/README.md index 7a723a4b6d33..665e03e8fda1 100644 --- a/airbyte-cdk/python/source_declarative_manifest/README.md +++ b/airbyte-cdk/python/source_declarative_manifest/README.md @@ -7,6 +7,8 @@ This entrypoint is used for connectors created by the connector builder. These c The spec operation is not supported because the config is not known when running a spec. +This entrypoint is also the entrypoint for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/) Server. In addition to the `__injected_declarative_manifest`, the Connector Builder backend config requires the `__command` key, whose value is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). + ## Local development #### Building From f7a475ab03bc4b12034c7432032662d20ecde37c Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 9 Mar 2023 16:19:29 +0000 Subject: [PATCH 05/10] Reorganize --- .../connector_builder_handler.py | 55 ++++++++----------- .../source_declarative_manifest/main.py | 21 +++++-- .../test_connector_builder_handler.py | 38 ++----------- .../test_source_declarative_manifest.py | 20 ++++++- 4 files changed, 62 insertions(+), 72 deletions(-) diff --git a/airbyte-cdk/python/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/connector_builder/connector_builder_handler.py index 4a2f5953b3eb..a1d8f6a6f647 100644 --- a/airbyte-cdk/python/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/connector_builder/connector_builder_handler.py @@ -3,40 +3,31 @@ # from datetime import datetime -from typing import Any, Mapping, Union +from typing import Union from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage -from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.utils.traced_exception import AirbyteTracedException -class ConnectorBuilderHandler: - def __init__(self, source: ManifestDeclarativeSource): - self.source = source - - def list_streams(self) -> AirbyteRecordMessage: - raise NotImplementedError - - def stream_read(self, command_config) -> AirbyteRecordMessage: - raise NotImplementedError - - @staticmethod - def _emitted_at(): - return int(datetime.now().timestamp()) * 1000 - - def resolve_manifest(self) -> Union[AirbyteMessage, AirbyteRecordMessage]: - try: - return AirbyteRecordMessage( - data={"manifest": self.source.resolved_manifest}, - emitted_at=self._emitted_at(), - stream="", - ) - except Exception as exc: - error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") - return error.as_airbyte_message() - - def handle_request(self, config: Mapping[str, Any]) -> Union[AirbyteMessage, AirbyteRecordMessage]: - command = config.get("__command") - if command == "resolve_manifest": - return self.resolve_manifest() - raise ValueError(f"Unrecognized command {command}.") +def list_streams() -> AirbyteRecordMessage: + raise NotImplementedError + + +def stream_read() -> AirbyteRecordMessage: + raise NotImplementedError + + +def resolve_manifest(source) -> Union[AirbyteMessage, AirbyteRecordMessage]: + try: + return AirbyteRecordMessage( + data={"manifest": source.resolved_manifest}, + emitted_at=_emitted_at(), + stream="", + ) + except Exception as exc: + error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") + return error.as_airbyte_message() + + +def _emitted_at(): + return int(datetime.now().timestamp()) * 1000 diff --git a/airbyte-cdk/python/source_declarative_manifest/main.py b/airbyte-cdk/python/source_declarative_manifest/main.py index 890fc37931a5..2d66c822477d 100644 --- a/airbyte-cdk/python/source_declarative_manifest/main.py +++ b/airbyte-cdk/python/source_declarative_manifest/main.py @@ -11,7 +11,7 @@ from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from connector_builder.connector_builder_handler import ConnectorBuilderHandler +from connector_builder import connector_builder_handler def create_source(config: Mapping[str, Any]) -> DeclarativeSource: @@ -42,15 +42,26 @@ def preparse(args: List[str]) -> Tuple[str, str]: return parsed.command, parsed.config +def handle_connector_builder_request(source: DeclarativeSource, config: Mapping[str, Any]): + command = config.get("__command") + if command == "resolve_manifest": + return connector_builder_handler.resolve_manifest(source) + raise ValueError(f"Unrecognized command {command}.") + + +def handle_connector_request(source: DeclarativeSource, args: List[str]): + # Verify that the correct args are present for the production codepaths. + AirbyteEntrypoint.parse_args(args) + launch(source, sys.argv[1:]) + + def handle_request(args: List[str]): config = get_config_from_args(args) source = create_source(config) if "__command" in config: - ConnectorBuilderHandler(source).handle_request(config) + handle_connector_builder_request(source, config) else: - # Verify that the correct args are present for the production codepaths. - AirbyteEntrypoint.parse_args(args) - launch(source, sys.argv[1:]) + handle_connector_request(source, args) if __name__ == "__main__": diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index e2713d6068c5..26e652df0faf 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -2,12 +2,8 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import copy - -import pytest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException -from connector_builder.connector_builder_handler import ConnectorBuilderHandler +from connector_builder.connector_builder_handler import resolve_manifest _stream_name = "stream_with_custom_requester" _stream_primary_key = "id" @@ -52,25 +48,9 @@ } -@pytest.mark.parametrize( - "command", - [ - pytest.param("asdf", id="test_arbitrary_command_error"), - pytest.param(None, id="test_command_is_none_error"), - pytest.param("", id="test_command_is_empty_error"), - ], -) -def test_invalid_command(command): - handler = ConnectorBuilderHandler(ManifestDeclarativeSource(MANIFEST)) - config = copy.deepcopy(CONFIG) - config["__command"] = command - with pytest.raises(ValueError): - handler.handle_request(config) - - def test_resolve_manifest(): - handler = ConnectorBuilderHandler(ManifestDeclarativeSource(MANIFEST)) - resolved_manifest = handler.handle_request(CONFIG) + source = ManifestDeclarativeSource(MANIFEST) + resolved_manifest = resolve_manifest(source) expected_resolved_manifest = { "type": "DeclarativeSource", @@ -193,14 +173,6 @@ class MockManifestDeclarativeSource: def resolved_manifest(self): raise ValueError - handler = ConnectorBuilderHandler(MockManifestDeclarativeSource()) - response = handler.handle_request(CONFIG) + source = MockManifestDeclarativeSource() + response = resolve_manifest(source) assert "Error resolving manifest" in response.trace.error.message - - -def test_resolve_manifest_cannot_instantiate_source(): - manifest = copy.deepcopy(MANIFEST) - manifest["streams"][0]["retriever"] = "#/definitions/retrieverasdf" - with pytest.raises(UndefinedReferenceException) as actual_exception: - ConnectorBuilderHandler(ManifestDeclarativeSource(manifest)) - assert "Undefined reference #/definitions/retriever" in actual_exception.value.args[0] diff --git a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py index 788495dd04fb..7b6d75f42f28 100644 --- a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py +++ b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py @@ -9,7 +9,7 @@ import pytest import source_declarative_manifest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from source_declarative_manifest.main import handle_request +from source_declarative_manifest.main import handle_connector_builder_request, handle_request CONFIG = { "__injected_declarative_manifest": { @@ -133,6 +133,22 @@ def test_given_injected_declarative_manifest_then_launch_with_declarative_manife def test_given_command_then_use_connector_builder_handler(config_file_with_command): - with mock.patch.object(source_declarative_manifest.main.ConnectorBuilderHandler, "handle_request") as patch: + with mock.patch.object(source_declarative_manifest.main, "handle_connector_builder_request") as patch: handle_request(["read", "--config", str(config_file_with_command)]) assert patch.call_count == 1 + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("asdf", id="test_arbitrary_command_error"), + pytest.param(None, id="test_command_is_none_error"), + pytest.param("", id="test_command_is_empty_error"), + ], +) +def test_invalid_command(command): + config = copy.deepcopy(CONFIG) + config["__command"] = command + source = ManifestDeclarativeSource(CONFIG["__injected_declarative_manifest"]) + with pytest.raises(ValueError): + handle_connector_builder_request(source, config) From a7911f511b4b4919e5af68deccdd584e5a2ac862 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Thu, 9 Mar 2023 23:56:38 +0000 Subject: [PATCH 06/10] CR improvements --- .../connector_builder_handler.py | 21 +++++++++++-------- .../source_declarative_manifest/README.md | 2 +- .../source_declarative_manifest/main.py | 9 ++++---- .../test_connector_builder_handler.py | 3 ++- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/airbyte-cdk/python/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/connector_builder/connector_builder_handler.py index a1d8f6a6f647..a3d3c65a8641 100644 --- a/airbyte-cdk/python/connector_builder/connector_builder_handler.py +++ b/airbyte-cdk/python/connector_builder/connector_builder_handler.py @@ -3,26 +3,29 @@ # from datetime import datetime -from typing import Union -from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.utils.traced_exception import AirbyteTracedException -def list_streams() -> AirbyteRecordMessage: +def list_streams() -> AirbyteMessage: raise NotImplementedError -def stream_read() -> AirbyteRecordMessage: +def stream_read() -> AirbyteMessage: raise NotImplementedError -def resolve_manifest(source) -> Union[AirbyteMessage, AirbyteRecordMessage]: +def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: try: - return AirbyteRecordMessage( - data={"manifest": source.resolved_manifest}, - emitted_at=_emitted_at(), - stream="", + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + data={"manifest": source.resolved_manifest}, + emitted_at=_emitted_at(), + stream="resolve_manifest", + ), ) except Exception as exc: error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") diff --git a/airbyte-cdk/python/source_declarative_manifest/README.md b/airbyte-cdk/python/source_declarative_manifest/README.md index 665e03e8fda1..0ed9a0443476 100644 --- a/airbyte-cdk/python/source_declarative_manifest/README.md +++ b/airbyte-cdk/python/source_declarative_manifest/README.md @@ -7,7 +7,7 @@ This entrypoint is used for connectors created by the connector builder. These c The spec operation is not supported because the config is not known when running a spec. -This entrypoint is also the entrypoint for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/) Server. In addition to the `__injected_declarative_manifest`, the Connector Builder backend config requires the `__command` key, whose value is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). +This entrypoint is also the entrypoint for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/) Server. In addition to the `__injected_declarative_manifest`, the [Connector Builder backend](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/connector_builder/README.md) config requires the `__command` key, whose value is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). ## Local development diff --git a/airbyte-cdk/python/source_declarative_manifest/main.py b/airbyte-cdk/python/source_declarative_manifest/main.py index 2d66c822477d..369743eda454 100644 --- a/airbyte-cdk/python/source_declarative_manifest/main.py +++ b/airbyte-cdk/python/source_declarative_manifest/main.py @@ -9,12 +9,11 @@ from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch -from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from connector_builder import connector_builder_handler -def create_source(config: Mapping[str, Any]) -> DeclarativeSource: +def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: manifest = config.get("__injected_declarative_manifest") return ManifestDeclarativeSource(manifest) @@ -42,14 +41,14 @@ def preparse(args: List[str]) -> Tuple[str, str]: return parsed.command, parsed.config -def handle_connector_builder_request(source: DeclarativeSource, config: Mapping[str, Any]): +def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]): command = config.get("__command") if command == "resolve_manifest": return connector_builder_handler.resolve_manifest(source) raise ValueError(f"Unrecognized command {command}.") -def handle_connector_request(source: DeclarativeSource, args: List[str]): +def handle_connector_request(source: ManifestDeclarativeSource, args: List[str]): # Verify that the correct args are present for the production codepaths. AirbyteEntrypoint.parse_args(args) launch(source, sys.argv[1:]) @@ -59,7 +58,7 @@ def handle_request(args: List[str]): config = get_config_from_args(args) source = create_source(config) if "__command" in config: - handle_connector_builder_request(source, config) + print(handle_connector_builder_request(source, config)) else: handle_connector_request(source, args) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 26e652df0faf..a0ca267dd45f 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -164,7 +164,8 @@ def test_resolve_manifest(): ], "check": {"type": "CheckStream", "stream_names": ["lists"]}, } - assert resolved_manifest.data["manifest"] == expected_resolved_manifest + assert resolved_manifest.record.data["manifest"] == expected_resolved_manifest + assert resolved_manifest.record.stream == "resolve_manifest" def test_resolve_manifest_error_returns_error_response(): From aea625ee459d5e256c121cfbf746faa97ed18d0c Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Mon, 13 Mar 2023 20:40:33 +0000 Subject: [PATCH 07/10] Give connector_builder its own main.py --- airbyte-cdk/python/connector_builder/main.py | 67 +++++++++++ .../source_declarative_manifest/main.py | 54 ++------- .../test_connector_builder_handler.py | 20 ++++ .../test_source_declarative_manifest.py | 104 ++---------------- 4 files changed, 107 insertions(+), 138 deletions(-) create mode 100644 airbyte-cdk/python/connector_builder/main.py diff --git a/airbyte-cdk/python/connector_builder/main.py b/airbyte-cdk/python/connector_builder/main.py new file mode 100644 index 000000000000..0bff4b6781e8 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/main.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import argparse +import sys +from typing import Any, List, Mapping, Tuple + +from airbyte_cdk.connector import BaseConnector +from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from connector_builder.connector_builder_handler import resolve_manifest + + +def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: + manifest = config.get("__injected_declarative_manifest") + return ManifestDeclarativeSource(manifest) + + +def get_config_from_args(args: List[str]) -> Mapping[str, Any]: + command, config_filepath = preparse(args) + if command == "spec": + raise ValueError("spec command is not supported for injected declarative manifest") + + config = BaseConnector.read_config(config_filepath) + + if "__injected_declarative_manifest" not in config: + raise ValueError( + f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" + ) + + return config + + +def preparse(args: List[str]) -> Tuple[str, str]: + parser = argparse.ArgumentParser() + parser.add_argument("command", type=str, help="Airbyte Protocol command") + parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") + parsed, _ = parser.parse_known_args(args) + return parsed.command, parsed.config + + +def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]): + command = config.get("__command") + if command == "resolve_manifest": + return resolve_manifest(source) + raise ValueError(f"Unrecognized command {command}.") + + +def handle_connector_request(source: ManifestDeclarativeSource, args: List[str]): + # Verify that the correct args are present for the production codepaths. + AirbyteEntrypoint.parse_args(args) + launch(source, sys.argv[1:]) + + +def handle_request(args: List[str]): + config = get_config_from_args(args) + source = create_source(config) + if "__command" in config: + print(handle_connector_builder_request(source, config)) + else: + handle_connector_request(source, args) + + +if __name__ == "__main__": + handle_request(sys.argv[1:]) diff --git a/airbyte-cdk/python/source_declarative_manifest/main.py b/airbyte-cdk/python/source_declarative_manifest/main.py index 369743eda454..2c1bdcb2b782 100644 --- a/airbyte-cdk/python/source_declarative_manifest/main.py +++ b/airbyte-cdk/python/source_declarative_manifest/main.py @@ -3,65 +3,27 @@ # -import argparse import sys -from typing import Any, List, Mapping, Tuple +from typing import List from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from connector_builder import connector_builder_handler -def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: - manifest = config.get("__injected_declarative_manifest") - return ManifestDeclarativeSource(manifest) - - -def get_config_from_args(args: List[str]) -> Mapping[str, Any]: - command, config_filepath = preparse(args) - if command == "spec": +def create_manifest(args: List[str]): + parsed_args = AirbyteEntrypoint.parse_args(args) + if parsed_args.command == "spec": raise ValueError("spec command is not supported for injected declarative manifest") - config = BaseConnector.read_config(config_filepath) - + config = BaseConnector.read_config(parsed_args.config) if "__injected_declarative_manifest" not in config: raise ValueError( f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" ) - - return config - - -def preparse(args: List[str]) -> Tuple[str, str]: - parser = argparse.ArgumentParser() - parser.add_argument("command", type=str, help="Airbyte Protocol command") - parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") - parsed, _ = parser.parse_known_args(args) - return parsed.command, parsed.config - - -def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]): - command = config.get("__command") - if command == "resolve_manifest": - return connector_builder_handler.resolve_manifest(source) - raise ValueError(f"Unrecognized command {command}.") - - -def handle_connector_request(source: ManifestDeclarativeSource, args: List[str]): - # Verify that the correct args are present for the production codepaths. - AirbyteEntrypoint.parse_args(args) - launch(source, sys.argv[1:]) - - -def handle_request(args: List[str]): - config = get_config_from_args(args) - source = create_source(config) - if "__command" in config: - print(handle_connector_builder_request(source, config)) - else: - handle_connector_request(source, args) + return ManifestDeclarativeSource(config.get("__injected_declarative_manifest")) if __name__ == "__main__": - handle_request(sys.argv[1:]) + source = create_manifest(sys.argv[1:]) + launch(source, sys.argv[1:]) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index a0ca267dd45f..551aa7eac51a 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -2,8 +2,12 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import copy + +import pytest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from connector_builder.connector_builder_handler import resolve_manifest +from connector_builder.main import handle_connector_builder_request _stream_name = "stream_with_custom_requester" _stream_primary_key = "id" @@ -177,3 +181,19 @@ def resolved_manifest(self): source = MockManifestDeclarativeSource() response = resolve_manifest(source) assert "Error resolving manifest" in response.trace.error.message + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("asdf", id="test_arbitrary_command_error"), + pytest.param(None, id="test_command_is_none_error"), + pytest.param("", id="test_command_is_empty_error"), + ], +) +def test_invalid_command(command): + config = copy.deepcopy(CONFIG) + config["__command"] = command + source = ManifestDeclarativeSource(CONFIG["__injected_declarative_manifest"]) + with pytest.raises(ValueError): + handle_connector_builder_request(source, config) diff --git a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py index 7b6d75f42f28..aa762395f4dc 100644 --- a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py +++ b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py @@ -4,12 +4,10 @@ import copy import json -from unittest import mock import pytest -import source_declarative_manifest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from source_declarative_manifest.main import handle_connector_builder_request, handle_request +from source_declarative_manifest.main import create_manifest CONFIG = { "__injected_declarative_manifest": { @@ -43,112 +41,34 @@ } } -CATALOG = {} - @pytest.fixture def valid_config_file(tmp_path): - return _write_to_tmp_path(tmp_path, CONFIG, "config") - - -@pytest.fixture -def catalog_file(tmp_path): - return _write_to_tmp_path(tmp_path, CATALOG, "catalog") + config_file = tmp_path / "config.json" + config_file.write_text(json.dumps(CONFIG)) + return config_file @pytest.fixture def config_file_without_injection(tmp_path): config = copy.deepcopy(CONFIG) del config["__injected_declarative_manifest"] - return _write_to_tmp_path(tmp_path, config, "config") - - -@pytest.fixture -def config_file_with_command(tmp_path): - config = copy.deepcopy(CONFIG) - config["__command"] = "command" - return _write_to_tmp_path(tmp_path, config, "config") - -def _write_to_tmp_path(tmp_path, config, filename): - config_file = tmp_path / f"{filename}.json" + config_file = tmp_path / "config.json" config_file.write_text(json.dumps(config)) return config_file -def test_on_spec_command_then_raise_value_error(valid_config_file): +def test_on_spec_command_then_raise_value_error(): with pytest.raises(ValueError): - handle_request(["spec", "--config", str(valid_config_file)]) + create_manifest(["spec"]) -@pytest.mark.parametrize( - "command", - [ - pytest.param("check", id="test_check_command_error"), - pytest.param("discover", id="test_discover_command_error"), - pytest.param("read", id="test_read_command_error"), - pytest.param("asdf", id="test_arbitrary_command_error"), - ], -) -def test_given_no_injected_declarative_manifest_then_raise_value_error(command, config_file_without_injection): +def test_given_no_injected_declarative_manifest_then_raise_value_error(config_file_without_injection): with pytest.raises(ValueError): - handle_request([command, "--config", str(config_file_without_injection)]) + create_manifest(["check", "--config", str(config_file_without_injection)]) -@pytest.mark.parametrize( - "command", - [ - pytest.param("check", id="test_check_command_error"), - pytest.param("discover", id="test_discover_command_error"), - pytest.param("read", id="test_read_command_error"), - pytest.param("asdf", id="test_arbitrary_command_error"), - ], -) -def test_missing_config_raises_value_error(command): - with pytest.raises(SystemExit): - handle_request([command]) - - -@pytest.mark.parametrize( - "command", - [ - pytest.param("check", id="test_check_command"), - pytest.param("discover", id="test_discover_command"), - pytest.param("read", id="test_read_command"), - ], -) -def test_given_injected_declarative_manifest_then_launch_with_declarative_manifest(command, valid_config_file, catalog_file): - with mock.patch("source_declarative_manifest.main.launch") as patch: - if command == "read": - handle_request([command, "--config", str(valid_config_file), "--catalog", str(catalog_file)]) - else: - handle_request([command, "--config", str(valid_config_file)]) - source, _ = patch.call_args[0] - assert isinstance(source, ManifestDeclarativeSource) - - -def test_given_injected_declarative_manifest_then_launch_with_declarative_manifest_missing_arg(valid_config_file): - with pytest.raises(SystemExit): - handle_request(["read", "--config", str(valid_config_file)]) - - -def test_given_command_then_use_connector_builder_handler(config_file_with_command): - with mock.patch.object(source_declarative_manifest.main, "handle_connector_builder_request") as patch: - handle_request(["read", "--config", str(config_file_with_command)]) - assert patch.call_count == 1 - - -@pytest.mark.parametrize( - "command", - [ - pytest.param("asdf", id="test_arbitrary_command_error"), - pytest.param(None, id="test_command_is_none_error"), - pytest.param("", id="test_command_is_empty_error"), - ], -) -def test_invalid_command(command): - config = copy.deepcopy(CONFIG) - config["__command"] = command - source = ManifestDeclarativeSource(CONFIG["__injected_declarative_manifest"]) - with pytest.raises(ValueError): - handle_connector_builder_request(source, config) +def test_given_injected_declarative_manifest_then_return_declarative_manifest(valid_config_file): + source = create_manifest(["check", "--config", str(valid_config_file)]) + assert isinstance(source, ManifestDeclarativeSource) From d38a76089c41b2397ff426134f74e99b2b06e527 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Mon, 13 Mar 2023 23:35:13 +0000 Subject: [PATCH 08/10] Add --catalog argument --- airbyte-cdk/python/connector_builder/main.py | 14 ++-- .../test_connector_builder_handler.py | 64 ++++++++++++++++--- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/airbyte-cdk/python/connector_builder/main.py b/airbyte-cdk/python/connector_builder/main.py index 0bff4b6781e8..aeeaf759df77 100644 --- a/airbyte-cdk/python/connector_builder/main.py +++ b/airbyte-cdk/python/connector_builder/main.py @@ -8,7 +8,6 @@ from typing import Any, List, Mapping, Tuple from airbyte_cdk.connector import BaseConnector -from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from connector_builder.connector_builder_handler import resolve_manifest @@ -20,8 +19,8 @@ def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: def get_config_from_args(args: List[str]) -> Mapping[str, Any]: command, config_filepath = preparse(args) - if command == "spec": - raise ValueError("spec command is not supported for injected declarative manifest") + if command != "read": + raise ValueError("Only read commands are allowed for Connector Builder requests.") config = BaseConnector.read_config(config_filepath) @@ -37,6 +36,7 @@ def preparse(args: List[str]) -> Tuple[str, str]: parser = argparse.ArgumentParser() parser.add_argument("command", type=str, help="Airbyte Protocol command") parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") + parser.add_argument("--catalog", type=str, required=True, help="path to the catalog file, if it exists (otherwise empty string)") parsed, _ = parser.parse_known_args(args) return parsed.command, parsed.config @@ -48,19 +48,13 @@ def handle_connector_builder_request(source: ManifestDeclarativeSource, config: raise ValueError(f"Unrecognized command {command}.") -def handle_connector_request(source: ManifestDeclarativeSource, args: List[str]): - # Verify that the correct args are present for the production codepaths. - AirbyteEntrypoint.parse_args(args) - launch(source, sys.argv[1:]) - - def handle_request(args: List[str]): config = get_config_from_args(args) source = create_source(config) if "__command" in config: print(handle_connector_builder_request(source, config)) else: - handle_connector_request(source, args) + raise ValueError("Missing __command argument in config file.") if __name__ == "__main__": diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index 551aa7eac51a..dfb2b4ec7c3c 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -3,11 +3,14 @@ # import copy +import json +from unittest import mock +import connector_builder import pytest from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from connector_builder.connector_builder_handler import resolve_manifest -from connector_builder.main import handle_connector_builder_request +from connector_builder.main import handle_connector_builder_request, handle_request _stream_name = "stream_with_custom_requester" _stream_primary_key = "id" @@ -52,9 +55,33 @@ } -def test_resolve_manifest(): +@pytest.fixture +def valid_config_file(tmp_path): + config_file = tmp_path / "config.json" + config_file.write_text(json.dumps(CONFIG)) + return config_file + + +@pytest.fixture +def invalid_config_file(tmp_path): + invalid_config = copy.deepcopy(CONFIG) + invalid_config["__command"] = "bad_command" + config_file = tmp_path / "config.json" + config_file.write_text(json.dumps(invalid_config)) + return config_file + + +def test_handle_resolve_manifest(valid_config_file): + with mock.patch.object(connector_builder.main, "handle_connector_builder_request") as patch: + handle_request(["read", "--config", str(valid_config_file), "--catalog", ""]) + assert patch.call_count == 1 + + +def test_resolve_manifest(valid_config_file): + config = copy.deepcopy(CONFIG) + config["__command"] = "resolve_manifest" source = ManifestDeclarativeSource(MANIFEST) - resolved_manifest = resolve_manifest(source) + resolved_manifest = handle_connector_builder_request(source, config) expected_resolved_manifest = { "type": "DeclarativeSource", @@ -186,14 +213,35 @@ def resolved_manifest(self): @pytest.mark.parametrize( "command", [ - pytest.param("asdf", id="test_arbitrary_command_error"), + pytest.param("check", id="test_check_command_error"), + pytest.param("spec", id="test_spec_command_error"), + pytest.param("discover", id="test_discover_command_error"), pytest.param(None, id="test_command_is_none_error"), pytest.param("", id="test_command_is_empty_error"), ], ) -def test_invalid_command(command): +def test_invalid_protocol_command(command, valid_config_file): config = copy.deepcopy(CONFIG) - config["__command"] = command - source = ManifestDeclarativeSource(CONFIG["__injected_declarative_manifest"]) + config["__command"] = "list_streams" + with pytest.raises(ValueError): + handle_request([command, "--config", str(valid_config_file), "--catalog", ""]) + + +def test_missing_command(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["--config", str(valid_config_file), "--catalog", ""]) + + +def test_missing_catalog(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["read", "--config", str(valid_config_file)]) + + +def test_missing_config(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["read", "--catalog", str(valid_config_file)]) + + +def test_invalid_config_command(invalid_config_file): with pytest.raises(ValueError): - handle_connector_builder_request(source, config) + handle_request(["read", "--config", str(invalid_config_file), "--catalog", ""]) From d64cf5dba8ef6736c8cc684f38efdb571ae1823e Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Tue, 14 Mar 2023 11:14:29 +0000 Subject: [PATCH 09/10] Remove unneeded preparse --- airbyte-cdk/python/connector_builder/main.py | 19 +++++-------------- .../test_connector_builder_handler.py | 2 +- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/airbyte-cdk/python/connector_builder/main.py b/airbyte-cdk/python/connector_builder/main.py index aeeaf759df77..15ac5dcb8aa9 100644 --- a/airbyte-cdk/python/connector_builder/main.py +++ b/airbyte-cdk/python/connector_builder/main.py @@ -3,11 +3,11 @@ # -import argparse import sys -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping from airbyte_cdk.connector import BaseConnector +from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from connector_builder.connector_builder_handler import resolve_manifest @@ -18,11 +18,11 @@ def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: def get_config_from_args(args: List[str]) -> Mapping[str, Any]: - command, config_filepath = preparse(args) - if command != "read": + parsed_args = AirbyteEntrypoint.parse_args(args) + if parsed_args.command != "read": raise ValueError("Only read commands are allowed for Connector Builder requests.") - config = BaseConnector.read_config(config_filepath) + config = BaseConnector.read_config(parsed_args.config) if "__injected_declarative_manifest" not in config: raise ValueError( @@ -32,15 +32,6 @@ def get_config_from_args(args: List[str]) -> Mapping[str, Any]: return config -def preparse(args: List[str]) -> Tuple[str, str]: - parser = argparse.ArgumentParser() - parser.add_argument("command", type=str, help="Airbyte Protocol command") - parser.add_argument("--config", type=str, required=True, help="path to the json configuration file") - parser.add_argument("--catalog", type=str, required=True, help="path to the catalog file, if it exists (otherwise empty string)") - parsed, _ = parser.parse_known_args(args) - return parsed.command, parsed.config - - def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]): command = config.get("__command") if command == "resolve_manifest": diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py index dfb2b4ec7c3c..a2f2c81d89fa 100644 --- a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -223,7 +223,7 @@ def resolved_manifest(self): def test_invalid_protocol_command(command, valid_config_file): config = copy.deepcopy(CONFIG) config["__command"] = "list_streams" - with pytest.raises(ValueError): + with pytest.raises(SystemExit): handle_request([command, "--config", str(valid_config_file), "--catalog", ""]) From 873263980d4cb8abe2cbc6345eda9fe226ab45c7 Mon Sep 17 00:00:00 2001 From: Catherine Noll Date: Tue, 14 Mar 2023 11:30:30 +0000 Subject: [PATCH 10/10] Update README --- airbyte-cdk/python/connector_builder/README.md | 15 +++++++++++++-- .../python/source_declarative_manifest/README.md | 2 -- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/connector_builder/README.md b/airbyte-cdk/python/connector_builder/README.md index ac2db315bc3e..f56a422b0e8a 100644 --- a/airbyte-cdk/python/connector_builder/README.md +++ b/airbyte-cdk/python/connector_builder/README.md @@ -7,10 +7,21 @@ This is the backend for requests from the [Connector Builder](https://docs.airby ### Locally running the Connector Builder backend ``` -python main.py read --config secrets/config.json +python main.py read --config path/to/config --catalog path/to/catalog ``` -Note: Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). +Note: +- Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`), i.e. +``` +{ + "config": , + "__injected_declarative_manifest": {...}, + "__command": <"resolve_manifest" | "list_streams" | "stream_read"> +} +``` +*See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed. + +- When the `__command` is `list_streams` or `resolve_manifest`, the argument to `catalog` should be an empty string. ### Locally running the docker image diff --git a/airbyte-cdk/python/source_declarative_manifest/README.md b/airbyte-cdk/python/source_declarative_manifest/README.md index 0ed9a0443476..7a723a4b6d33 100644 --- a/airbyte-cdk/python/source_declarative_manifest/README.md +++ b/airbyte-cdk/python/source_declarative_manifest/README.md @@ -7,8 +7,6 @@ This entrypoint is used for connectors created by the connector builder. These c The spec operation is not supported because the config is not known when running a spec. -This entrypoint is also the entrypoint for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/) Server. In addition to the `__injected_declarative_manifest`, the [Connector Builder backend](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/connector_builder/README.md) config requires the `__command` key, whose value is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`). - ## Local development #### Building