Skip to content

Commit

Permalink
add tests for destination
Browse files Browse the repository at this point in the history
  • Loading branch information
abrahamy committed Dec 17, 2024
1 parent b2a3595 commit b900b5f
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from destination_deepset import util
from destination_deepset.api import APIError
from destination_deepset.models import DeepsetCloudFile, Filetypes
from destination_deepset.writer import DeepsetCloudFileWriter, WriterError
from destination_deepset.writer import DeepsetCloudFileWriter

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -41,7 +40,7 @@ def write(
Returns:
Iterable[AirbyteMessage]: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
"""
writer = DeepsetCloudFileWriter.factory(config=config)
writer = DeepsetCloudFileWriter.factory(config)

streams: set[str] = set()
for catalog_stream in configured_catalog.streams:
Expand Down Expand Up @@ -82,13 +81,9 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
AirbyteConnectionStatus: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
writer = DeepsetCloudFileWriter.factory(config=config)
writer = DeepsetCloudFileWriter.factory(config)
writer.client.health_check()
except APIError:
logger.exception("Failed to connect to deepset cloud API!")
except WriterError:
logger.exception("Failed to initialize writer due to invalid configuration!")
except Exception as ex:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"Failed to connect to deepset cloud, reason: {ex!s}")
else:
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

return AirbyteConnectionStatus(status=Status.FAILED, message="Failed to connect to deepset cloud!")
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@
import json
from enum import Enum, unique
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any
from urllib.parse import unquote, urlparse

from airbyte_cdk.models import AirbyteRecordMessage
from pydantic import BaseModel, Field

if TYPE_CHECKING:
from datetime import datetime


__all__ = [
"DeepsetCloudConfig",
"DeepsetCloudFile",
Expand Down Expand Up @@ -68,7 +64,7 @@ class FileData(BaseModel):
title="Document Key",
description="A unique identifier for the processed file which can be used as a primary key.",
)
last_modified: datetime | None = Field(
last_modified: str | None = Field(
None,
alias="_ab_source_file_last_modified",
title="Last Modified",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

import secrets
from collections.abc import Mapping
from logging import Logger
from pathlib import Path
from typing import Any
from unittest.mock import Mock
from uuid import uuid4

import pytest
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog, Type
from destination_deepset.api import DeepsetCloudApi
from destination_deepset.models import DeepsetCloudConfig
from destination_deepset.writer import DeepsetCloudFileWriter
from pytest import MonkeyPatch


@pytest.fixture()
def config() -> Mapping[str, Any]:
return {
"api_key": secrets.token_urlsafe(16),
"base_url": "https://api.dev.cloud.dpst.dev",
"workspace": "airbyte-test",
"retries": 5,
}


@pytest.fixture()
def configured_catalog() -> ConfiguredAirbyteCatalog:
path = Path("./sample_files/configured_catalog.json")
return ConfiguredAirbyteCatalog.parse_file(path)


@pytest.fixture()
def input_messages() -> list[AirbyteMessage]:
with Path("./sample_files/messages.jsonl").open() as f:
return [AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage.parse_raw(line)) for line in f]


@pytest.fixture()
def logger() -> Logger:
return Mock(spec=Logger)


@pytest.fixture()
def api_client(monkeypatch: MonkeyPatch, config: Mapping[str, Any]) -> DeepsetCloudApi:
cloud_config = DeepsetCloudConfig.parse_obj(config)
patched = DeepsetCloudApi(cloud_config)

monkeypatch.setattr(patched, "health_check", lambda *_: None)
monkeypatch.setattr(patched, "upload", lambda *_: uuid4())

return patched


@pytest.fixture(autouse=True)
def _ensure_mock_api(monkeypatch: MonkeyPatch, api_client: DeepsetCloudApi) -> None:
def factory(*args) -> DeepsetCloudFileWriter:
return DeepsetCloudFileWriter(api_client)

monkeypatch.setattr("destination_deepset.destination.DeepsetCloudFileWriter.factory", factory)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

from collections.abc import Mapping
from logging import Logger
from typing import Any

from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Level, Status, Type
from destination_deepset.destination import DestinationDeepset


def test_write(
config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: list[AirbyteMessage]
) -> None:
batch_size = len(input_messages)
assert batch_size > 0, "Number of messages should match lines in ./sample_files/messages.jsonl"

destination = DestinationDeepset()

results = list(destination.write(config, configured_catalog, iter(input_messages)))
assert len(results) == batch_size
for result in results:
assert isinstance(result, AirbyteMessage)
assert result.type == Type.LOG
assert result.log.level == Level.INFO
assert result.log.stack_trace is None


def test_check(logger: Logger, config: Mapping[str, Any]) -> None:
destination = DestinationDeepset()
outcome = destination.check(logger, config)
assert outcome.status == Status.SUCCEEDED
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"name": "test_unstructured",
"json_schema": {
"type": "object",
"additionalProperties": false,
"properties": {
"document_key": {
"type": "string"
Expand All @@ -14,6 +15,9 @@
}
}
},
"format": {
"filetype": "unstructured"
},
"supported_sync_modes": ["incremental"],
"source_defined_cursor": false
},
Expand Down

0 comments on commit b900b5f

Please sign in to comment.