Skip to content

Commit

Permalink
✨ New: Snowflake Cortex Destination 🚀 (#36807)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Steers <aj@airbyte.io>
  • Loading branch information
bindipankhudi and aaronsteers authored May 14, 2024
1 parent 100f4e0 commit 4b84c63
Show file tree
Hide file tree
Showing 22 changed files with 5,552 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
/airbyte-integrations/connectors/destination-milvus @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-qdrant @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-chroma @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-snowflake-cortex @airbytehq/ai-language-models
/airbyte-cdk/python/airbyte_cdk/destinations/vector_db_based @airbytehq/ai-language-models

# CI/CD
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*
!Dockerfile
!main.py
!destination_snowflake_cortex
!pyproject.toml
!poetry.lock
145 changes: 145 additions & 0 deletions airbyte-integrations/connectors/destination-snowflake-cortex/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Snowflake Cortex Destination

This is the repository for the Snowflake Cortex destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.9.0`

### Installing the connector
From this connector directory, run:
```bash
poetry install --with dev.
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_snowflake_cortex/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination snowflake-cortex test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
poetry run python main.py spec
poetry run python main.py check --config secrets/config.json
cat examples/messages.jsonl | poetry run python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image

#### Use `airbyte-ci` to build your connector
The Airbyte way of building this connector is to use our `airbyte-ci` tool.
You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1).
Then running the following command will build your connector:

```bash
airbyte-ci connectors --name destination-snowflake-cortex build
```
Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-snowflake-cortex:dev`.

##### Customizing our build process
When contributing on our connector you might need to customize the build process to add a system dependency or set an env var.
You can customize our build process by adding a `build_customization.py` module to your connector.
This module should contain a `pre_connector_install` and `post_connector_install` async function that will mutate the base image and the connector container respectively.
It will be imported at runtime by our build process and the functions will be called if they exist.

Here is an example of a `build_customization.py` module:
```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
# Feel free to check the dagger documentation for more information on the Container object and its methods.
# https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/
from dagger import Container


async def pre_connector_install(base_image_container: Container) -> Container:
return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value")

async def post_connector_install(connector_container: Container) -> Container:
return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value")
```

#### Build your own connector image
This connector is built using our dynamic built process in `airbyte-ci`.
The base image used to build it is defined within the metadata.yaml file under the `connectorBuildOptions`.
The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py).
It does not rely on a Dockerfile.

If you would like to patch our connector and build your own a simple approach would be to:

1. Create your own Dockerfile based on the latest version of the connector image.
```Dockerfile
FROM airbyte/destination-snowflake-cortex:latest

COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```
Please use this as an example. This is not optimized.

2. Build your image:
```bash
docker build -t airbyte/destination-snowflake-cortex:dev .
# Running the spec command against your patched connector
docker run airbyte/destination-snowflake-cortex:dev spec
```
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-snowflake-cortex:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-snowflake-cortex:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-snowflake-cortex:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-snowflake-cortex test
```

### Unit Tests
To run unit tests locally, from the connector directory run:
```
poetry run pytest -s unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).

To run integration tests locally, make sure you have a secrets/config.json as explained above, and then run:
```
poetry run pytest -s integration_tests
```

### Customizing acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.

### Using `airbyte-ci` to run tests
See [airbyte-ci documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command)

## Dependency Management
All of your dependencies should go in `pyproject.toml`
* required for your connector to work need to go to `[tool.poetry.dependencies]` list.
* required for the testing need to go to `[tool.poetry.group.dev.dependencies]` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
acceptance_tests:
spec:
tests:
- spec_path: integration_tests/spec.json
connector_image: airbyte/destination-snowflake-cortex:dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Snowflake Cortex Destination Connector Bootstrap

This destination does three things:
* Split records into chunks and separates metadata from text data
* Embeds text data into an embedding vector
* Stores the metadata and embedding vector in Snowflake

The record processing is using the text split components from https://python.langchain.com/docs/modules/data_connection/document_transformers/.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationSnowflakeCortex

__all__ = ["DestinationSnowflakeCortex"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from typing import Literal, Union

from airbyte_cdk.destinations.vector_db_based.config import VectorDBConfigModel
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field


class PasswordBasedAuthorizationModel(BaseModel):
password: str = Field(
...,
title="Password",
airbyte_secret=True,
description="Enter the password you want to use to access the database",
examples=["AIRBYTE_PASSWORD"],
)

class Config:
title = "Credentials"


# to-do - https://github.com/airbytehq/airbyte/issues/38007 - add Snowflake supported models to embedding options
class SnowflakeCortexIndexingModel(BaseModel):
host: str = Field(
...,
title="Host",
airbyte_secret=True,
description="Enter the account name you want to use to access the database. This is usually the identifier before .snowflakecomputing.com",
examples=["AIRBYTE_ACCOUNT"],
)
role: str = Field(
...,
title="Role",
airbyte_secret=True,
description="Enter the role that you want to use to access Snowflake",
examples=["AIRBYTE_ROLE", "ACCOUNTADMIN"],
)
warehouse: str = Field(
...,
title="Warehouse",
airbyte_secret=True,
description="Enter the name of the warehouse that you want to sync data into",
examples=["AIRBYTE_WAREHOUSE"],
)
database: str = Field(
...,
title="Database",
airbyte_secret=True,
description="Enter the name of the database that you want to sync data into",
examples=["AIRBYTE_DATABASE"],
)
default_schema: str = Field(
...,
title="Default Schema",
airbyte_secret=True,
description="Enter the name of the default schema",
examples=["AIRBYTE_SCHEMA"],
)
username: str = Field(
...,
title="Username",
airbyte_secret=True,
description="Enter the name of the user you want to use to access the database",
examples=["AIRBYTE_USER"],
)

credentials: PasswordBasedAuthorizationModel

class Config:
title = "Indexing"
schema_extra = {
"description": "Snowflake can be used to store vector data and retrieve embeddings.",
"group": "indexing",
}


class ConfigModel(VectorDBConfigModel):
indexing: SnowflakeCortexIndexingModel
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import os
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.destinations.vector_db_based.embedder import Embedder, create_from_config
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.writer import Writer
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from destination_snowflake_cortex.config import ConfigModel
from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer

BATCH_SIZE = 32


class DestinationSnowflakeCortex(Destination):
indexer: Indexer
embedder: Embedder

def _init_indexer(self, config: ConfigModel, configured_catalog: Optional[ConfiguredAirbyteCatalog] = None):
self.embedder = create_from_config(config.embedding, config.processing)
self.indexer = SnowflakeCortexIndexer(config.indexing, self.embedder.embedding_dimensions, configured_catalog)

def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
parsed_config = ConfigModel.parse_obj(config)
self._init_indexer(parsed_config, configured_catalog)
writer = Writer(
parsed_config.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=parsed_config.omit_raw_text
)
yield from writer.write(configured_catalog, input_messages)

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
parsed_config = ConfigModel.parse_obj(config)
self._init_indexer(parsed_config)
self.indexer.check()
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")

def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
return ConnectorSpecification(
documentationUrl="https://docs.airbyte.com/integrations/destinations/snowflake-cortex",
supportsIncremental=True,
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append, DestinationSyncMode.append_dedup],
connectionSpecification=ConfigModel.schema(), # type: ignore[attr-defined]
)
Loading

0 comments on commit 4b84c63

Please sign in to comment.