Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ New: Snowflake Cortex Destination 🚀 #36807

Merged
merged 43 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
babe351
apply auto-scaffold from generator
aaronsteers Apr 3, 2024
4bde544
update metadata.yml from Pinecone as example
aaronsteers Apr 3, 2024
27fe035
added poetry and docker ignore files
bindipankhudi Apr 3, 2024
e71f696
add more files
bindipankhudi Apr 4, 2024
885ee2b
add check method
bindipankhudi Apr 8, 2024
35789ce
add some write logic
bindipankhudi Apr 12, 2024
5850dd4
adding back vector db related stuff
bindipankhudi Apr 26, 2024
ed0fd08
added missing indexer
bindipankhudi Apr 26, 2024
af5a7e9
added unit tests
bindipankhudi Apr 26, 2024
65a9fa1
added unit tests for indexer
bindipankhudi Apr 27, 2024
d0472f9
added primary key logic
bindipankhudi Apr 29, 2024
0f08b7a
added state message
bindipankhudi Apr 30, 2024
197a0d1
added end to end integration test with PyAirbyte
bindipankhudi May 2, 2024
667540e
added more integration tests
bindipankhudi May 3, 2024
74cc465
add cortex embed_text tests
bindipankhudi May 3, 2024
3680c6e
updating - Merge remote-tracking branch 'origin/master' into new/dest…
bindipankhudi May 3, 2024
d1762fd
added docs and format fixes
bindipankhudi May 7, 2024
0de044f
addressed some review comments
bindipankhudi May 7, 2024
830ec14
added spec unit test
bindipankhudi May 7, 2024
e6b930a
added icon plus format fix
bindipankhudi May 7, 2024
3119fd7
added new poetry.lock file
bindipankhudi May 7, 2024
89681fa
pinned PyAirbyte version
bindipankhudi May 10, 2024
d5fd4ef
chore: format fix
bindipankhudi May 10, 2024
dd60874
fixing unit tests
bindipankhudi May 10, 2024
af2a795
dummy commit to trigger build
bindipankhudi May 10, 2024
b28bc07
adding more integration tests
bindipankhudi May 11, 2024
7a66ef1
adding some tests
bindipankhudi May 11, 2024
b263da5
adding merge test
bindipankhudi May 11, 2024
9a09587
added documentation and disable pypi publish
bindipankhudi May 11, 2024
c2d1a6a
chore: format code
bindipankhudi May 11, 2024
e3de6d3
Merge remote-tracking branch 'origin/master' into new/destination-sno…
bindipankhudi May 11, 2024
bf4fb87
dummy change to trigger a new build
bindipankhudi May 12, 2024
163f5e9
pinned airbyte-cdk
bindipankhudi May 12, 2024
3be2c2d
commiting updated lock file
bindipankhudi May 12, 2024
58b68b9
added team name for code ownership
bindipankhudi May 12, 2024
290c79c
Merge remote-tracking branch 'origin/master' into new/destination-sno…
bindipankhudi May 12, 2024
76139c4
adddress PR review feedback
bindipankhudi May 14, 2024
6576922
added host and schema and ordered config fields to match snowflake de…
bindipankhudi May 14, 2024
286327b
added new credentials section in config
bindipankhudi May 14, 2024
0ad2f81
Update airbyte-integrations/connectors/destination-snowflake-cortex/d…
bindipankhudi May 14, 2024
4f5bc4c
Update airbyte-integrations/connectors/destination-snowflake-cortex/m…
bindipankhudi May 14, 2024
5a55537
updated test config
bindipankhudi May 14, 2024
83eca09
Merge remote-tracking branch 'origin/master' into new/destination-sno…
bindipankhudi May 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Snowflake Cortex Destination

This is the repository for the Snowflake Cortex destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.9.0`

#### Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.


#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/snowflake-cortex)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_snowflake_cortex/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination snowflake-cortex test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image

#### Use `airbyte-ci` to build your connector
The Airbyte way of building this connector is to use our `airbyte-ci` tool.
You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1).
Then running the following command will build your connector:

```bash
airbyte-ci connectors --name destination-snowflake-cortex build
```
Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-snowflake-cortex:dev`.

##### Customizing our build process
When contributing on our connector you might need to customize the build process to add a system dependency or set an env var.
You can customize our build process by adding a `build_customization.py` module to your connector.
This module should contain a `pre_connector_install` and `post_connector_install` async function that will mutate the base image and the connector container respectively.
It will be imported at runtime by our build process and the functions will be called if they exist.

Here is an example of a `build_customization.py` module:
```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
# Feel free to check the dagger documentation for more information on the Container object and its methods.
# https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/
from dagger import Container


async def pre_connector_install(base_image_container: Container) -> Container:
return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value")

async def post_connector_install(connector_container: Container) -> Container:
return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value")
```

