Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add emitted_at SAT test (22240) #22291

Merged
merged 8 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.5.2
Check that `emitted_at` increases during subsequent reads. [#22291](https://github.com/airbytehq/airbyte/pull/22291)

## 0.5.1
Fix discovered catalog caching for different configs. [#22301](https://github.com/airbytehq/airbyte/pull/22301)

Expand Down Expand Up @@ -231,7 +234,7 @@ Add validation of input config.json against spec.json.
## 0.1.9
Add configurable validation of schema for all records in BasicRead test: [#4345](https://github.com/airbytehq/airbyte/pull/4345)

The validation is ON by default.
The validation is ON by default.
To disable validation for the source you need to set `validate_schema: off` in the config file.

## 0.1.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY connector_acceptance_test ./connector_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.5.1
LABEL io.airbyte.version=0.5.2
LABEL io.airbyte.name=airbyte/connector-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import time
from collections import defaultdict
from functools import partial
from logging import Logger
Expand Down Expand Up @@ -36,24 +37,26 @@ def primary_keys_only(record, pks):

@pytest.mark.default_timeout(20 * 60)
class TestFullRefresh(BaseTest):
def test_sequential_reads(
self,
inputs: ConnectionTestConfig,
connector_config: SecretDict,
configured_catalog: ConfiguredAirbyteCatalog,
docker_runner: ConnectorRunner,
detailed_logger: Logger,
def assert_emitted_at_increase_on_subsequent_runs(self, first_read_records, second_read_records):
first_read_records_data = [record.data for record in first_read_records]
assert first_read_records_data, "At least one record should be read using provided catalog"

first_read_records_emitted_at = [record.emitted_at for record in first_read_records]
max_emitted_at_first_read = max(first_read_records_emitted_at)

second_read_reords_emitted_at = [record.emitted_at for record in second_read_records]

min_emitted_at_second_read = min(second_read_reords_emitted_at)

assert max_emitted_at_first_read < min_emitted_at_second_read, "emitted_at should increase on subsequent runs"

def assert_two_sequential_reads_produce_same_or_subset_records(
self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger
):
ignored_fields = getattr(inputs, "ignored_fields") or {}
configured_catalog = full_refresh_only_catalog(configured_catalog)
output = docker_runner.call_read(connector_config, configured_catalog)
records_1 = [message.record for message in output if message.type == Type.RECORD]
records_by_stream_1 = defaultdict(list)
for record in records_1:
records_by_stream_1[record.stream].append(record.data)

output = docker_runner.call_read(connector_config, configured_catalog)
records_2 = [message.record for message in output if message.type == Type.RECORD]
records_by_stream_2 = defaultdict(list)
for record in records_2:
records_by_stream_2[record.stream].append(record.data)
Expand All @@ -78,3 +81,27 @@ def test_sequential_reads(
detailed_logger.info("Missing records")
detailed_logger.log_json_list(missing_records)
pytest.fail(msg)

def test_sequential_reads(
self,
inputs: ConnectionTestConfig,
connector_config: SecretDict,
configured_catalog: ConfiguredAirbyteCatalog,
docker_runner: ConnectorRunner,
detailed_logger: Logger,
):
ignored_fields = getattr(inputs, "ignored_fields") or {}
configured_catalog = full_refresh_only_catalog(configured_catalog)
output_1 = docker_runner.call_read(connector_config, configured_catalog)
records_1 = [message.record for message in output_1 if message.type == Type.RECORD]

# sleep for 1 second to ensure that the emitted_at timestamp is different
time.sleep(1)

output_2 = docker_runner.call_read(connector_config, configured_catalog)
records_2 = [message.record for message in output_2 if message.type == Type.RECORD]
bnchrch marked this conversation as resolved.
Show resolved Hide resolved

self.assert_emitted_at_increase_on_subsequent_runs(records_1, records_2)
self.assert_two_sequential_reads_produce_same_or_subset_records(
records_1, records_2, configured_catalog, ignored_fields, detailed_logger
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig):
ignored_fields: Dict[str, List[str]] = {"test_stream": ["ignore_me", "ignore_me_too"]}


def record_message_from_record(records: List[Dict]) -> List[AirbyteMessage]:
def record_message_from_record(records: List[Dict], emitted_at: int) -> List[AirbyteMessage]:
return [
AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111),
record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=emitted_at),
)
for record in records
]
Expand Down Expand Up @@ -115,7 +115,7 @@ def test_read_with_ignore_fields(mocker, schema, record, expected_record, fail_c
sequence_of_docker_callread_results,
list(reversed(sequence_of_docker_callread_results)),
):
docker_runner_mock.call_read.side_effect = [record_message_from_record([first]), record_message_from_record([second])]
docker_runner_mock.call_read.side_effect = [record_message_from_record([first], emitted_at=111), record_message_from_record([second], emitted_at=112)]

t = _TestFullRefresh()
with fail_context:
Expand Down Expand Up @@ -208,8 +208,8 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re
docker_runner_mock = mocker.MagicMock()

docker_runner_mock.call_read.side_effect = [
record_message_from_record(first_read_records),
record_message_from_record(second_read_records),
record_message_from_record(first_read_records, emitted_at=111),
record_message_from_record(second_read_records, emitted_at=112),
]

t = _TestFullRefresh()
Expand All @@ -221,3 +221,69 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re
docker_runner=docker_runner_mock,
detailed_logger=mocker.MagicMock(),
)


@pytest.mark.parametrize(
"schema, records_1, records_2, expectation",
[
(
{"type": "object"},
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)),
],
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)),
],
does_not_raise()
),
(
{"type": "object"},
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)),
],
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)),
],
does_not_raise()
),
(
{"type": "object"},
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)),
],
[
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)),
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)),
],
pytest.raises(AssertionError, match="emitted_at should increase on subsequent runs")
),
],
)
def test_emitted_at_increase_on_subsequent_runs(mocker, schema, records_1, records_2, expectation):
configured_catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}),
sync_mode="full_refresh",
destination_sync_mode="overwrite",
)
]
)
docker_runner_mock = mocker.MagicMock()
docker_runner_mock.call_read.side_effect = [records_1, records_2]
input_config = ReadTestConfigWithIgnoreFields()

t = _TestFullRefresh()
with expectation:
t.test_sequential_reads(
inputs=input_config,
connector_config=mocker.MagicMock(),
configured_catalog=configured_catalog,
docker_runner=docker_runner_mock,
detailed_logger=docker_runner_mock,
)