Skip to content

Commit

Permalink
[ISSUE #28782] support multiple cursor field datetime formats
Browse files (browse the repository at this point in the history)
  • Loading branch information
maxi297 committed Aug 1, 2023
1 parent a68ea60 commit 9c3e2c1
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,16 @@ definitions:
examples:
- "2020-01-1T00:00:00Z"
- "{{ config['start_time'] }}"
cursor_datetime_formats:
title: Cursor Datetime Format
description: The possible formats for the cursor field
type: array
items:
type: string
examples:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%d"
- "%s"
cursor_granularity:
title: Cursor Granularity
description:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import datetime
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
Expand Down Expand Up @@ -52,7 +52,7 @@ class DatetimeBasedCursor(Cursor):
datetime_format: str
config: Config
parameters: InitVar[Mapping[str, Any]]
_cursor: str = field(repr=False, default=None) # tracks current datetime
_cursor: Optional[str] = field(repr=False, default=None) # tracks current datetime
end_datetime: Optional[Union[MinMaxDatetime, str]] = None
step: Optional[Union[InterpolatedString, str]] = None
cursor_granularity: Optional[str] = None
Expand All @@ -62,8 +62,9 @@ class DatetimeBasedCursor(Cursor):
partition_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None
message_repository: Optional[MessageRepository] = None
cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if (self.step and not self.cursor_granularity) or (not self.step and self.cursor_granularity):
raise ValueError(
f"If step is defined, cursor_granularity should be as well and vice-versa. "
Expand Down Expand Up @@ -95,6 +96,9 @@ def __post_init__(self, parameters: Mapping[str, Any]):
if self.end_datetime and not self.end_datetime.datetime_format:
self.end_datetime.datetime_format = self.datetime_format

if not self.cursor_datetime_formats:
self.cursor_datetime_formats = [self.datetime_format]

def get_stream_state(self) -> StreamState:
    """Return the current state as a single-entry mapping, or an empty mapping.

    The key is the evaluated ``cursor_field`` and the value is ``self._cursor``.
    When ``self._cursor`` is falsy, nothing has been tracked yet and ``{}`` is returned.
    """
    if not self._cursor:
        return {}
    state_key = self.cursor_field.eval(self.config)
    return {state_key: self._cursor}

Expand Down Expand Up @@ -154,17 +158,22 @@ def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any])
return self.parse_date(stream_state[self.cursor_field.eval(self.config)])
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime) -> str:
    """Render ``dt`` as a string using ``self.datetime_format``.

    The actual formatting is delegated to ``self._parser.format``.

    :param dt: the datetime to render
    :return: the value returned by ``self._parser.format`` for ``self.datetime_format``
    """
    return self._parser.format(dt, self.datetime_format)

def _format_slice_datetime(self, dt: datetime.datetime) -> str:
if not self.cursor_datetime_formats:
raise RuntimeError("DatetimeBasedCursor can't operate without cursor_datetime_formats")
return self._parser.format(dt, self.cursor_datetime_formats[0])

def _partition_daterange(self, start: datetime.datetime, end: datetime.datetime, step: Union[datetime.timedelta, Duration]):
    """Split the range from ``start`` to ``end`` into consecutive slices.

    Each slice is a dict keyed by the evaluated ``partition_field_start`` and
    ``partition_field_end`` names. Both boundaries are rendered through
    ``_format_slice_datetime``, and exactly one dict is appended per iteration.

    :param start: start of the first slice
    :param end: upper bound; the loop stops once the running start is greater than ``end``
    :param step: handed to ``_evaluate_next_start_date_safely`` to compute the next slice start
    :return: list of slice dicts in the order they were generated
    """
    start_field = self.partition_field_start.eval(self.config)
    end_field = self.partition_field_end.eval(self.config)
    dates = []
    while start <= end:
        next_start = self._evaluate_next_start_date_safely(start, step)
        # The slice ends one cursor granularity before the next slice starts. `end` and `min` are
        # passed to `_get_date`, presumably to cap the boundary at `end` (confirm against `_get_date`).
        end_date = self._get_date(next_start - self._cursor_granularity, end, min)
        dates.append({start_field: self._format_slice_datetime(start), end_field: self._format_slice_datetime(end_date)})
        start = next_start
    return dates

Expand All @@ -184,7 +193,12 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -
return comparator(cursor_date, default_date)