#### Build your own connector image
This connector is built using our dynamic built process in `airbyte-ci`.
The base image used to build it is defined within the metadata.yaml file under the `connectorBuildOptions`.
The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py).
It does not rely on a Dockerfile.

If you would like to patch our connector and build your own a simple approach would be to:

1. Create your own Dockerfile based on the latest version of the connector image.
```Dockerfile
FROM airbyte/destination-snowflake-cortex:latest

COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```
Please use this as an example. This is not optimized.

2. Build your image:
```bash
docker build -t airbyte/destination-snowflake-cortex:dev .
# Running the spec command against your patched connector
docker run airbyte/destination-snowflake-cortex:dev spec
````
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-snowflake-cortex:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-snowflake-cortex:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-snowflake-cortex:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Coming soon:

### Using `airbyte-ci` to run tests
See [airbyte-ci documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command)

## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
acceptance_tests:
spec:
tests:
- spec_path: integration_tests/spec.json
connector_image: airbyte/destination-snowflake-cortex:dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Pinecone Destination Connector Bootstrap

This destination does three things:
* Split records into chunks and separates metadata from text data
* Embeds text data into an embedding vector
* Stores the metadata and embedding vector in Pinecone

The record processing is using the text split components from https://python.langchain.com/docs/modules/data_connection/document_transformers/.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*
!Dockerfile
!main.py
!destination_pinecone
!pyproject.toml
!poetry.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationSnowflakeCortex

__all__ = ["DestinationSnowflakeCortex"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from pydantic import BaseModel, Field
from airbyte_cdk.destinations.vector_db_based.config import VectorDBConfigModel


# to-do - override the embedding model to use Snowflake supported model
class SnowflakeCortexIndexingModel(BaseModel):
account: str = Field(
...,
title="Account",
airbyte_secret=True,
description="Enter the account name you want to use to access the database.",
examples=["xxx.us-east-2.aws"]
)
username: str = Field(
...,
title="Username",
airbyte_secret=True,
description="Enter the name of the user you want to use to access the database",
examples=["AIRBYTE_USER"]
)
password: str = Field(
...,
title="Password",
airbyte_secret=True,
description="Enter the password associated with the user you entered above"
)
database: str = Field(
...,
title="Database",
airbyte_secret=True,
description="Enter the name of the database that you want to sync data into",
examples=["AIRBYTE_DATABASE"]
)
warehouse: str = Field(
...,
title="Warehouse",
airbyte_secret=True,
description="Enter the name of the warehouse that you want to sync data into",
examples=["AIRBYTE_WAREHOUSE"]
)
role: str = Field(
...,
title="Role",
airbyte_secret=True,
description="Enter the name of the role that you want to sync data into",
examples=["AIRBYTE_ROLE", "ACCOUNTADMIN"]
)

class Config:
title = "Indexing"
schema_extra = {
"description": "Snowflake can be used to store vector data and retrieve embeddings.",
"group": "indexing",
}

class ConfigModel(VectorDBConfigModel):
indexing: SnowflakeCortexIndexingModel
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from typing import Any, Iterable, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from destination_snowflake_cortex.config import ConfigModel
from airbyte_cdk.destinations.vector_db_based.indexer import Indexer
from airbyte_cdk.destinations.vector_db_based.writer import Writer
from destination_snowflake_cortex.indexer import SnowflakeCortexIndexer
from airbyte_cdk.destinations.vector_db_based.embedder import Embedder, create_from_config

BATCH_SIZE = 32

class DestinationSnowflakeCortex(Destination):
indexer: Indexer
embedder: Embedder

def _init_indexer(self, config: ConfigModel, configured_catalog: ConfiguredAirbyteCatalog | None = None):
self.embedder = create_from_config(config.embedding, config.processing)
self.indexer = SnowflakeCortexIndexer(config.indexing, self.embedder.embedding_dimensions, configured_catalog)


def write(self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
parsed_config = ConfigModel.parse_obj(config)
self._init_indexer(parsed_config, configured_catalog)
writer = Writer(
parsed_config.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=parsed_config.omit_raw_text
)
yield from writer.write(configured_catalog, input_messages)


def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
try:
parsed_config = ConfigModel.parse_obj(config)
self._init_indexer(parsed_config)
self.indexer.check()
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")


def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
return ConnectorSpecification(
documentationUrl="https://docs.airbyte.com/integrations/destinations/snowflake-cortex",
supportsIncremental=True,
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append, DestinationSyncMode.append_dedup],
connectionSpecification=ConfigModel.schema(), # type: ignore[attr-defined]
)
Loading
Loading