Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination Weaviate: Support any string based ID and fix issues with additionalProperties #22527

Merged
merged 15 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions airbyte-config/init/src/main/resources/icons/weaviate.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,10 @@
- name: Weaviate
destinationDefinitionId: 7b7d7a0d-954c-45a0-bcfc-39a634b97736
dockerRepository: airbyte/destination-weaviate
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.com/integrations/destinations/weaviate
releaseStage: alpha
icon: weaviate.svg
- name: DuckDB
destinationDefinitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610
dockerRepository: airbyte/destination-duckdb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7230,7 +7230,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-weaviate:0.1.0"
- dockerImage: "airbyte/destination-weaviate:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/weaviate"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ WORKDIR /airbyte/integration_code
# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base

&& apk --no-cache add tzdata build-base \
&& apk add libffi-dev

COPY setup.py ./
# install necessary packages to a temporary folder
Expand All @@ -34,5 +34,5 @@ COPY destination_weaviate ./destination_weaviate
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-weaviate
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ def buffered_write_operation(self, stream_name: str, record: MutableMapping):
if isinstance(v, list) and len(v) == 0 and k not in self.schema[stream_name]:
record[k] = ""

# Give every schema property that this record lacks an explicit None value.
# set.discard() mutates the set in place and returns None, so it must not be
# chained onto the expression: doing so left missing_properties as None and the
# loop below never ran.
# NOTE(review): "id" is excluded presumably because the object id is not
# stored as a regular property - confirm against the rest of this method.
missing_properties = set(self.schema[stream_name].keys()).difference(record.keys())
missing_properties.discard("id")
for prop in missing_properties:
    record[prop] = None

additional_props = set(record.keys()).difference(self.schema[stream_name].keys())
for prop in additional_props or []:
if isinstance(record[prop], dict):
record[prop] = json.dumps(record[prop])
if isinstance(record[prop], list) and len(record[prop]) > 0 and isinstance(record[prop][0], dict):
record[prop] = json.dumps(record[prop])

# Property names in Weaviate have to start with lowercase letter
record = {k[0].lower() + k[1:]: v for k, v in record.items()}
vector = None
Expand All @@ -77,7 +88,7 @@ def buffered_write_operation(self, stream_name: str, record: MutableMapping):

def flush(self, retries: int = 3):
if len(self.objects_with_error) > 0 and retries == 0:
error_msg = f"Objects had errors and retries failed as well. Object IDs: {self.objects_with_error.keys}"
error_msg = f"Objects had errors and retries failed as well. Object IDs: {self.objects_with_error.keys()}"
raise WeaviatePartialBatchError(error_msg)

results = self.client.batch.create_objects()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import hashlib
import re
import uuid
from typing import Any, Mapping
Expand Down Expand Up @@ -40,16 +41,23 @@ def hex_to_int(hex_str: str) -> int:
return 0


def is_uuid_string(uuid_string: str) -> bool:
    """Return True when *uuid_string* is a lowercase, hyphenated UUID string.

    The string must have the 8-4-4-4-12 hexadecimal layout, a version nibble
    in 0-5 and a variant nibble of 0, 8, 9, a or b. Upper-case hex digits and
    un-hyphenated forms are rejected.

    ``re.fullmatch`` is used instead of ``re.match`` with a trailing ``$``
    because ``$`` also matches just before a final newline; a value such as
    ``"<uuid>\\n"`` would then be reported as a UUID although ``uuid.UUID()``
    refuses to parse it.
    """
    uuid_pattern = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-5][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}"
    return re.fullmatch(uuid_pattern, uuid_string) is not None


