Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New connector_builder module for handling requests from the Connector Builder #23888

Merged
merged 10 commits into from
Mar 14, 2023
32 changes: 32 additions & 0 deletions airbyte-cdk/python/connector_builder/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Connector Builder Backend
clnoll marked this conversation as resolved.
Show resolved Hide resolved

This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/).

## Local development

### Locally running the Connector Builder backend

```
python main.py read --config secrets/config.json --catalog ""
```

Note: The config file must contain the keys `__injected_declarative_manifest` and `__command`, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`).

### Locally running the docker image

#### Build

First, make sure you build the latest Docker image:
```
./gradlew airbyte-cdk:python:airbyteDocker
```

The docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile.

#### Run

Then run any of the connector commands as follows:

```
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-declarative-manifest:dev read --config /secrets/config.json
```
3 changes: 3 additions & 0 deletions airbyte-cdk/python/connector_builder/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
36 changes: 36 additions & 0 deletions airbyte-cdk/python/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import datetime

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def list_streams() -> AirbyteMessage:
    """Placeholder for the `list_streams` Connector Builder command.

    Raises:
        NotImplementedError: always; the command has no implementation yet.
    """
    raise NotImplementedError()


def stream_read() -> AirbyteMessage:
    """Placeholder for the `stream_read` Connector Builder command.

    Raises:
        NotImplementedError: always; the command has no implementation yet.
    """
    raise NotImplementedError()


def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """Wrap the source's resolved manifest in a RECORD message.

    Args:
        source: object exposing a `resolved_manifest` attribute.

    Returns:
        A RECORD message on the "resolve_manifest" stream whose data is
        `{"manifest": <resolved manifest>}`. If anything raises while the
        message is being built, the message produced by
        `AirbyteTracedException.as_airbyte_message()` is returned instead.
    """
    try:
        payload = {"manifest": source.resolved_manifest}
        record = AirbyteRecordMessage(
            data=payload,
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        # Report the failure as a message instead of letting it propagate to the caller.
        traced = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.")
        return traced.as_airbyte_message()


def _emitted_at() -> int:
    """Return the current time as integer milliseconds since the Unix epoch."""
    # Scale to milliseconds *before* truncating; truncating first would drop
    # the sub-second part and always yield a multiple of 1000.
    return int(datetime.now().timestamp() * 1000)
61 changes: 61 additions & 0 deletions airbyte-cdk/python/connector_builder/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import argparse
import sys
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from connector_builder.connector_builder_handler import resolve_manifest


def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource:
    """Build a ManifestDeclarativeSource from the manifest embedded in the config.

    The manifest is read from the `__injected_declarative_manifest` key; a
    missing key is passed through to the constructor as `None`.
    """
    return ManifestDeclarativeSource(config.get("__injected_declarative_manifest"))


def get_config_from_args(args: List[str]) -> Mapping[str, Any]:
    """Parse the CLI arguments and load the config file they point to.

    Args:
        args: command-line arguments, without the program name.

    Returns:
        The config read from the path given via `--config`.

    Raises:
        ValueError: if the command is not `read`, or if the config lacks the
            `__injected_declarative_manifest` key.
    """
    command, config_filepath = preparse(args)
    if command != "read":
        raise ValueError("Only read commands are allowed for Connector Builder requests.")

    config = BaseConnector.read_config(config_filepath)
    if "__injected_declarative_manifest" in config:
        return config

    present_keys = list(config.keys())
    raise ValueError(
        f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {present_keys}"
    )


def preparse(args: List[str]) -> Tuple[str, str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the interface should be the same as other connectors so this should also accept --catalog and an optional --state

It might make sense to always use AirbyteEntrypoint.parse_args

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clnoll is there a reason to use a preparse method instead of directly calling AirbyteEntrypoint.parse_args? Using the same method ensures we're using the same interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, updated to use AirbyteEntrypoint.parse_args.

parser = argparse.ArgumentParser()
parser.add_argument("command", type=str, help="Airbyte Protocol command")
parser.add_argument("--config", type=str, required=True, help="path to the json configuration file")
parser.add_argument("--catalog", type=str, required=True, help="path to the catalog file, if it exists (otherwise empty string)")
parsed, _ = parser.parse_known_args(args)
return parsed.command, parsed.config


def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]):
    """Dispatch a Connector Builder request based on the config's `__command`.

    Only `resolve_manifest` is handled here.

    Raises:
        ValueError: if `__command` is missing or is any other value.
    """
    command = config.get("__command")
    if command != "resolve_manifest":
        raise ValueError(f"Unrecognized command {command}.")
    return resolve_manifest(source)


def handle_request(args: List[str]):
    """Handle one Connector Builder invocation and print the resulting message.

    Args:
        args: command-line arguments, without the program name.

    Raises:
        ValueError: if the arguments or config are invalid (see
            `get_config_from_args`), if the config has no `__command` key, or
            if the command is not recognized.
    """
    config = get_config_from_args(args)
    source = create_source(config)
    if "__command" not in config:
        raise ValueError("Missing __command argument in config file.")

    message = handle_connector_builder_request(source, config)
    # Emit the serialized JSON message rather than the model's Python repr,
    # so whatever reads stdout can parse it.
    print(message.json(exclude_unset=True))


if __name__ == "__main__":
    # Script entry point: forward everything after the program name.
    cli_args = sys.argv[1:]
    handle_request(cli_args)
2 changes: 2 additions & 0 deletions airbyte-cdk/python/source_declarative_manifest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This entrypoint is used for connectors created by the connector builder. These c

The spec operation is not supported because the config is not known when running a spec.

This entrypoint is also the entrypoint for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/) Server. In addition to the `__injected_declarative_manifest`, the [Connector Builder backend](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/connector_builder/README.md) config requires the `__command` key, whose value is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would make sense to provide complete config examples (maybe without the manifest) to clarify how this is meant to be used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the connector_builder README.


## Local development

#### Building
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/unit_tests/connector_builder/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import json
from unittest import mock

import connector_builder
import pytest
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from connector_builder.connector_builder_handler import resolve_manifest
from connector_builder.main import handle_connector_builder_request, handle_request

# Options attached to the single test stream through its "$parameters" entry.
_stream_name = "stream_with_custom_requester"
_stream_primary_key = "id"
_stream_url_base = "https://api.sendgrid.com"
_stream_options = {
    "name": _stream_name,
    "primary_key": _stream_primary_key,
    "url_base": _stream_url_base,
}

# Unresolved manifest used as the input for the tests below. The stream points
# at the shared definitions both with a {"$ref": ...} mapping (schema_loader)
# and with a bare "#/..." string (retriever).
MANIFEST = {
    "version": "version",
    "definitions": {
        "schema_loader": {
            "name": "{{ options.stream_name }}",
            "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
        },
        "retriever": {
            "paginator": {
                "type": "DefaultPaginator",
                "page_size": 10,
                "page_size_option": {
                    "inject_into": "request_parameter",
                    "field_name": "page_size",
                },
                "page_token_option": {
                    "inject_into": "path",
                    "type": "RequestPath",
                },
                "pagination_strategy": {
                    "type": "CursorPagination",
                    "cursor_value": "{{ response._metadata.next }}",
                },
            },
            "requester": {
                "path": "/v3/marketing/lists",
                "authenticator": {
                    "type": "BearerAuthenticator",
                    "api_token": "{{ config.apikey }}",
                },
                "request_parameters": {"page_size": "10"},
            },
            "record_selector": {
                "extractor": {"field_path": ["result"]},
            },
        },
    },
    "streams": [
        {
            "type": "DeclarativeStream",
            "$parameters": _stream_options,
            "schema_loader": {"$ref": "#/definitions/schema_loader"},
            "retriever": "#/definitions/retriever",
        },
    ],
    "check": {
        "type": "CheckStream",
        "stream_names": ["lists"],
    },
}


# Config file content for a valid `resolve_manifest` request.
CONFIG = {
    "__injected_declarative_manifest": MANIFEST,
    "__command": "resolve_manifest",
}


@pytest.fixture
def valid_config_file(tmp_path):
    """Write CONFIG as JSON to a temporary `config.json` and return its path."""
    path = tmp_path / "config.json"
    path.write_text(json.dumps(CONFIG))
    return path


@pytest.fixture
def invalid_config_file(tmp_path):
    """Write a copy of CONFIG with `__command` set to "bad_command" and return its path."""
    bad_config = {**copy.deepcopy(CONFIG), "__command": "bad_command"}
    path = tmp_path / "config.json"
    path.write_text(json.dumps(bad_config))
    return path


def test_handle_resolve_manifest(valid_config_file):
    """A valid `read` invocation reaches the request handler exactly once."""
    argv = ["read", "--config", str(valid_config_file), "--catalog", ""]
    with mock.patch.object(connector_builder.main, "handle_connector_builder_request") as patched_handler:
        handle_request(argv)
        assert patched_handler.call_count == 1


def test_resolve_manifest(valid_config_file):
    """`resolve_manifest` returns the fully resolved manifest as a record.

    In the expected output every typed component carries the stream options
    (`name`, `primary_key`, `url_base`) plus a `$parameters` entry; a key the
    component defines itself (the schema loader's `name`) keeps its own value.
    """

    def with_stream_options(component):
        # The component's own keys override the stream options.
        return {**_stream_options, **component, "$parameters": _stream_options}

    config = {**copy.deepcopy(CONFIG), "__command": "resolve_manifest"}
    source = ManifestDeclarativeSource(MANIFEST)
    resolved_manifest = handle_connector_builder_request(source, config)

    expected_paginator = with_stream_options(
        {
            "type": "DefaultPaginator",
            "page_size": 10,
            "page_size_option": with_stream_options(
                {"type": "RequestOption", "inject_into": "request_parameter", "field_name": "page_size"}
            ),
            "page_token_option": with_stream_options({"type": "RequestPath", "inject_into": "path"}),
            "pagination_strategy": with_stream_options(
                {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}
            ),
        }
    )
    expected_requester = with_stream_options(
        {
            "type": "HttpRequester",
            "path": "/v3/marketing/lists",
            "authenticator": with_stream_options({"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}),
            # Plain mapping, not a component: no stream options are added here.
            "request_parameters": {"page_size": "10"},
        }
    )
    expected_record_selector = with_stream_options(
        {
            "type": "RecordSelector",
            "extractor": with_stream_options({"type": "DpathExtractor", "field_path": ["result"]}),
        }
    )
    expected_retriever = with_stream_options(
        {
            "type": "SimpleRetriever",
            "paginator": expected_paginator,
            "requester": expected_requester,
            "record_selector": expected_record_selector,
        }
    )
    expected_schema_loader = with_stream_options(
        {
            "type": "JsonFileSchemaLoader",
            "name": "{{ options.stream_name }}",
            "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
        }
    )
    expected_stream = with_stream_options(
        {
            "type": "DeclarativeStream",
            "schema_loader": expected_schema_loader,
            "retriever": expected_retriever,
        }
    )

    expected_resolved_manifest = {
        "type": "DeclarativeSource",
        "version": "version",
        # Spelled out independently of MANIFEST: the definitions are expected unchanged.
        "definitions": {
            "schema_loader": {
                "name": "{{ options.stream_name }}",
                "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
            },
            "retriever": {
                "paginator": {
                    "type": "DefaultPaginator",
                    "page_size": 10,
                    "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
                    "page_token_option": {"inject_into": "path", "type": "RequestPath"},
                    "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"},
                },
                "requester": {
                    "path": "/v3/marketing/lists",
                    "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
                    "request_parameters": {"page_size": "10"},
                },
                "record_selector": {"extractor": {"field_path": ["result"]}},
            },
        },
        "streams": [expected_stream],
        "check": {"type": "CheckStream", "stream_names": ["lists"]},
    }

    record = resolved_manifest.record
    assert record.data["manifest"] == expected_resolved_manifest
    assert record.stream == "resolve_manifest"


def test_resolve_manifest_error_returns_error_response():
    """A failure while reading the resolved manifest comes back as a trace error message."""

    class MockManifestDeclarativeSource:
        # Stand-in source whose `resolved_manifest` property always fails.
        @property
        def resolved_manifest(self):
            raise ValueError

    response = resolve_manifest(MockManifestDeclarativeSource())
    error_message = response.trace.error.message
    assert "Error resolving manifest" in error_message


@pytest.mark.parametrize(
    "command",
    [
        pytest.param("check", id="test_check_command_error"),
        pytest.param("spec", id="test_spec_command_error"),
        pytest.param("discover", id="test_discover_command_error"),
        pytest.param(None, id="test_command_is_none_error"),
        pytest.param("", id="test_command_is_empty_error"),
    ],
)
def test_invalid_protocol_command(command, valid_config_file):
    """Any protocol command other than `read` is rejected with a ValueError."""
    # The config file itself is valid; only the command-line command varies.
    with pytest.raises(ValueError):
        handle_request([command, "--config", str(valid_config_file), "--catalog", ""])


def test_missing_command(valid_config_file):
    """Omitting the positional command makes argument parsing exit."""
    argv = ["--config", str(valid_config_file), "--catalog", ""]
    with pytest.raises(SystemExit):
        handle_request(argv)


def test_missing_catalog(valid_config_file):
    """Omitting the required `--catalog` option makes argument parsing exit."""
    argv = ["read", "--config", str(valid_config_file)]
    with pytest.raises(SystemExit):
        handle_request(argv)


def test_missing_config(valid_config_file):
    """Omitting the required `--config` option makes argument parsing exit."""
    # The file path is deliberately passed as the catalog so that `--config` is absent.
    argv = ["read", "--catalog", str(valid_config_file)]
    with pytest.raises(SystemExit):
        handle_request(argv)


def test_invalid_config_command(invalid_config_file):
    """An unrecognized `__command` value in the config raises a ValueError."""
    argv = ["read", "--config", str(invalid_config_file), "--catalog", ""]
    with pytest.raises(ValueError):
        handle_request(argv)
Loading