Skip to content

Commit

Permalink
[ISSUE #28782] support multiple cursor field datetime formats (#28936)
Browse files Browse the repository at this point in the history
* [ISSUE #28782] support multiple cursor field datetime formats

* Making sure we use the proper format for creating slices

* Code review
  • Loading branch information
maxi297 authored and bnchrch committed Aug 3, 2023
1 parent 57ed9a5 commit 44afab5
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,16 @@ definitions:
examples:
- "2020-01-1T00:00:00Z"
- "{{ config['start_time'] }}"
cursor_datetime_formats:
title: Cursor Datetime Formats
description: The possible formats for the cursor field
type: array
items:
type: string
examples:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%d"
- "%s"
cursor_granularity:
title: Cursor Granularity
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import datetime
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
Expand Down Expand Up @@ -52,7 +52,7 @@ class DatetimeBasedCursor(Cursor):
datetime_format: str
config: Config
parameters: InitVar[Mapping[str, Any]]
_cursor: str = field(repr=False, default=None) # tracks current datetime
_cursor: Optional[str] = field(repr=False, default=None) # tracks current datetime
end_datetime: Optional[Union[MinMaxDatetime, str]] = None
step: Optional[Union[InterpolatedString, str]] = None
cursor_granularity: Optional[str] = None
Expand All @@ -62,8 +62,9 @@ class DatetimeBasedCursor(Cursor):
partition_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None
message_repository: Optional[MessageRepository] = None
cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if (self.step and not self.cursor_granularity) or (not self.step and self.cursor_granularity):
raise ValueError(
f"If step is defined, cursor_granularity should be as well and vice-versa. "
Expand Down Expand Up @@ -95,6 +96,9 @@ def __post_init__(self, parameters: Mapping[str, Any]):
if self.end_datetime and not self.end_datetime.datetime_format:
self.end_datetime.datetime_format = self.datetime_format

if not self.cursor_datetime_formats:
self.cursor_datetime_formats = [self.datetime_format]

def get_stream_state(self) -> StreamState:
    """
    Return the state tracked by this cursor.

    :return: a single-entry mapping from the evaluated cursor field to the current cursor value,
        or an empty mapping when no cursor value is set
    """
    if not self._cursor:
        return {}
    state_key = self.cursor_field.eval(self.config)
    return {state_key: self._cursor}

Expand Down Expand Up @@ -154,7 +158,7 @@ def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any])
return self.parse_date(stream_state[self.cursor_field.eval(self.config)])
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime):
def _format_datetime(self, dt: datetime.datetime) -> str:
return self._parser.format(dt, self.datetime_format)

def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]):
Expand Down Expand Up @@ -184,7 +188,12 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -
return comparator(cursor_date, default_date)

def parse_date(self, date: str) -> datetime.datetime:
    """
    Parse a cursor value by trying each known datetime format in turn.

    Every entry of `cursor_datetime_formats` is attempted in order, followed by `datetime_format`
    as a fallback. The first format that parses successfully wins.

    :param date: the raw cursor value to parse
    :return: the datetime produced by the first matching format
    :raises ValueError: if none of the attempted formats matches `date`
    """
    # dict.fromkeys removes duplicates while keeping the order: `datetime_format` is often already
    # part of `cursor_datetime_formats`, and re-trying a format that just failed is wasted work.
    candidate_formats = list(dict.fromkeys([*self.cursor_datetime_formats, self.datetime_format]))
    for datetime_format in candidate_formats:
        try:
            return self._parser.parse(date, datetime_format)
        except ValueError:
            # This format does not match; fall through to the next candidate.
            continue
    # Report every format that was actually attempted, including the `datetime_format` fallback.
    raise ValueError(f"No format in {candidate_formats} matching {date}")