def generate_id(record_id: Any) -> uuid.UUID:
    """Derive a deterministic UUID from a source record id.

    Resolution order:

    * ``int``: used directly as the 128-bit integer value of the UUID.
    * ``str`` accepted by ``is_uuid_string``: parsed as that UUID.
    * ``str`` for which ``hex_to_int`` yields a positive number: that number
      is used as the integer value of the UUID.
    * any other ``str``: the MD5 digest of its UTF-8 bytes supplies the 32 hex
      digits of the UUID, so the same text always maps to the same id.

    NOTE(review): values that are neither ``int`` nor ``str`` fall through and
    the function returns ``None`` despite the annotation - confirm whether
    callers rely on that.
    """
    if isinstance(record_id, int):
        return uuid.UUID(int=record_id)

    if isinstance(record_id, str):
        if is_uuid_string(record_id):
            return uuid.UUID(record_id)

        # Convert once and reuse the result instead of calling hex_to_int twice.
        id_int = hex_to_int(record_id)
        if id_int > 0:
            return uuid.UUID(int=id_int)

        # Arbitrary text: MD5 is used only as a stable 128-bit fingerprint,
        # not for any security purpose.
        hex_string = hashlib.md5(record_id.encode("UTF-8")).hexdigest()
        return uuid.UUID(hex=hex_string)


def get_schema_from_catalog(configured_catalog: ConfiguredAirbyteCatalog) -> Mapping[str, Mapping[str, str]]:
Expand All @@ -59,7 +67,7 @@ def get_schema_from_catalog(configured_catalog: ConfiguredAirbyteCatalog) -> Map
for k, v in stream.stream.json_schema.get("properties").items():
stream_schema[k] = "default"
if "array" in v.get("type", []) and (
"object" in v.get("items", {}).get("type", []) or "array" in v.get("items", {}).get("type", [])
"object" in v.get("items", {}).get("type", []) or "array" in v.get("items", {}).get("type", []) or v.get("items", {}) == {}
):
stream_schema[k] = "jsonify"
if "object" in v.get("type", []):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from destination_weaviate import DestinationWeaviate
from destination_weaviate.client import Client, WeaviatePartialBatchError
from destination_weaviate.utils import stream_to_class_name
from destination_weaviate.utils import get_schema_from_catalog, stream_to_class_name


@pytest.fixture(name="config")
Expand Down Expand Up @@ -95,7 +95,7 @@ def setup_teardown(config: Mapping):
pass

docker_client.containers.run(
"semitechnologies/weaviate:1.16.1", detach=True, environment=env_vars, name=name,
"semitechnologies/weaviate:1.17.3", detach=True, environment=env_vars, name=name,
ports={8080: ('127.0.0.1', 8081)}
)
time.sleep(0.5)
Expand Down Expand Up @@ -403,6 +403,160 @@ def test_id_starting_with_underscore(config: Mapping, client: Client):
assert actual.get("id") == str(uuid.UUID(int=int(data.get("_id"), 16))), "UUID should be created for _id field"


def test_id_with_text_string(config: Mapping, client: Client):
    """A record whose ``id`` is free text must still be written and carry an id."""
    stream_name = "article"
    properties = {
        "title": {"type": "string"},
        "id": {"type": "string"},
    }
    catalog = create_catalog(stream_name, {"type": "object", "properties": properties})
    state_message = _state({"state": "1"})
    record_message = _record(stream_name, {"title": "test1", "id": "not a real id"})

    emitted_states = list(DestinationWeaviate().write(config, catalog, [record_message, state_message]))
    assert [state_message] == emitted_states, "Checkpoint state messages were expected from the destination"

    class_name = stream_to_class_name(stream_name)
    assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"
    stored = get_objects(client, class_name)[0]
    assert stored.get("id")


def test_array_no_item_type(config: Mapping, client: Client):
    """A property declared as an array with an empty ``items`` schema is stored as its JSON dump."""
    stream_name = "article"
    catalog = create_catalog(
        stream_name,
        {"type": "object", "properties": {"arr": {"type": "array", "items": {}}}},
    )
    state_message = _state({"state": "1"})
    payload = {"arr": {"test1": "test"}}
    messages = [_record(stream_name, payload), state_message]

    emitted_states = list(DestinationWeaviate().write(config, catalog, messages))
    assert [state_message] == emitted_states, "Checkpoint state messages were expected from the destination"

    class_name = stream_to_class_name(stream_name)
    assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"
    stored = get_objects(client, class_name)[0]
    assert stored["properties"].get("arr") == json.dumps(payload["arr"])


