diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 7e846736be25..5d36305edf7b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -624,6 +624,16 @@ definitions: examples: - "2020-01-1T00:00:00Z" - "{{ config['start_time'] }}" + cursor_datetime_formats: + title: Cursor Datetime Formats + description: The possible formats for the cursor field + type: array + items: + type: string + examples: + - "%Y-%m-%dT%H:%M:%S.%f%z" + - "%Y-%m-%d" + - "%s" cursor_granularity: title: Cursor Granularity description: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py index 15f3b88d91e6..685f0b7e6876 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py @@ -4,7 +4,7 @@ import datetime from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, Optional, Union from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser @@ -52,7 +52,7 @@ class DatetimeBasedCursor(Cursor): datetime_format: str config: Config parameters: InitVar[Mapping[str, Any]] - _cursor: str = field(repr=False, default=None) # tracks current datetime + _cursor: Optional[str] = field(repr=False, default=None) # tracks current datetime end_datetime: Optional[Union[MinMaxDatetime, str]] = None step: Optional[Union[InterpolatedString, str]] = None cursor_granularity: Optional[str] = None @@ -62,8 +62,9 @@ class DatetimeBasedCursor(Cursor): partition_field_end: Optional[str] = None lookback_window: Optional[Union[InterpolatedString, str]] = None message_repository: Optional[MessageRepository] = None + cursor_datetime_formats: List[str] = field(default_factory=lambda: []) - def __post_init__(self, parameters: Mapping[str, Any]): + def __post_init__(self, parameters: Mapping[str, Any]) -> None: if (self.step and not self.cursor_granularity) or (not self.step and self.cursor_granularity): raise ValueError( f"If step is defined, cursor_granularity should be as well and vice-versa. " @@ -95,6 +96,9 @@ def __post_init__(self, parameters: Mapping[str, Any]): if self.end_datetime and not self.end_datetime.datetime_format: self.end_datetime.datetime_format = self.datetime_format + if not self.cursor_datetime_formats: + self.cursor_datetime_formats = [self.datetime_format] + def get_stream_state(self) -> StreamState: return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} @@ -154,7 +158,7 @@ def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) - def _format_datetime(self, dt: datetime.datetime): + def _format_datetime(self, dt: datetime.datetime) -> str: return self._parser.format(dt, self.datetime_format) def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]): @@ -184,7 +188,12 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) - return comparator(cursor_date, default_date) def parse_date(self, date: str) -> datetime.datetime: - return self._parser.parse(date, self.datetime_format) + for datetime_format in self.cursor_datetime_formats + [self.datetime_format]: + try: + return self._parser.parse(date, datetime_format) + except ValueError: + pass + raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}") @classmethod def _parse_timedelta(cls, time_str) -> Union[datetime.timedelta, Duration]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 64c0164ac038..66419308e254 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -820,6 +820,11 @@ class DatetimeBasedCursor(BaseModel): examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"], title="Start Datetime", ) + cursor_datetime_formats: Optional[List[str]] = Field( + None, + description="The possible formats for the cursor field", + title="Cursor Datetime Format", + ) cursor_granularity: Optional[str] = Field( None, description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.", diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2aec0e32d554..98e866f3090e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -507,6 +507,7 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config: return DatetimeBasedCursor( cursor_field=model.cursor_field, + cursor_datetime_formats=model.cursor_datetime_formats if model.cursor_datetime_formats else [], cursor_granularity=model.cursor_granularity, datetime_format=model.datetime_format, end_datetime=end_datetime, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py index cd00561a8db3..67617a9f0124 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py @@ -388,23 +388,24 @@ def test_close_slice(test_name, previous_cursor, stream_slice, latest_record_dat cursor._cursor = previous_cursor cursor.close_slice(stream_slice, Record(latest_record_data, stream_slice) if latest_record_data else None) updated_state = cursor.get_stream_state() - assert expected_state == updated_state + assert updated_state == expected_state -def test_given_datetime_format_differs_from_cursor_value_when_close_slice_then_use_cursor_value_and_not_formatted_value(): +def test_given_different_format_and_slice_is_highest_when_close_slice_then_slice_datetime_format(): cursor = DatetimeBasedCursor( start_datetime=MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", parameters={}), cursor_field=cursor_field, datetime_format="%Y-%m-%dT%H:%M:%S.%fZ", + cursor_datetime_formats=["%Y-%m-%d"], config=config, parameters={}, ) - _slice = {} - record_cursor_value = "2023-01-04T17:30:19.000Z" + _slice = {"end_time": "2023-01-04T17:30:19.000Z"} + record_cursor_value = "2023-01-03" cursor.close_slice(_slice, Record({cursor_field: record_cursor_value}, _slice)) - assert cursor.get_stream_state()[cursor_field] == record_cursor_value + assert cursor.get_stream_state()[cursor_field] == "2023-01-04T17:30:19.000Z" def test_given_partition_end_is_specified_and_greater_than_record_when_close_slice_then_use_partition_end(): @@ -508,7 +509,7 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params, ("test_parse_date_number", "20210101", "%Y%m%d", "P1D", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)), ], ) -def test_parse_date(test_name, input_date, date_format, date_format_granularity, expected_output_date): +def test_parse_date_legacy_merge_datetime_format_in_cursor_datetime_format(test_name, input_date, date_format, date_format_granularity, expected_output_date): slicer = DatetimeBasedCursor( start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}), end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", parameters={}), @@ -524,6 +525,48 @@ def test_parse_date(test_name, input_date, date_format, date_format_granularity, assert expected_output_date == output_date +@pytest.mark.parametrize( + "test_name, input_date, date_formats, expected_output_date", + [ + ( + "test_match_first_format", + "2021-01-01T00:00:00.000000+0000", + ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"], + datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + "test_match_second_format", + "1609459200", + ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"], + datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ], +) +def test_parse_date(test_name, input_date, date_formats, expected_output_date): + slicer = DatetimeBasedCursor( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}), + cursor_field=InterpolatedString(cursor_field, parameters={}), + datetime_format="%Y-%m-%d", + cursor_datetime_formats=date_formats, + config=config, + parameters={}, + ) + assert slicer.parse_date(input_date) == expected_output_date + + +def test_given_unknown_format_when_parse_date_then_raise_error(): + slicer = DatetimeBasedCursor( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}), + cursor_field=InterpolatedString(cursor_field, parameters={}), + datetime_format="%Y-%m-%d", + cursor_datetime_formats=["%Y-%m-%d", "%s"], + config=config, + parameters={}, + ) + with pytest.raises(ValueError): + slicer.parse_date("2021-01-01T00:00:00.000000+0000") + + @pytest.mark.parametrize( "test_name, input_dt, datetimeformat, datetimeformat_granularity, expected_output", [ @@ -575,6 +618,20 @@ def test_cursor_granularity_but_no_step(): ) +def test_given_multiple_cursor_datetime_format_then_slice_using_first_format(): + cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime("2021-01-01", parameters={}), + end_datetime=MinMaxDatetime("2023-01-10", parameters={}), + cursor_field=InterpolatedString(cursor_field, parameters={}), + datetime_format="%Y-%m-%d", + cursor_datetime_formats=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"], + config=config, + parameters={}, + ) + stream_slices = cursor.stream_slices() + assert stream_slices == [{"start_time": "2021-01-01", "end_time": "2023-01-10"}] + + def test_no_cursor_granularity_and_no_step_then_only_return_one_slice(): cursor = DatetimeBasedCursor( start_datetime=MinMaxDatetime("2021-01-01", parameters={}),