diff --git a/airbyte-integrations/connectors/destination-kvdb/.dockerignore b/airbyte-integrations/connectors/destination-kvdb/.dockerignore new file mode 100644 index 000000000000..1b4b5767b554 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/.dockerignore @@ -0,0 +1,5 @@ +* +!Dockerfile +!main.py +!destination_kvdb +!setup.py diff --git a/airbyte-integrations/connectors/destination-kvdb/Dockerfile b/airbyte-integrations/connectors/destination-kvdb/Dockerfile new file mode 100644 index 000000000000..b0493da1cfc1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.9.11-alpine3.15 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY destination_kvdb ./destination_kvdb + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-kvdb diff --git a/airbyte-integrations/connectors/destination-kvdb/README.md b/airbyte-integrations/connectors/destination-kvdb/README.md new file mode 100644 index 000000000000..b834894111b6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/README.md @@ -0,0 +1,118 @@ +# Kvdb Destination + +This is the repository for the [Kvdb](https://kvdb.io) destination connector, written in Python. It is intended to be an example for how to write a Python destination. KvDB is a very simple key value store, which makes it great for the purposes of illustrating how to write a Python destination connector. + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:destination-kvdb:build +``` + +#### Create credentials +**If you are a community contributor**, generate the necessary credentials from [Kvdb](https://kvdb.io/docs/api/), and then create a file `secrets/config.json` conforming to the `destination_kvdb/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination kvdb test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + + + +#### Build +**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):** +```bash +airbyte-ci connectors --name=destination-kvdb build +``` + +An image will be built with the tag `airbyte/destination-kvdb:dev`. + +**Via `docker build`:** +```bash +docker build -t airbyte/destination-kvdb:dev . +``` +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-kvdb:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-kvdb:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-kvdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md): +```bash +airbyte-ci connectors --name=destination-kvdb test +``` + + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-kvdb test` +2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors). +3. Make sure the `metadata.yaml` content is up to date. +4. Make the connector documentation and its changelog is up to date (`docs/integrations/destinations/kvdb.md`). +5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention). +6. Pat yourself on the back for being an awesome contributor. +7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. + diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py new file mode 100644 index 000000000000..5f3b041035bf --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/__init__.py @@ -0,0 +1,26 @@ +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +from .destination import DestinationKvdb + +__all__ = ["DestinationKvdb"] diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py new file mode 100644 index 000000000000..74d9f41176f5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/client.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Iterable, List, Mapping, Tuple, Union + +import requests + + +class KvDbClient: + base_url = "https://kvdb.io" + PAGE_SIZE = 1000 + + def __init__(self, bucket_id: str, secret_key: str = None): + self.secret_key = secret_key + self.bucket_id = bucket_id + + def write(self, key: str, value: Mapping[str, Any]): + return self.batch_write([(key, value)]) + + def batch_write(self, keys_and_values: List[Tuple[str, Mapping[str, Any]]]): + """ + https://kvdb.io/docs/api/#execute-transaction + """ + request_body = {"txn": [{"set": key, "value": value} for key, value in keys_and_values]} + return self._request("POST", json=request_body) + + def list_keys(self, list_values: bool = False, prefix: str = None) -> Iterable[Union[str, List]]: + """ + https://kvdb.io/docs/api/#list-keys + """ + # TODO handle rate limiting + pagination_complete = False + offset = 0 + + while not pagination_complete: + response = self._request( + "GET", + params={ + "limit": self.PAGE_SIZE, + "skip": offset, + "format": "json", + "prefix": prefix or "", + "values": "true" if list_values else "false", + }, + endpoint="/", # the "list" endpoint doesn't work without adding a trailing slash to the URL + ) + + response_json = response.json() + yield from response_json + + pagination_complete = len(response_json) < self.PAGE_SIZE + offset += self.PAGE_SIZE + + def delete(self, key: Union[str, List[str]]): + """ + https://kvdb.io/docs/api/#execute-transaction + """ + key_list = key if isinstance(key, List) else [key] + request_body = {"txn": [{"delete": k} for k in key_list]} + return self._request("POST", json=request_body) + + def _get_base_url(self) -> str: + return f"{self.base_url}/{self.bucket_id}" + + def _get_auth_headers(self) -> Mapping[str, Any]: + return {"Authorization": f"Bearer {self.secret_key}"} if self.secret_key else {} + + def _request( + self, http_method: str, endpoint: str = None, params: Mapping[str, Any] = None, json: Mapping[str, Any] = None + ) -> requests.Response: + url = self._get_base_url() + (endpoint or "") + headers = {"Accept": "application/json", **self._get_auth_headers()} + + response = requests.request(method=http_method, params=params, url=url, headers=headers, json=json) + + response.raise_for_status() + return response diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py new file mode 100644 index 000000000000..33ab8565fae4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/destination.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import time +import traceback +import uuid +from typing import Any, Iterable, Mapping + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type +from destination_kvdb.client import KvDbClient +from destination_kvdb.writer import KvDbWriter + + +class DestinationKvdb(Destination): + def write( + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] + ) -> Iterable[AirbyteMessage]: + + """ + Reads the input stream of messages, config, and catalog to write data to the destination. + + This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received + in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been + successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing, + then the source is given the last state message output from this method as the starting point of the next sync. + """ + writer = KvDbWriter(KvDbClient(**config)) + + for configured_stream in configured_catalog.streams: + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + writer.delete_stream_entries(configured_stream.stream.name) + + for message in input_messages: + if message.type == Type.STATE: + # Emitting a state message indicates that all records which came before it have been written to the destination. So we flush + # the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state + writer.flush() + yield message + elif message.type == Type.RECORD: + record = message.record + writer.queue_write_operation( + record.stream, record.data, time.time_ns() / 1_000_000 + ) # convert from nanoseconds to milliseconds + else: + # ignore other message types for now + continue + + # Make sure to flush any records still in the queue + writer.flush() + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the destination with the needed permissions + e.g: if a provided API token or password can be used to connect and write to the destination. + """ + try: + # Verify write access by attempting to write and then delete to a random key + client = KvDbClient(**config) + random_key = str(uuid.uuid4()) + client.write(random_key, {"value": "_airbyte_connection_check"}) + client.delete(random_key) + except Exception as e: + traceback.print_exc() + return AirbyteConnectionStatus( + status=Status.FAILED, message=f"An exception occurred: {e}. \nStacktrace: \n{traceback.format_exc()}" + ) + else: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json new file mode 100644 index 000000000000..0ced52c17a22 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/spec.json @@ -0,0 +1,26 @@ +{ + "documentationUrl": "https://kvdb.io/docs/api/", + "supported_destination_sync_modes": ["overwrite", "append"], + "supportsIncremental": true, + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Destination KVdb", + "type": "object", + "required": ["bucket_id", "secret_key"], + "additionalProperties": false, + "properties": { + "bucket_id": { + "title": "Bucket ID", + "type": "string", + "description": "The ID of your KVdb bucket.", + "order": 1 + }, + "secret_key": { + "title": "Secret Key", + "type": "string", + "description": "Your bucket Secret Key.", + "order": 2 + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py new file mode 100644 index 000000000000..33acbf8a22fb --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/destination_kvdb/writer.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from collections import Mapping + +from destination_kvdb.client import KvDbClient + + +class KvDbWriter: + """ + Data is written to KvDB in the following format: + key: stream_name__ab__ + value: a JSON object representing the record's data + + This is because unless a data source explicitly designates a primary key, we don't know what to key the record on. + Since KvDB allows reading records with certain prefixes, we treat it more like a message queue, expecting the reader to + read messages with a particular prefix e.g: name__ab__123, where 123 is the timestamp they last read data from. + """ + + write_buffer = [] + flush_interval = 1000 + + def __init__(self, client: KvDbClient): + self.client = client + + def delete_stream_entries(self, stream_name: str): + """Deletes all the records belonging to the input stream""" + keys_to_delete = [] + for key in self.client.list_keys(prefix=f"{stream_name}__ab__"): + keys_to_delete.append(key) + if len(keys_to_delete) == self.flush_interval: + self.client.delete(keys_to_delete) + keys_to_delete.clear() + if len(keys_to_delete) > 0: + self.client.delete(keys_to_delete) + + def queue_write_operation(self, stream_name: str, record: Mapping, written_at: int): + kv_pair = (f"{stream_name}__ab__{written_at}", record) + self.write_buffer.append(kv_pair) + if len(self.write_buffer) == self.flush_interval: + self.flush() + + def flush(self): + self.client.batch_write(self.write_buffer) + self.write_buffer.clear() diff --git a/airbyte-integrations/connectors/destination-kvdb/main.py b/airbyte-integrations/connectors/destination-kvdb/main.py new file mode 100644 index 000000000000..178789589e5a --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/main.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import sys + +from destination_kvdb import DestinationKvdb + +if __name__ == "__main__": + DestinationKvdb().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-kvdb/requirements.txt b/airbyte-integrations/connectors/destination-kvdb/requirements.txt new file mode 100644 index 000000000000..d6e1198b1ab1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-kvdb/setup.py b/airbyte-integrations/connectors/destination-kvdb/setup.py new file mode 100644 index 000000000000..dab5520718ab --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/setup.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", + "requests", +] + +TEST_REQUIREMENTS = ["pytest~=6.1"] + +setup( + name="destination_kvdb", + description="Destination implementation for Kvdb.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py new file mode 100644 index 000000000000..219ae0142c72 --- /dev/null +++ b/airbyte-integrations/connectors/destination-kvdb/unit_tests/unit_test.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +def test_example_method(): + assert True