From d0772b529fab84b9bb6bf77243537f64763b56d1 Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 1 Feb 2023 16:03:29 -0800 Subject: [PATCH 1/8] Add skeleton for test_emitted_at_increase_on_subsequent_runs --- .../connector_acceptance_test/tests/test_core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 20c878c9fbb6..675bc395bcf1 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -852,6 +852,9 @@ def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicR assert len(error_trace_messages) >= 1, "Connector should emit at least one error trace message" + def test_emitted_at_increase_on_subsequent_runs(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): + assert False, "hahaha" + @staticmethod def remove_extra_fields(record: Any, spec: Any) -> Any: """Remove keys from record that spec doesn't have, works recursively""" From 76781a5142d7f238e6969b63f2acef7b067b5531 Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 1 Feb 2023 18:14:06 -0800 Subject: [PATCH 2/8] Add emitted_at increase test --- .../tests/test_core.py | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 675bc395bcf1..4f0b8c1fa644 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -5,6 +5,9 @@ import json import logging import re +import time +from deepdiff import DeepDiff + from collections import Counter, defaultdict from functools import reduce from logging import Logger @@ -47,7 +50,6 @@ from docker.errors import ContainerError from jsonschema._utils import flatten - @pytest.fixture(name="connector_spec_dict") def connector_spec_dict_fixture(actual_connector_spec): return json.loads(actual_connector_spec.json()) @@ -826,6 +828,38 @@ def test_read( detailed_logger=detailed_logger, ) + def test_emitted_at_increase_on_subsequent_runs( + self, + connector_config, + configured_catalog, + docker_runner: ConnectorRunner, + ): + first_read_output = docker_runner.call_read(connector_config, configured_catalog) + first_read_records = filter_output(first_read_output, Type.RECORD) + first_read_records_data = [message.record.data for message in first_read_records] + + assert first_read_records_data, "At least one record should be read using provided catalog" + + first_read_records_emitted_at = [message.record.emitted_at for message in first_read_records] + max_emitted_at = max(first_read_records_emitted_at) + + # sleep for 1 second to ensure that the emitted_at timestamp is different + time.sleep(1) + + second_read_output = docker_runner.call_read(connector_config, configured_catalog) + second_read_records = filter_output(second_read_output, Type.RECORD) + second_read_records_data = [message.record.data for message in second_read_records] + second_read_reords_emitted_at = [message.record.emitted_at for message in second_read_records] + + min_emitted_at = min(second_read_reords_emitted_at) + + assert max_emitted_at < min_emitted_at, "emitted_at should increase on subsequent runs" + + # Compare two lists, ignoring order of elements. + diff = DeepDiff(first_read_records_data, second_read_records_data, ignore_order=True, report_repetition=True) + + assert diff == {}, f"records should be the same on subsequent runs. Diff: {diff}" + def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): if not inputs.expect_trace_message_on_failure: pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`") @@ -852,9 +886,6 @@ def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicR assert len(error_trace_messages) >= 1, "Connector should emit at least one error trace message" - def test_emitted_at_increase_on_subsequent_runs(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): - assert False, "hahaha" - @staticmethod def remove_extra_fields(record: Any, spec: Any) -> Any: """Remove keys from record that spec doesn't have, works recursively""" From e8cfac97376c77df67d27a563eda5bc03be4b858 Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 1 Feb 2023 18:34:48 -0800 Subject: [PATCH 3/8] Add tests --- .../unit_tests/test_core.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py index c8d9c8329865..cf07f5d7e53a 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py @@ -329,6 +329,94 @@ def test_read(schema, record, expectation): ) +@pytest.mark.parametrize( + "schema, records_1, records_2, expectation", + [ + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + pytest.raises(AssertionError, match="emitted_at should increase on subsequent runs") + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 25}, emitted_at=112)), + ], + pytest.raises(AssertionError, match="records should be the same on subsequent runs") + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + pytest.raises(AssertionError, match="records should be the same on subsequent runs") + ), + + ], +) +def test_emitted_at_increase_on_subsequent_runs(schema, records_1, records_2, expectation): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + ] + ) + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.side_effect = [records_1, records_2] + t = test_core.TestBasicRead() + with expectation: + t.test_emitted_at_increase_on_subsequent_runs( + connector_config=None, + configured_catalog=configured_catalog, + docker_runner=docker_runner_mock, + ) + + @pytest.mark.parametrize( "output, expect_trace_message_on_failure, should_fail", [ From 0e98747c43645245c184b05e1b04e08980d8086a Mon Sep 17 00:00:00 2001 From: Ben Church Date: Mon, 6 Feb 2023 18:26:45 -0800 Subject: [PATCH 4/8] Move test_emitted_at_increase_on_subsequent_runs to test_test_full_refresh.py --- .../tests/test_core.py | 34 +------ .../tests/test_full_refresh.py | 53 +++++++---- .../unit_tests/test_core.py | 88 ------------------- .../unit_tests/test_test_full_refresh.py | 76 ++++++++++++++-- 4 files changed, 111 insertions(+), 140 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 4f0b8c1fa644..7336987d6674 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -5,8 +5,7 @@ import json import logging import re -import time -from deepdiff import DeepDiff + from collections import Counter, defaultdict from functools import reduce @@ -50,6 +49,7 @@ from docker.errors import ContainerError from jsonschema._utils import flatten + @pytest.fixture(name="connector_spec_dict") def connector_spec_dict_fixture(actual_connector_spec): return json.loads(actual_connector_spec.json()) @@ -828,37 +828,7 @@ def test_read( detailed_logger=detailed_logger, ) - def test_emitted_at_increase_on_subsequent_runs( - self, - connector_config, - configured_catalog, - docker_runner: ConnectorRunner, - ): - first_read_output = docker_runner.call_read(connector_config, configured_catalog) - first_read_records = filter_output(first_read_output, Type.RECORD) - first_read_records_data = [message.record.data for message in first_read_records] - - assert first_read_records_data, "At least one record should be read using provided catalog" - - first_read_records_emitted_at = [message.record.emitted_at for message in first_read_records] - max_emitted_at = max(first_read_records_emitted_at) - - # sleep for 1 second to ensure that the emitted_at timestamp is different - time.sleep(1) - - second_read_output = docker_runner.call_read(connector_config, configured_catalog) - second_read_records = filter_output(second_read_output, Type.RECORD) - second_read_records_data = [message.record.data for message in second_read_records] - second_read_reords_emitted_at = [message.record.emitted_at for message in second_read_records] - - min_emitted_at = min(second_read_reords_emitted_at) - - assert max_emitted_at < min_emitted_at, "emitted_at should increase on subsequent runs" - - # Compare two lists, ignoring order of elements. - diff = DeepDiff(first_read_records_data, second_read_records_data, ignore_order=True, report_repetition=True) - assert diff == {}, f"records should be the same on subsequent runs. Diff: {diff}" def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): if not inputs.expect_trace_message_on_failure: diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py index 7b3712637d32..cda7a1896ea6 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py @@ -6,12 +6,13 @@ from functools import partial from logging import Logger from typing import List, Mapping +import time import pytest from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type from connector_acceptance_test.base import BaseTest from connector_acceptance_test.config import ConnectionTestConfig -from connector_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, full_refresh_only_catalog, make_hashable +from connector_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, full_refresh_only_catalog, make_hashable from connector_acceptance_test.utils.json_schema_helper import CatalogField @@ -36,24 +37,24 @@ def primary_keys_only(record, pks): @pytest.mark.default_timeout(20 * 60) class TestFullRefresh(BaseTest): - def test_sequential_reads( - self, - inputs: ConnectionTestConfig, - connector_config: SecretDict, - configured_catalog: ConfiguredAirbyteCatalog, - docker_runner: ConnectorRunner, - detailed_logger: Logger, - ): - ignored_fields = getattr(inputs, "ignored_fields") or {} - configured_catalog = full_refresh_only_catalog(configured_catalog) - output = docker_runner.call_read(connector_config, configured_catalog) - records_1 = [message.record for message in output if message.type == Type.RECORD] + def assert_emitted_at_increase_on_subsequent_runs(self, first_read_records, second_read_records): + first_read_records_data = [record.data for record in first_read_records] + assert first_read_records_data, "At least one record should be read using provided catalog" + + first_read_records_emitted_at = [record.emitted_at for record in first_read_records] + max_emitted_at = max(first_read_records_emitted_at) + + second_read_reords_emitted_at = [record.emitted_at for record in second_read_records] + + min_emitted_at = min(second_read_reords_emitted_at) + + assert max_emitted_at < min_emitted_at, "emitted_at should increase on subsequent runs" + + def assert_two_sequential_reads_produce_same_or_subset_records(self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger): records_by_stream_1 = defaultdict(list) for record in records_1: records_by_stream_1[record.stream].append(record.data) - output = docker_runner.call_read(connector_config, configured_catalog) - records_2 = [message.record for message in output if message.type == Type.RECORD] records_by_stream_2 = defaultdict(list) for record in records_2: records_by_stream_2[record.stream].append(record.data) @@ -78,3 +79,25 @@ def test_sequential_reads( detailed_logger.info("Missing records") detailed_logger.log_json_list(missing_records) pytest.fail(msg) + + def test_sequential_reads( + self, + inputs: ConnectionTestConfig, + connector_config: SecretDict, + configured_catalog: ConfiguredAirbyteCatalog, + docker_runner: ConnectorRunner, + detailed_logger: Logger, + ): + ignored_fields = getattr(inputs, "ignored_fields") or {} + configured_catalog = full_refresh_only_catalog(configured_catalog) + output_1 = docker_runner.call_read(connector_config, configured_catalog) + records_1 = [message.record for message in output_1 if message.type == Type.RECORD] + + # sleep for 1 second to ensure that the emitted_at timestamp is different + time.sleep(1) + + output_2 = docker_runner.call_read(connector_config, configured_catalog) + records_2 = [message.record for message in output_2 if message.type == Type.RECORD] + + self.assert_emitted_at_increase_on_subsequent_runs(records_1, records_2) + self.assert_two_sequential_reads_produce_same_or_subset_records(records_1, records_2, configured_catalog, ignored_fields, detailed_logger) diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py index cf07f5d7e53a..c8d9c8329865 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py @@ -329,94 +329,6 @@ def test_read(schema, record, expectation): ) -@pytest.mark.parametrize( - "schema, records_1, records_2, expectation", - [ - ( - {"type": "object"}, - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), - ], - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), - ], - does_not_raise() - ), - ( - {"type": "object"}, - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), - ], - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), - ], - does_not_raise() - ), - ( - {"type": "object"}, - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), - ], - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), - ], - pytest.raises(AssertionError, match="emitted_at should increase on subsequent runs") - ), - ( - {"type": "object"}, - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), - ], - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 25}, emitted_at=112)), - ], - pytest.raises(AssertionError, match="records should be the same on subsequent runs") - ), - ( - {"type": "object"}, - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), - ], - [ - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), - AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), - ], - pytest.raises(AssertionError, match="records should be the same on subsequent runs") - ), - - ], -) -def test_emitted_at_increase_on_subsequent_runs(schema, records_1, records_2, expectation): - configured_catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), - sync_mode="full_refresh", - destination_sync_mode="overwrite", - ) - ] - ) - docker_runner_mock = MagicMock() - docker_runner_mock.call_read.side_effect = [records_1, records_2] - t = test_core.TestBasicRead() - with expectation: - t.test_emitted_at_increase_on_subsequent_runs( - connector_config=None, - configured_catalog=configured_catalog, - docker_runner=docker_runner_mock, - ) - - @pytest.mark.parametrize( "output, expect_trace_message_on_failure, should_fail", [ diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py index aa274f5b8cbd..98c8c518632c 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py @@ -24,11 +24,11 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig): ignored_fields: Dict[str, List[str]] = {"test_stream": ["ignore_me", "ignore_me_too"]} -def record_message_from_record(records: List[Dict]) -> List[AirbyteMessage]: +def record_message_from_record(records: List[Dict], emitted_at: int) -> List[AirbyteMessage]: return [ AirbyteMessage( type=Type.RECORD, - record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111), + record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=emitted_at), ) for record in records ] @@ -115,7 +115,7 @@ def test_read_with_ignore_fields(mocker, schema, record, expected_record, fail_c sequence_of_docker_callread_results, list(reversed(sequence_of_docker_callread_results)), ): - docker_runner_mock.call_read.side_effect = [record_message_from_record([first]), record_message_from_record([second])] + docker_runner_mock.call_read.side_effect = [record_message_from_record([first], emitted_at=111), record_message_from_record([second], emitted_at=112)] t = _TestFullRefresh() with fail_context: @@ -208,8 +208,8 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re docker_runner_mock = mocker.MagicMock() docker_runner_mock.call_read.side_effect = [ - record_message_from_record(first_read_records), - record_message_from_record(second_read_records), + record_message_from_record(first_read_records, emitted_at=111), + record_message_from_record(second_read_records, emitted_at=112), ] t = _TestFullRefresh() @@ -221,3 +221,69 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re docker_runner=docker_runner_mock, detailed_logger=mocker.MagicMock(), ) + + +@pytest.mark.parametrize( + "schema, records_1, records_2, expectation", + [ + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + pytest.raises(AssertionError, match="emitted_at should increase on subsequent runs") + ), + ], +) +def test_emitted_at_increase_on_subsequent_runs(mocker, schema, records_1, records_2, expectation): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + ] + ) + docker_runner_mock = mocker.MagicMock() + docker_runner_mock.call_read.side_effect = [records_1, records_2] + input_config = ReadTestConfigWithIgnoreFields() + + t = _TestFullRefresh() + with expectation: + t.test_sequential_reads( + inputs=input_config, + connector_config=mocker.MagicMock(), + configured_catalog=configured_catalog, + docker_runner=docker_runner_mock, + detailed_logger=docker_runner_mock, + ) From 8a59ca965be6a9ba900ebeedff4479d5d559b77a Mon Sep 17 00:00:00 2001 From: Ben Church Date: Mon, 6 Feb 2023 18:47:00 -0800 Subject: [PATCH 5/8] Fix format error --- .../connector_acceptance_test/tests/test_core.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 7336987d6674..20c878c9fbb6 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -5,8 +5,6 @@ import json import logging import re - - from collections import Counter, defaultdict from functools import reduce from logging import Logger @@ -828,8 +826,6 @@ def test_read( detailed_logger=detailed_logger, ) - - def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner): if not inputs.expect_trace_message_on_failure: pytest.skip("Skipping `test_airbyte_trace_message_on_failure` because `inputs.expect_trace_message_on_failure=False`") From 5e38d1e0f8bb97c9802e646847eb279c2810af08 Mon Sep 17 00:00:00 2001 From: Ben Church Date: Tue, 7 Feb 2023 09:32:27 -0800 Subject: [PATCH 6/8] Fix flake test --- .../tests/test_full_refresh.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py index cda7a1896ea6..6fe49545ed4c 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py @@ -2,17 +2,17 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import time from collections import defaultdict from functools import partial from logging import Logger from typing import List, Mapping -import time import pytest from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type from connector_acceptance_test.base import BaseTest from connector_acceptance_test.config import ConnectionTestConfig -from connector_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, full_refresh_only_catalog, make_hashable +from connector_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, full_refresh_only_catalog, make_hashable from connector_acceptance_test.utils.json_schema_helper import CatalogField @@ -50,7 +50,9 @@ def assert_emitted_at_increase_on_subsequent_runs(self, first_read_records, seco assert max_emitted_at < min_emitted_at, "emitted_at should increase on subsequent runs" - def assert_two_sequential_reads_produce_same_or_subset_records(self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger): + def assert_two_sequential_reads_produce_same_or_subset_records( + self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger + ): records_by_stream_1 = defaultdict(list) for record in records_1: records_by_stream_1[record.stream].append(record.data) @@ -100,4 +102,6 @@ def test_sequential_reads( records_2 = [message.record for message in output_2 if message.type == Type.RECORD] self.assert_emitted_at_increase_on_subsequent_runs(records_1, records_2) - self.assert_two_sequential_reads_produce_same_or_subset_records(records_1, records_2, configured_catalog, ignored_fields, detailed_logger) + self.assert_two_sequential_reads_produce_same_or_subset_records( + records_1, records_2, configured_catalog, ignored_fields, detailed_logger + ) From a2c42d628ce76f1b21ca0d40ab0e082077b7db5d Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 8 Feb 2023 15:45:24 -0800 Subject: [PATCH 7/8] Update min and max names to reference run --- .../connector_acceptance_test/tests/test_full_refresh.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py index 6fe49545ed4c..dd25f390b18d 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py @@ -42,13 +42,13 @@ def assert_emitted_at_increase_on_subsequent_runs(self, first_read_records, seco assert first_read_records_data, "At least one record should be read using provided catalog" first_read_records_emitted_at = [record.emitted_at for record in first_read_records] - max_emitted_at = max(first_read_records_emitted_at) + max_emitted_at_first_read = max(first_read_records_emitted_at) second_read_reords_emitted_at = [record.emitted_at for record in second_read_records] - min_emitted_at = min(second_read_reords_emitted_at) + min_emitted_at_second_read = min(second_read_reords_emitted_at) - assert max_emitted_at < min_emitted_at, "emitted_at should increase on subsequent runs" + assert max_emitted_at_first_read < min_emitted_at_second_read, "emitted_at should increase on subsequent runs" def assert_two_sequential_reads_produce_same_or_subset_records( self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger From 6acf42b49185bdf55a8ee1eaf73e36779a65bcb4 Mon Sep 17 00:00:00 2001 From: Ben Church Date: Wed, 8 Feb 2023 17:20:50 -0800 Subject: [PATCH 8/8] Bump version --- .../bases/connector-acceptance-test/CHANGELOG.md | 5 ++++- .../bases/connector-acceptance-test/Dockerfile | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md index 2d42bbe74995..71fa7b3f0b8b 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.5.2 +Check that `emitted_at` increases during subsequent reads. [#22291](https://github.com/airbytehq/airbyte/pull/22291) + ## 0.5.1 Fix discovered catalog caching for different configs. [#22301](https://github.com/airbytehq/airbyte/pull/22301) @@ -231,7 +234,7 @@ Add validation of input config.json against spec.json. ## 0.1.9 Add configurable validation of schema for all records in BasicRead test: [#4345](https://github.com/airbytehq/airbyte/pull/4345) -The validation is ON by default. +The validation is ON by default. To disable validation for the source you need to set `validate_schema: off` in the config file. ## 0.1.8 diff --git a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile index a198c17604b1..44c2b694f50f 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY connector_acceptance_test ./connector_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.5.1 +LABEL io.airbyte.version=0.5.2 LABEL io.airbyte.name=airbyte/connector-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"]