@classmethod
def _parse_timedelta(cls, time_str) -> Union[datetime.timedelta, Duration]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ class DatetimeBasedCursor(BaseModel):
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
title="Start Datetime",
)
# Optional list of formats the cursor field value may use; the title matches the
# "Cursor Datetime Formats" title of the corresponding schema definition.
cursor_datetime_formats: Optional[List[str]] = Field(
    None,
    description="The possible formats for the cursor field",
    title="Cursor Datetime Formats",
)
cursor_granularity: Optional[str] = Field(
None,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config:

return DatetimeBasedCursor(
cursor_field=model.cursor_field,
cursor_datetime_formats=model.cursor_datetime_formats if model.cursor_datetime_formats else [],
cursor_granularity=model.cursor_granularity,
datetime_format=model.datetime_format,
end_datetime=end_datetime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,23 +388,24 @@ def test_close_slice(test_name, previous_cursor, stream_slice, latest_record_dat
cursor._cursor = previous_cursor
cursor.close_slice(stream_slice, Record(latest_record_data, stream_slice) if latest_record_data else None)
updated_state = cursor.get_stream_state()
assert expected_state == updated_state
assert updated_state == expected_state


def test_given_datetime_format_differs_from_cursor_value_when_close_slice_then_use_cursor_value_and_not_formatted_value():
def test_given_different_format_and_slice_is_highest_when_close_slice_then_slice_datetime_format():
cursor = DatetimeBasedCursor(
start_datetime=MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", parameters={}),
cursor_field=cursor_field,
datetime_format="%Y-%m-%dT%H:%M:%S.%fZ",
cursor_datetime_formats=["%Y-%m-%d"],
config=config,
parameters={},
)

_slice = {}
record_cursor_value = "2023-01-04T17:30:19.000Z"
_slice = {"end_time": "2023-01-04T17:30:19.000Z"}
record_cursor_value = "2023-01-03"
cursor.close_slice(_slice, Record({cursor_field: record_cursor_value}, _slice))

assert cursor.get_stream_state()[cursor_field] == record_cursor_value
assert cursor.get_stream_state()[cursor_field] == "2023-01-04T17:30:19.000Z"


def test_given_partition_end_is_specified_and_greater_than_record_when_close_slice_then_use_partition_end():
Expand Down Expand Up @@ -508,7 +509,7 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
("test_parse_date_number", "20210101", "%Y%m%d", "P1D", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
],
)
def test_parse_date(test_name, input_date, date_format, date_format_granularity, expected_output_date):
def test_parse_date_legacy_merge_datetime_format_in_cursor_datetime_format(test_name, input_date, date_format, date_format_granularity, expected_output_date):
slicer = DatetimeBasedCursor(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", parameters={}),
Expand All @@ -524,6 +525,48 @@ def test_parse_date(test_name, input_date, date_format, date_format_granularity,
assert expected_output_date == output_date


@pytest.mark.parametrize(
    "test_name, input_date, date_formats, expected_output_date",
    [
        (
            "test_match_first_format",
            "2021-01-01T00:00:00.000000+0000",
            ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"],
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        (
            "test_match_second_format",
            "1609459200",
            ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"],
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
    ],
)
def test_parse_date(test_name, input_date, date_formats, expected_output_date):
    """parse_date returns the expected datetime whichever of the cursor datetime formats matches the input."""
    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=date_formats,
        config=config,
        parameters={},
    )

    parsed_date = cursor.parse_date(input_date)

    assert parsed_date == expected_output_date


def test_given_unknown_format_when_parse_date_then_raise_error():
    """parse_date raises ValueError when the value matches none of the configured formats."""
    # Neither "%Y-%m-%d" nor "%s" can parse a full timestamp with microseconds and offset.
    unparseable_value = "2021-01-01T00:00:00.000000+0000"
    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=["%Y-%m-%d", "%s"],
        config=config,
        parameters={},
    )

    with pytest.raises(ValueError):
        cursor.parse_date(unparseable_value)


@pytest.mark.parametrize(
"test_name, input_dt, datetimeformat, datetimeformat_granularity, expected_output",
[
Expand Down Expand Up @@ -575,6 +618,20 @@ def test_cursor_granularity_but_no_step():
)


def test_given_multiple_cursor_datetime_format_then_slice_using_first_format():
    """Slice boundaries are rendered with datetime_format even when extra cursor datetime formats are configured."""
    expected_slices = [{"start_time": "2021-01-01", "end_time": "2023-01-10"}]
    slicer = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01", parameters={}),
        end_datetime=MinMaxDatetime("2023-01-10", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"],
        config=config,
        parameters={},
    )

    assert slicer.stream_slices() == expected_slices


def test_no_cursor_granularity_and_no_step_then_only_return_one_slice():
cursor = DatetimeBasedCursor(
start_datetime=MinMaxDatetime("2021-01-01", parameters={}),
Expand Down

0 comments on commit 44afab5

Please sign in to comment.