Skip to content

Commit

Permalink
Catch empty state in incremental SAT (#22353)
Browse files Browse the repository at this point in the history
* Catch state being empty

* Update test_two_sequential_reads to catch empty state on first read

* Add integration test of empty state

* Fix legacy state test

* Move state_name to variable

* Clean up

* Format

* Fix rogue test
  • Loading branch information
bnchrch authored Feb 9, 2023
1 parent ddb80cd commit ce770d3
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def configured_catalog_for_incremental_fixture(configured_catalog) -> Configured
return catalog


def records_with_state(records, state, stream_mapping, state_cursor_paths) -> Iterable[Tuple[Any, Any]]:
def records_with_state(records, state, stream_mapping, state_cursor_paths) -> Iterable[Tuple[Any, Any, Any]]:
"""Iterate over records and return cursor value with corresponding cursor value from state"""
for record in records:
stream_name = record.record.stream
Expand Down Expand Up @@ -180,7 +180,13 @@ def test_two_sequential_reads(
latest_state = states_1[-1].state.data
state_input = states_1[-1].state.data

for record_value, state_value, stream_name in records_with_state(records_1, latest_state, stream_mapping, cursor_paths):
parsed_records_1 = list(records_with_state(records_1, latest_state, stream_mapping, cursor_paths))

# This catches the case of a connector that emits an invalid state that is not compatible with the schema
# See https://github.com/airbytehq/airbyte/issues/21863 to understand more
assert parsed_records_1, "At least one valid state should be produced, given a cursor path"

for record_value, state_value, stream_name in parsed_records_1:
assert (
record_value <= state_value
), f"First incremental sync should produce records younger or equal to cursor value from the state. Stream: {stream_name}"
Expand All @@ -197,7 +203,7 @@ def test_read_sequential_slices(
self, inputs: IncrementalConfig, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner
):
"""
Incremental test that makes calls the read method without a state checkpoint. Then we partition the results by stream and
Incremental test that makes calls to the read method without a state checkpoint. Then we partition the results by stream and
slice checkpoints.
Then we make additional read method calls using the state message and verify the correctness of the
messages in the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,96 @@ def test_incremental_two_sequential_reads(
)


@pytest.mark.parametrize(
    "stream_name, cursor_type, cursor_paths, records1, records2, latest_state, expected_error",
    [
        (
            "test_stream",
            {
                "dateCreated": {
                    "type": "string",
                    "format": "date-time"
                }
            },
            {'test_stream': ['dateCreated']},
            [{"dateCreated": "2020-01-01T01:01:01.000000Z"}, {"dateCreated": "2020-01-02T01:01:01.000000Z"}],
            [],
            {"dateCreated": "2020-01-02T01:01:01.000000Z"},
            does_not_raise(),
        ),
        (
            "test_stream",
            {
                "dateCreated": {
                    "type": "string",
                    "format": "date-time"
                }
            },
            {'test_stream': ['dateCreated']},
            [{"dateCreated": "2020-01-01T01:01:01.000000Z"}, {"dateCreated": "2020-01-02T01:01:01.000000Z"}],
            [],
            {},
            pytest.raises(AssertionError, match="At least one valid state should be produced, given a cursor path")
        ),
    ],
)
@pytest.mark.parametrize(
    "run_per_stream_test",
    [
        pytest.param(False, id="test_two_sequential_reads_using_a_mock_connector_emitting_legacy_state"),
        pytest.param(True, id="test_two_sequential_reads_using_a_mock_connector_emitting_per_stream_state"),
    ],
)
def test_incremental_two_sequential_reads_state_invalid(
    stream_name, records1, records2, latest_state, cursor_type, cursor_paths, expected_error, run_per_stream_test
):
    """Check that test_two_sequential_reads asserts when the connector emits an empty/invalid state.

    The first parametrized case emits a valid state matching the cursor path and
    expects no error; the second emits an empty state (``{}``) and expects the
    "At least one valid state should be produced" AssertionError. Each case is
    run twice: once with a legacy (global) state message and once with a
    per-stream state message, selected by ``run_per_stream_test``.
    """
    input_config = IncrementalConfig()
    # Single incremental stream whose schema contains only the cursor property.
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name=stream_name,
                    json_schema={"type": "object", "properties": cursor_type},
                    supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
                ),
                sync_mode=SyncMode.incremental,
                destination_sync_mode=DestinationSyncMode.overwrite,
                default_cursor_field=["dateCreated"],
                cursor_field=["dateCreated"],
            )
        ]
    )

    # First read: records followed by a state message in either the
    # per-stream or the legacy (global dict) format.
    if run_per_stream_test:
        call_read_output_messages = [
            *build_messages_from_record_data(stream_name, records1),
            build_per_stream_state_message(descriptor=StreamDescriptor(name=stream_name), stream_state=latest_state),
        ]
    else:
        # Idiomatic dict literal instead of dict() + keyed assignment.
        call_read_output_messages = [
            *build_messages_from_record_data(stream_name, records1),
            build_state_message({stream_name: latest_state}),
        ]

    # Second read (resuming from state): records only, no state message.
    call_read_with_state_output_messages = build_messages_from_record_data(stream_name, records2)

    docker_runner_mock = MagicMock()
    docker_runner_mock.call_read.return_value = call_read_output_messages
    docker_runner_mock.call_read_with_state.return_value = call_read_with_state_output_messages

    t = _TestIncremental()
    # expected_error is either does_not_raise() or pytest.raises(...),
    # so the same call covers both the passing and the failing case.
    with expected_error:
        t.test_two_sequential_reads(
            inputs=input_config,
            connector_config=MagicMock(),
            configured_catalog_for_incremental=catalog,
            cursor_paths=cursor_paths,
            docker_runner=docker_runner_mock,
        )


@pytest.mark.parametrize(
"records, state_records, threshold_days, expected_error",
[
Expand Down

0 comments on commit ce770d3

Please sign in to comment.