Skip to content

Commit

Permalink
Have StateBuilder return our actual state object and not simply a dict (
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored and jatinyadav-cc committed Feb 26, 2024
1 parent 5672e54 commit a012d2f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 149 deletions.
15 changes: 12 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,16 @@
from airbyte_cdk.exception_handler import assemble_uncaught_exception
from airbyte_cdk.logger import AirbyteLogFormatter
from airbyte_cdk.sources import Source
from airbyte_protocol.models import AirbyteLogMessage, AirbyteMessage, AirbyteStreamStatus, ConfiguredAirbyteCatalog, Level, TraceType, Type
from airbyte_protocol.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteStateMessage,
AirbyteStreamStatus,
ConfiguredAirbyteCatalog,
Level,
TraceType,
Type,
)
from pydantic.error_wrappers import ValidationError


Expand Down Expand Up @@ -104,7 +113,7 @@ def read(
source: Source,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Optional[Any] = None,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False,
) -> EntrypointOutput:
"""
Expand Down Expand Up @@ -133,7 +142,7 @@ def read(
args.extend(
[
"--state",
make_file(tmp_directory_path / "state.json", state),
make_file(tmp_directory_path / "state.json", f"[{','.join([stream_state.json() for stream_state in state])}]"),
]
)
args.append("--debug")
Expand Down
12 changes: 7 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/test/state_builder.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import Any, Dict, List
from typing import Any, List

from airbyte_protocol.models import AirbyteStateMessage


class StateBuilder:
def __init__(self) -> None:
self._state: List[Dict[str, Any]] = []
self._state: List[AirbyteStateMessage] = []

def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
self._state.append({
self._state.append(AirbyteStateMessage.parse_obj({
"type": "STREAM",
"stream": {
"stream_state": state,
"stream_descriptor": {
"name": stream_name
}
}
})
}))
return self

def build(self) -> List[Dict[str, Any]]:
def build(self) -> List[AirbyteStateMessage]:
return self._state
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.test.state_builder import StateBuilder
from unit_tests.sources.file_based.helpers import LowHistoryLimitCursor
from unit_tests.sources.file_based.scenarios.file_based_source_builder import FileBasedSourceBuilder
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
Expand Down Expand Up @@ -39,17 +40,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"some_old_file.csv": "2023-06-01T03:54:07.000000Z"},
}).build(),
)
)
.set_expected_records(
Expand Down Expand Up @@ -140,17 +133,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
}).build(),
)
)
.set_expected_records(
Expand Down Expand Up @@ -223,17 +208,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-01T03:54:07.000000Z"},
}).build(),
)
)
.set_expected_records(
Expand Down Expand Up @@ -377,7 +354,7 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[],
input_state=StateBuilder().build(),
)
)
).build()
Expand Down Expand Up @@ -499,7 +476,7 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[],
input_state=StateBuilder().build(),
)
)
).build()
Expand Down Expand Up @@ -593,15 +570,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"}},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"recent_file.csv": "2023-07-15T23:59:59.000000Z"},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -731,7 +702,7 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[],
input_state=StateBuilder().build(),
)
)
).build()
Expand Down Expand Up @@ -891,7 +862,7 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[],
input_state=StateBuilder().build(),
)
)
).build()
Expand Down Expand Up @@ -1035,15 +1006,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {"history": {"a.csv": "2023-06-05T03:54:07.000000Z"}},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z"},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -1163,17 +1128,9 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {"a.csv": "2023-06-05T03:54:07.000000Z", "c.csv": "2023-06-06T03:54:07.000000Z"},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -1348,21 +1305,13 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"very_very_old_file.csv": "2023-06-01T03:54:07.000000Z",
"very_old_file.csv": "2023-06-02T03:54:07.000000Z",
"old_file_same_timestamp_as_a.csv": "2023-06-06T03:54:07.000000Z",
},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -1546,7 +1495,7 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[],
input_state=StateBuilder().build(),
)
)
).build()
Expand Down Expand Up @@ -1652,21 +1601,13 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"b.csv": "2023-06-05T03:54:07.000000Z",
"c.csv": "2023-06-05T03:54:07.000000Z",
"d.csv": "2023-06-05T03:54:07.000000Z",
},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -1794,21 +1735,13 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
"e.csv": "2023-06-08T03:54:07.000000Z",
},
}).build(),
)
)
).build()
Expand Down Expand Up @@ -1962,21 +1895,13 @@
)
.set_incremental_scenario_config(
IncrementalScenarioConfig(
input_state=[
{
"type": "STREAM",
"stream": {
"stream_state": {
"history": {
"old_file.csv": "2023-06-05T00:00:00.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
},
"stream_descriptor": {"name": "stream1"},
},
}
],
input_state=StateBuilder().with_stream_state("stream1", {
"history": {
"old_file.csv": "2023-06-05T00:00:00.000000Z",
"c.csv": "2023-06-07T03:54:07.000000Z",
"d.csv": "2023-06-08T03:54:07.000000Z",
},
}).build(),
)
)
).build()
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType
from airbyte_cdk.test.state_builder import StateBuilder
from unit_tests.sources.file_based.scenarios.scenario_builder import IncrementalScenarioConfig, TestScenarioBuilder
from unit_tests.sources.streams.concurrent.scenarios.stream_facade_builder import StreamFacadeSourceBuilder
from unit_tests.sources.streams.concurrent.scenarios.utils import MockStream
Expand Down Expand Up @@ -85,7 +86,7 @@
)


LEGACY_STATE = [{"type": "STREAM", "stream": {"stream_state": {"cursor_field": 0}, "stream_descriptor": {"name": "stream1"}}}]
LEGACY_STATE = StateBuilder().with_stream_state("stream1", {"cursor_field": 0}).build()
test_incremental_stream_without_slice_boundaries_with_legacy_state = (
TestScenarioBuilder()
.set_name("test_incremental_stream_without_slice_boundaries_with_legacy_state")
Expand Down Expand Up @@ -162,18 +163,10 @@
)


CONCURRENT_STATE = [
{
"type": "STREAM",
"stream": {
"stream_state": {
"slices": [{"start": 0, "end": 0}],
"state_type": ConcurrencyCompatibleStateType.date_range.value,
},
"stream_descriptor": {"name": "stream1"},
},
},
]
CONCURRENT_STATE = StateBuilder().with_stream_state("stream1", {
"slices": [{"start": 0, "end": 0}],
"state_type": ConcurrencyCompatibleStateType.date_range.value,
}).build()
test_incremental_stream_without_slice_boundaries_with_concurrent_state = (
TestScenarioBuilder()
.set_name("test_incremental_stream_without_slice_boundaries_with_concurrent_state")
Expand Down
Loading

0 comments on commit a012d2f

Please sign in to comment.