def test_array_of_objects_empty(config: Mapping, client: Client):
    """An array holding a single empty object is stored as the JSON text ``[{}]``."""
    stream_name = "article"
    item_schema = {
        "type": ["null", "object"],
        "properties": {"name": {"type": ["null", "string"]}},
    }
    arr_schema = {"type": ["null", "array"], "items": item_schema}
    catalog = create_catalog(stream_name, {"type": "object", "properties": {"arr": arr_schema}})
    state_message = _state({"state": "1"})
    messages = [_record(stream_name, {"arr": [{}]}), state_message]

    emitted_states = list(DestinationWeaviate().write(config, catalog, messages))
    assert [state_message] == emitted_states, "Checkpoint state messages were expected from the destination"

    class_name = stream_to_class_name(stream_name)
    assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"
    stored = get_objects(client, class_name)[0]
    assert stored["properties"].get("arr") == '[{}]'


def test_missing_fields(config: Mapping, client: Client):
    """A record that omits a schema property is still written; the property reads back as None."""
    stream_name = "article"
    arr_schema = {
        "type": ["null", "array"],
        "items": {
            "type": ["null", "object"],
            "properties": {"name": {"type": ["null", "string"]}},
        },
    }
    properties = {"title": {"type": "string"}, "arr": arr_schema}
    catalog = create_catalog(stream_name, {"type": "object", "properties": properties})
    state_message = _state({"state": "1"})
    # The record deliberately leaves out "arr".
    messages = [_record(stream_name, {"title": "test-missing-array"}), state_message]

    emitted_states = list(DestinationWeaviate().write(config, catalog, messages))
    assert [state_message] == emitted_states, "Checkpoint state messages were expected from the destination"

    class_name = stream_to_class_name(stream_name)
    assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"
    stored = get_objects(client, class_name)[0]
    assert stored["properties"].get("arr") is None


def test_record_additional_properties(config: Mapping, client: Client):
    """Records carrying properties outside the declared schema are written next to regular ones.

    A first write stores a record matching the schema; a second write stores a
    record with extra scalar, list, object and list-of-object properties. The
    declared ``title`` and the extra scalar property must read back unchanged.
    """
    stream_name = "article"
    catalog = create_catalog(
        stream_name,
        {
            "type": "object",
            "additionalProperties": True,
            "properties": {"title": {"type": "string"}},
        },
    )

    # First write: a record that only uses declared properties.
    state_one = _state({"state": "1"})
    batch_one = [_record(stream_name, {"title": "a-first-record"}), state_one]
    destination = DestinationWeaviate()
    states_one = list(destination.write(config, catalog, batch_one))
    assert [state_one] == states_one, "Checkpoint state messages were expected from the destination"
    class_name = stream_to_class_name(stream_name)
    assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"

    # Second write: a record with properties the schema does not declare.
    payload = {
        "title": "with-add-props",
        "add_prop": "test",
        "add_prop2": ["test"],
        "objArray": [{"title": "sam"}],
        "obj": {"title": "sam"},
    }
    batch_two_records = [_record(stream_name, payload)]
    state_two = _state({"state": 2})
    states_two = list(destination.write(config, catalog, [*batch_two_records, state_two]))
    assert [state_two] == states_two, "Checkpoint state messages were expected from the destination"

    assert count_objects(client, class_name) == 2
    # Sorted by title, the second record is the last of the two objects.
    by_title = sorted(get_objects(client, class_name), key=lambda obj: obj["properties"]["title"])
    stored = by_title[1]
    assert stored["properties"].get("title") == payload["title"]
    assert stored["properties"].get("add_prop") == payload["add_prop"]


def test_id_custom_field_name(config: Mapping, client: Client):
# This is common scenario from mongoDB
stream_name = "article"
Expand Down Expand Up @@ -493,10 +647,17 @@ def test_client_delete_stream_entries(caplog, client: Client):


