Skip to content

Commit

Permalink
unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Nov 5, 2022
1 parent bad305b commit d78628f
Showing 1 changed file with 67 additions and 9 deletions.
76 changes: 67 additions & 9 deletions airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,81 @@
from unittest.mock import MagicMock

import pytest
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, \
AirbyteLogMessage, Level, AirbyteTraceMessage, TraceType, AirbyteStateMessage, \
AirbyteStateType
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.utils.record_helper import data_to_airbyte_record

NOW = 1234567
STREAM_NAME = "my_stream"


@pytest.mark.parametrize(
"test_name, data, schema, expected_data",
[("test_data_to_airbyte_record", {"id": 0, "field_A": 1.0, "field_B": "airbyte"}, {}, {"id": 0, "field_A": 1.0, "field_B": "airbyte"})],
"test_name, data, expected_message",
[
(
"test_data_to_airbyte_record", {"id": 0, "field_A": 1.0, "field_B": "airbyte"},
AirbyteMessage(type=MessageType.RECORD,
record=AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"},
emitted_at=NOW))),
(
"test_record_to_airbyte_record",
AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"},
emitted_at=NOW),
AirbyteMessage(type=MessageType.RECORD,
record=AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"},
emitted_at=NOW))),
],
)
def test_data_to_airbyte_record(test_name, data, schema, expected_data):
NOW = 1234567
stream_name = "my_stream"
def test_data_or_record_to_airbyte_record(test_name, data, expected_message):
transformer = MagicMock()
message = data_to_airbyte_record(stream_name, data, transformer, schema)
schema = {}
message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema)
message.record.emitted_at = NOW

expected_message = AirbyteMessage(type=MessageType.RECORD, record=AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=NOW))
if isinstance(data, dict):
transformer.transform.assert_called_with(data, schema)
else:
assert not transformer.transform.called
assert expected_message == message


transformer.transform.assert_called_with(data, schema)
@pytest.mark.parametrize(
"test_name, data, expected_message",
[
(
"test_log_message_to_airbyte_record",
AirbyteLogMessage(level=Level.INFO, message="Hello, this is a log message"),
AirbyteMessage(type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message="Hello, this is a log message"))),
(
"test_trace_message_to_airbyte_record",
AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=101),
AirbyteMessage(type=MessageType.TRACE,
trace=AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=101))),
],
)
def test_log_or_trace_to_message(test_name, data, expected_message):
transformer = MagicMock()
schema = {}
message = data_to_airbyte_record(STREAM_NAME, data, transformer, schema)

assert not transformer.transform.called
assert expected_message == message


@pytest.mark.parametrize(
"test_name, data",
[
(
"test_log_message_to_airbyte_record",
AirbyteStateMessage(type=AirbyteStateType.STREAM)),
],
)
def test_state_message_to_message(test_name, data):
transformer = MagicMock()
schema = {}
with pytest.raises(ValueError):
data_to_airbyte_record(STREAM_NAME, data, transformer, schema)

0 comments on commit d78628f

Please sign in to comment.