diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md index 2d42bbe74995..71fa7b3f0b8b 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.5.2 +Check that `emitted_at` increases during subsequent reads. [#22291](https://github.com/airbytehq/airbyte/pull/22291) + ## 0.5.1 Fix discovered catalog caching for different configs. [#22301](https://github.com/airbytehq/airbyte/pull/22301) @@ -231,7 +234,7 @@ Add validation of input config.json against spec.json. ## 0.1.9 Add configurable validation of schema for all records in BasicRead test: [#4345](https://github.com/airbytehq/airbyte/pull/4345) -The validation is ON by default. +The validation is ON by default. To disable validation for the source you need to set `validate_schema: off` in the config file. ## 0.1.8 diff --git a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile index a198c17604b1..44c2b694f50f 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY connector_acceptance_test ./connector_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.5.1 +LABEL io.airbyte.version=0.5.2 LABEL io.airbyte.name=airbyte/connector-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py index 7b3712637d32..dd25f390b18d 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_full_refresh.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import time from collections import defaultdict from functools import partial from logging import Logger @@ -36,24 +37,26 @@ def primary_keys_only(record, pks): @pytest.mark.default_timeout(20 * 60) class TestFullRefresh(BaseTest): - def test_sequential_reads( - self, - inputs: ConnectionTestConfig, - connector_config: SecretDict, - configured_catalog: ConfiguredAirbyteCatalog, - docker_runner: ConnectorRunner, - detailed_logger: Logger, + def assert_emitted_at_increase_on_subsequent_runs(self, first_read_records, second_read_records): + first_read_records_data = [record.data for record in first_read_records] + assert first_read_records_data, "At least one record should be read using provided catalog" + + first_read_records_emitted_at = [record.emitted_at for record in first_read_records] + max_emitted_at_first_read = max(first_read_records_emitted_at) + + second_read_reords_emitted_at = [record.emitted_at for record in second_read_records] + + min_emitted_at_second_read = min(second_read_reords_emitted_at) + + assert max_emitted_at_first_read < min_emitted_at_second_read, "emitted_at should increase on subsequent runs" + + def assert_two_sequential_reads_produce_same_or_subset_records( + self, records_1, records_2, configured_catalog, ignored_fields, detailed_logger ): - ignored_fields = getattr(inputs, "ignored_fields") or {} - configured_catalog = full_refresh_only_catalog(configured_catalog) - output = docker_runner.call_read(connector_config, configured_catalog) - records_1 = [message.record for message in output if message.type == Type.RECORD] records_by_stream_1 = defaultdict(list) for record in records_1: records_by_stream_1[record.stream].append(record.data) - output = docker_runner.call_read(connector_config, configured_catalog) - records_2 = [message.record for message in output if message.type == Type.RECORD] records_by_stream_2 = defaultdict(list) for record in records_2: records_by_stream_2[record.stream].append(record.data) @@ -78,3 +81,27 @@ def test_sequential_reads( detailed_logger.info("Missing records") detailed_logger.log_json_list(missing_records) pytest.fail(msg) + + def test_sequential_reads( + self, + inputs: ConnectionTestConfig, + connector_config: SecretDict, + configured_catalog: ConfiguredAirbyteCatalog, + docker_runner: ConnectorRunner, + detailed_logger: Logger, + ): + ignored_fields = getattr(inputs, "ignored_fields") or {} + configured_catalog = full_refresh_only_catalog(configured_catalog) + output_1 = docker_runner.call_read(connector_config, configured_catalog) + records_1 = [message.record for message in output_1 if message.type == Type.RECORD] + + # sleep for 1 second to ensure that the emitted_at timestamp is different + time.sleep(1) + + output_2 = docker_runner.call_read(connector_config, configured_catalog) + records_2 = [message.record for message in output_2 if message.type == Type.RECORD] + + self.assert_emitted_at_increase_on_subsequent_runs(records_1, records_2) + self.assert_two_sequential_reads_produce_same_or_subset_records( + records_1, records_2, configured_catalog, ignored_fields, detailed_logger + ) diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py index aa274f5b8cbd..98c8c518632c 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_test_full_refresh.py @@ -24,11 +24,11 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig): ignored_fields: Dict[str, List[str]] = {"test_stream": ["ignore_me", "ignore_me_too"]} -def record_message_from_record(records: List[Dict]) -> List[AirbyteMessage]: +def record_message_from_record(records: List[Dict], emitted_at: int) -> List[AirbyteMessage]: return [ AirbyteMessage( type=Type.RECORD, - record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111), + record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=emitted_at), ) for record in records ] @@ -115,7 +115,7 @@ def test_read_with_ignore_fields(mocker, schema, record, expected_record, fail_c sequence_of_docker_callread_results, list(reversed(sequence_of_docker_callread_results)), ): - docker_runner_mock.call_read.side_effect = [record_message_from_record([first]), record_message_from_record([second])] + docker_runner_mock.call_read.side_effect = [record_message_from_record([first], emitted_at=111), record_message_from_record([second], emitted_at=112)] t = _TestFullRefresh() with fail_context: @@ -208,8 +208,8 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re docker_runner_mock = mocker.MagicMock() docker_runner_mock.call_read.side_effect = [ - record_message_from_record(first_read_records), - record_message_from_record(second_read_records), + record_message_from_record(first_read_records, emitted_at=111), + record_message_from_record(second_read_records, emitted_at=112), ] t = _TestFullRefresh() @@ -221,3 +221,69 @@ def test_recordset_comparison(mocker, primary_key, first_read_records, second_re docker_runner=docker_runner_mock, detailed_logger=mocker.MagicMock(), ) + + +@pytest.mark.parametrize( + "schema, records_1, records_2, expectation", + [ + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=112)), + ], + does_not_raise() + ), + ( + {"type": "object"}, + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=111)), + ], + [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 23}, emitted_at=111)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"aa": 24}, emitted_at=112)), + ], + pytest.raises(AssertionError, match="emitted_at should increase on subsequent runs") + ), + ], +) +def test_emitted_at_increase_on_subsequent_runs(mocker, schema, records_1, records_2, expectation): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + ] + ) + docker_runner_mock = mocker.MagicMock() + docker_runner_mock.call_read.side_effect = [records_1, records_2] + input_config = ReadTestConfigWithIgnoreFields() + + t = _TestFullRefresh() + with expectation: + t.test_sequential_reads( + inputs=input_config, + connector_config=mocker.MagicMock(), + configured_catalog=configured_catalog, + docker_runner=docker_runner_mock, + detailed_logger=docker_runner_mock, + )