Skip to content

Commit

Permalink
Fix issue with arrays of no data type
Browse files Browse the repository at this point in the history
  • Loading branch information
samos123 committed Feb 15, 2023
1 parent 73034a8 commit bae1015
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def buffered_write_operation(self, stream_name: str, record: MutableMapping):

def flush(self, retries: int = 3):
if len(self.objects_with_error) > 0 and retries == 0:
error_msg = f"Objects had errors and retries failed as well. Object IDs: {self.objects_with_error.keys}"
error_msg = f"Objects had errors and retries failed as well. Object IDs: {self.objects_with_error.keys()}"
raise WeaviatePartialBatchError(error_msg)

results = self.client.batch.create_objects()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def get_schema_from_catalog(configured_catalog: ConfiguredAirbyteCatalog) -> Map
for k, v in stream.stream.json_schema.get("properties").items():
stream_schema[k] = "default"
if "array" in v.get("type", []) and (
"object" in v.get("items", {}).get("type", []) or "array" in v.get("items", {}).get("type", [])
"object" in v.get("items", {}).get("type", []) or
"array" in v.get("items", {}).get("type", []) or
v.get("items", {}) == {}
):
stream_schema[k] = "jsonify"
if "object" in v.get("type", []):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,32 @@ def test_id_with_text_string(config: Mapping, client: Client):
assert actual.get("id")


def test_array_no_item_type(config: Mapping, client: Client):
stream_name = "article"
stream_schema = {"type": "object", "properties": {
"arr": {"type": "array", "items": {}},
}}
catalog = create_catalog(stream_name, stream_schema)
first_state_message = _state({"state": "1"})
data = {"arr": {"test1": "test"}}
first_record_chunk = [_record(stream_name, data)]

destination = DestinationWeaviate()

expected_states = [first_state_message]
output_states = list(
destination.write(
config, catalog, [*first_record_chunk, first_state_message]
)
)
assert expected_states == output_states, "Checkpoint state messages were expected from the destination"

class_name = stream_to_class_name(stream_name)
assert count_objects(client, class_name) == 1, "There should be only 1 object of in Weaviate"
actual = get_objects(client, class_name)[0]
assert actual["properties"].get("arr") == json.dumps(data["arr"])


def test_id_custom_field_name(config: Mapping, client: Client):
# This is common scenario from mongoDB
stream_name = "article"
Expand Down

0 comments on commit bae1015

Please sign in to comment.