def test_client_flush_partial_error(client: Client):
    """``flush()`` raises ``WeaviatePartialBatchError`` when the batch call reports a partial error.

    The batch call is replaced by a mock that always returns the canned
    response loaded from ``create_objects_partial_error.json``.
    """
    stream_name = "article"
    stream_schema = {
        "type": "object",
        "properties": {
            "id": {"type": "string"},
            "title": {"type": "string"},
        },
    }
    catalog = create_catalog(stream_name, stream_schema, sync_mode=DestinationSyncMode.overwrite)
    client.schema = get_schema_from_catalog(catalog)
    partial_error_result = load_json_file("create_objects_partial_error.json")
    client.client.batch.create_objects = Mock(return_value=partial_error_result)

    # time.sleep is stubbed for the duration of this test only (presumably so
    # waits inside flush() return immediately - confirm). It is restored
    # afterwards so the stub does not leak into tests that rely on real sleeps.
    real_sleep = time.sleep
    time.sleep = Mock(return_value=None)
    try:
        client.buffered_write_operation(stream_name, {"id": "b7b1cfbe-20da-496c-b932-008d35805f26", "title": "test1"})
        client.buffered_write_operation(stream_name, {"id": "154cbccd-89f4-4b29-9c1b-001a3339d89a", "title": "test2"})
        with pytest.raises(WeaviatePartialBatchError):
            client.flush()
    finally:
        time.sleep = real_sleep
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "weaviate-client==3.9.0"]
MAIN_REQUIREMENTS = ["airbyte-cdk", "weaviate-client==3.11.0"]

TEST_REQUIREMENTS = ["pytest~=6.2", "docker"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ def test_generate_id():
assert generate_id("0x1") == uuid.UUID(int=1)
assert generate_id(1) == uuid.UUID(int=1)
assert generate_id("123e4567-e89b-12d3-a456-426614174000") == uuid.UUID("123e4567-e89b-12d3-a456-426614174000")
assert generate_id("123e4567e89b12d3a456426614174000") == uuid.UUID("123e4567-e89b-12d3-a456-426614174000")
for i in range(10):
assert generate_id("this should be using md5") == uuid.UUID("802a479a-190e-92c8-8340-d687c860f53d")
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,5 +330,5 @@
| **Teradata Vantage** | <img alt="Teradata Vantage icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/teradata.svg" height="30" height="30"/> | Destination | airbyte/destination-teradata:0.1.0 | alpha | [link](https://docs.airbyte.io/integrations/destinations/teradata) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-teradata) | <small>`58e6f9da-904e-11ed-a1eb-0242ac120002`</small> |
| **TiDB** | <img alt="TiDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/tidb.svg" height="30" height="30"/> | Destination | airbyte/destination-tidb:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/tidb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-tidb) | <small>`06ec60c7-7468-45c0-91ac-174f6e1a788b`</small> |
| **Typesense** | x | Destination | airbyte/destination-typesense:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/typesense) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-typesense) | <small>`36be8dc6-9851-49af-b776-9d4c30e4ab6a`</small> |
| **Weaviate** | x | Destination | airbyte/destination-weaviate:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/weaviate) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-weaviate) | <small>`7b7d7a0d-954c-45a0-bcfc-39a634b97736`</small> |
| **Weaviate** | <img alt="Weaviate icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/weaviate.svg" height="30" height="30"/> | Destination | airbyte/destination-weaviate:0.1.1 | alpha | [link](https://docs.airbyte.com/integrations/destinations/weaviate) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-weaviate) | <small>`7b7d7a0d-954c-45a0-bcfc-39a634b97736`</small> |
| **YugabyteDB** | <img alt="YugabyteDB icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/yugabytedb.svg" height="30" height="30"/> | Destination | airbyte/destination-yugabytedb:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/destinations/yugabytedb) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/destination-yugabytedb) | <small>`2300fdcf-a532-419f-9f24-a014336e7966`</small> |
3 changes: 2 additions & 1 deletion docs/integrations/destinations/weaviate.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ You should now have all the requirements needed to configure Weaviate as a desti

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:---------------------------------------------|
| 0.1.0 | 2022-12-06 | [\#20094](https://github.com/airbytehq/airbyte/pull/20094) | Add Weaviate destination |
| 0.1.1 | 2023-02-08 | [\#22527](https://github.com/airbytehq/airbyte/pull/22527) | Multiple bug fixes: Support String based IDs, arrays of unknown type and additionalProperties of type object and array of objects |
| 0.1.0 | 2022-12-06 | [\#20094](https://github.com/airbytehq/airbyte/pull/20094) | Add Weaviate destination |