def parse_date(self, date: str) -> datetime.datetime:
    """Parse ``date`` with the first entry of ``cursor_datetime_formats`` that accepts it.

    Formats are tried in list order. A ``ValueError`` from ``self._parser.parse`` is treated
    as "this format does not match" and the next format is tried.

    :param date: the string to parse
    :return: the value returned by ``self._parser.parse`` for the first matching format
    :raises ValueError: if no configured format matches ``date`` (including an empty format list)
    """
    for datetime_format in self.cursor_datetime_formats:
        try:
            return self._parser.parse(date, datetime_format)
        except ValueError:
            # Expected when the format does not fit the value; fall through to the next candidate.
            continue
    raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")

@classmethod
def _parse_timedelta(cls, time_str) -> Union[datetime.timedelta, Duration]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ class DatetimeBasedCursor(BaseModel):
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
title="Start Datetime",
)
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field",
title="Cursor Datetime Format",
)
cursor_granularity: Optional[str] = Field(
None,
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ def create_datetime_based_cursor(self, model: DatetimeBasedCursorModel, config:

return DatetimeBasedCursor(
cursor_field=model.cursor_field,
cursor_datetime_formats=model.cursor_datetime_formats if model.cursor_datetime_formats else [],
cursor_granularity=model.cursor_granularity,
datetime_format=model.datetime_format,
end_datetime=end_datetime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
("test_parse_date_number", "20210101", "%Y%m%d", "P1D", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
],
)
def test_parse_date(test_name, input_date, date_format, date_format_granularity, expected_output_date):
def test_parse_date_legacy_merge_datetime_format_in_cursor_datetime_format(test_name, input_date, date_format, date_format_granularity, expected_output_date):
slicer = DatetimeBasedCursor(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", parameters={}),
Expand All @@ -524,6 +524,48 @@ def test_parse_date(test_name, input_date, date_format, date_format_granularity,
assert expected_output_date == output_date


@pytest.mark.parametrize(
    "test_name, input_date, date_formats, expected_output_date",
    [
        (
            "test_match_first_format",
            "2021-01-01T00:00:00.000000+0000",
            ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"],
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        (
            "test_match_second_format",
            "1609459200",
            ["%Y-%m-%dT%H:%M:%S.%f%z", "%s"],
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
    ],
)
def test_parse_date(test_name, input_date, date_formats, expected_output_date):
    """``parse_date`` returns the expected datetime whichever entry of ``cursor_datetime_formats`` matches."""
    # `datetime_format` deliberately differs from every entry in `date_formats`, so a successful
    # parse can only come from `cursor_datetime_formats`.
    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=date_formats,
        config=config,
        parameters={},
    )
    parsed = cursor.parse_date(input_date)
    assert parsed == expected_output_date


def test_given_unknown_format_when_parse_date_then_raise_error():
    """``parse_date`` raises ``ValueError`` when the input matches none of the configured formats."""
    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=["%Y-%m-%d", "%s"],
        config=config,
        parameters={},
    )
    # A full timestamp with microseconds and offset fits neither "%Y-%m-%d" nor "%s".
    unparseable_date = "2021-01-01T00:00:00.000000+0000"
    with pytest.raises(ValueError):
        cursor.parse_date(unparseable_date)


@pytest.mark.parametrize(
"test_name, input_dt, datetimeformat, datetimeformat_granularity, expected_output",
[
Expand Down Expand Up @@ -575,6 +617,20 @@ def test_cursor_granularity_but_no_step():
)


def test_given_multiple_cursor_datetime_format_then_slice_using_first_format():
    """Slice boundaries are rendered with the first entry of ``cursor_datetime_formats``."""
    cursor = DatetimeBasedCursor(
        start_datetime=MinMaxDatetime("2021-01-01", parameters={}),
        end_datetime=MinMaxDatetime("2023-01-10", parameters={}),
        cursor_field=InterpolatedString(cursor_field, parameters={}),
        datetime_format="%Y-%m-%d",
        cursor_datetime_formats=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"],
        config=config,
        parameters={},
    )
    # The expected values use "%Y-%m-%dT%H:%M:%S" (first format), not `datetime_format`.
    expected_slices = [{"start_time": "2021-01-01T00:00:00", "end_time": "2023-01-10T00:00:00"}]
    assert cursor.stream_slices() == expected_slices


def test_no_cursor_granularity_and_no_step_then_only_return_one_slice():
cursor = DatetimeBasedCursor(
start_datetime=MinMaxDatetime("2021-01-01", parameters={}),
Expand Down

0 comments on commit 9c3e2c1

Please sign in to comment.