diff --git a/airbyte-cdk/python/connector_builder/README.md b/airbyte-cdk/python/connector_builder/README.md new file mode 100644 index 000000000000..f56a422b0e8a --- /dev/null +++ b/airbyte-cdk/python/connector_builder/README.md @@ -0,0 +1,43 @@ +# Connector Builder Backend + +This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/). + +## Local development + +### Locally running the Connector Builder backend + +``` +python main.py read --config path/to/config --catalog path/to/catalog +``` + +Note: +- Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`), i.e. +``` +{ + "config": , + "__injected_declarative_manifest": {...}, + "__command": <"resolve_manifest" | "list_streams" | "stream_read"> +} +``` +*See [ConnectionSpecification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#actor-specification) for details on the `"config"` key if needed. + +- When the `__command` is `list_streams` or `resolve_manifest`, the argument to `catalog` should be an empty string. + +### Locally running the docker image + +#### Build + +First, make sure you build the latest Docker image: +``` +./gradlew airbyte-cdk:python:airbyteDocker +``` + +The docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile. + +#### Run + +Then run any of the connector commands as follows: + +``` +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-declarative-manifest:dev read --config /secrets/config.json +``` diff --git a/airbyte-cdk/python/connector_builder/__init__.py b/airbyte-cdk/python/connector_builder/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/connector_builder/connector_builder_handler.py b/airbyte-cdk/python/connector_builder/connector_builder_handler.py new file mode 100644 index 000000000000..a3d3c65a8641 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/connector_builder_handler.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from datetime import datetime + +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +def list_streams() -> AirbyteMessage: + raise NotImplementedError + + +def stream_read() -> AirbyteMessage: + raise NotImplementedError + + +def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage: + try: + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + data={"manifest": source.resolved_manifest}, + emitted_at=_emitted_at(), + stream="resolve_manifest", + ), + ) + except Exception as exc: + error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") + return error.as_airbyte_message() + + +def _emitted_at(): + return int(datetime.now().timestamp()) * 1000 diff --git a/airbyte-cdk/python/connector_builder/main.py b/airbyte-cdk/python/connector_builder/main.py new file mode 100644 index 000000000000..15ac5dcb8aa9 --- /dev/null +++ b/airbyte-cdk/python/connector_builder/main.py @@ -0,0 +1,52 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import sys +from typing import Any, List, Mapping + +from airbyte_cdk.connector import BaseConnector +from airbyte_cdk.entrypoint import AirbyteEntrypoint +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from connector_builder.connector_builder_handler import resolve_manifest + + +def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource: + manifest = config.get("__injected_declarative_manifest") + return ManifestDeclarativeSource(manifest) + + +def get_config_from_args(args: List[str]) -> Mapping[str, Any]: + parsed_args = AirbyteEntrypoint.parse_args(args) + if parsed_args.command != "read": + raise ValueError("Only read commands are allowed for Connector Builder requests.") + + config = BaseConnector.read_config(parsed_args.config) + + if "__injected_declarative_manifest" not in config: + raise ValueError( + f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" + ) + + return config + + +def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]): + command = config.get("__command") + if command == "resolve_manifest": + return resolve_manifest(source) + raise ValueError(f"Unrecognized command {command}.") + + +def handle_request(args: List[str]): + config = get_config_from_args(args) + source = create_source(config) + if "__command" in config: + print(handle_connector_builder_request(source, config)) + else: + raise ValueError("Missing __command argument in config file.") + + +if __name__ == "__main__": + handle_request(sys.argv[1:]) diff --git a/airbyte-cdk/python/unit_tests/connector_builder/__init__.py b/airbyte-cdk/python/unit_tests/connector_builder/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/connector_builder/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py new file mode 100644 index 000000000000..a2f2c81d89fa --- /dev/null +++ b/airbyte-cdk/python/unit_tests/connector_builder/test_connector_builder_handler.py @@ -0,0 +1,247 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import copy +import json +from unittest import mock + +import connector_builder +import pytest +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from connector_builder.connector_builder_handler import resolve_manifest +from connector_builder.main import handle_connector_builder_request, handle_request + +_stream_name = "stream_with_custom_requester" +_stream_primary_key = "id" +_stream_url_base = "https://api.sendgrid.com" +_stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} + +MANIFEST = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": "10"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$parameters": _stream_options, + "schema_loader": {"$ref": "#/definitions/schema_loader"}, + "retriever": "#/definitions/retriever", + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, +} + + +CONFIG = { + "__injected_declarative_manifest": MANIFEST, + "__command": "resolve_manifest", +} + + +@pytest.fixture +def valid_config_file(tmp_path): + config_file = tmp_path / "config.json" + config_file.write_text(json.dumps(CONFIG)) + return config_file + + +@pytest.fixture +def invalid_config_file(tmp_path): + invalid_config = copy.deepcopy(CONFIG) + invalid_config["__command"] = "bad_command" + config_file = tmp_path / "config.json" + config_file.write_text(json.dumps(invalid_config)) + return config_file + + +def test_handle_resolve_manifest(valid_config_file): + with mock.patch.object(connector_builder.main, "handle_connector_builder_request") as patch: + handle_request(["read", "--config", str(valid_config_file), "--catalog", ""]) + assert patch.call_count == 1 + + +def test_resolve_manifest(valid_config_file): + config = copy.deepcopy(CONFIG) + config["__command"] = "resolve_manifest" + source = ManifestDeclarativeSource(MANIFEST) + resolved_manifest = handle_connector_builder_request(source, config) + + expected_resolved_manifest = { + "type": "DeclarativeSource", + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path", "type": "RequestPath"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": "10"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "schema_loader": { + "type": "JsonFileSchemaLoader", + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "retriever": { + "type": "SimpleRetriever", + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "page_token_option": { + "type": "RequestPath", + "inject_into": "path", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "requester": { + "type": "HttpRequester", + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "request_parameters": {"page_size": "10"}, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["result"], + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$parameters": _stream_options, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + assert resolved_manifest.record.data["manifest"] == expected_resolved_manifest + assert resolved_manifest.record.stream == "resolve_manifest" + + +def test_resolve_manifest_error_returns_error_response(): + class MockManifestDeclarativeSource: + @property + def resolved_manifest(self): + raise ValueError + + source = MockManifestDeclarativeSource() + response = resolve_manifest(source) + assert "Error resolving manifest" in response.trace.error.message + + +@pytest.mark.parametrize( + "command", + [ + pytest.param("check", id="test_check_command_error"), + pytest.param("spec", id="test_spec_command_error"), + pytest.param("discover", id="test_discover_command_error"), + pytest.param(None, id="test_command_is_none_error"), + pytest.param("", id="test_command_is_empty_error"), + ], +) +def test_invalid_protocol_command(command, valid_config_file): + config = copy.deepcopy(CONFIG) + config["__command"] = "list_streams" + with pytest.raises(SystemExit): + handle_request([command, "--config", str(valid_config_file), "--catalog", ""]) + + +def test_missing_command(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["--config", str(valid_config_file), "--catalog", ""]) + + +def test_missing_catalog(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["read", "--config", str(valid_config_file)]) + + +def test_missing_config(valid_config_file): + with pytest.raises(SystemExit): + handle_request(["read", "--catalog", str(valid_config_file)]) + + +def test_invalid_config_command(invalid_config_file): + with pytest.raises(ValueError): + handle_request(["read", "--config", str(invalid_config_file), "--catalog", ""]) diff --git a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py index 63757dda03f5..aa762395f4dc 100644 --- a/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py +++ b/airbyte-cdk/python/unit_tests/test_source_declarative_manifest.py @@ -13,35 +13,11 @@ "__injected_declarative_manifest": { "version": "0.1.0", "definitions": { - "selector": { - "extractor": { - "field_path": [] - } - }, - "requester": { - "url_base": "https://test.com/api", - "http_method": "GET" - }, - "retriever": { - "record_selector": { - "$ref": "#/definitions/selector" - }, - "requester": { - "$ref": "#/definitions/requester" - } - }, - "base_stream": { - "retriever": { - "$ref": "#/definitions/retriever" - } - }, - "data_stream": { - "$ref": "#/definitions/base_stream", - "$parameters": { - "name": "data", - "path": "/data" - } - }, + "selector": {"extractor": {"field_path": []}}, + "requester": {"url_base": "https://test.com/api", "http_method": "GET"}, + "retriever": {"record_selector": {"$ref": "#/definitions/selector"}, "requester": {"$ref": "#/definitions/requester"}}, + "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, + "data_stream": {"$ref": "#/definitions/base_stream", "$parameters": {"name": "data", "path": "/data"}}, }, "streams": [ "#/definitions/data_stream", @@ -59,9 +35,9 @@ "title": "Test Spec", "type": "object", "additionalProperties": True, - "properties": {} - } - } + "properties": {}, + }, + }, } }