Skip to content

Commit

Permalink
[ISSUE #15628] apply lookback window on earliest datetime between sta… (
Browse files Browse the repository at this point in the history
#20156)

* [ISSUE #15628] apply lookback window on earliest datetime between start and cursor

* [ISSUE #15628] update release information and clean return statement
  • Loading branch information
maxi297 authored Dec 8, 2022
1 parent 3877df9 commit 50f22cd
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.12.4
Lookback window should applied when a state is supplied as well

## 0.12.3
Low-code: Finally, make `OffsetIncrement.page_size` interpolated string or int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,27 +145,17 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta
start_datetime = min(start_datetime, end_datetime)
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
cursor_datetime = self.parse_date(stream_state[self.cursor_field.eval(self.config)])
else:
cursor_datetime = start_datetime

start_datetime = max(cursor_datetime, start_datetime)
earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config, **kwargs), end_datetime)
cursor_datetime = self._calculate_cursor_datetime_from_state(stream_state)
start_datetime = max(earliest_possible_start_datetime, cursor_datetime) - lookback_delta

state_cursor_value = stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state))
return self._partition_daterange(start_datetime, end_datetime, self._step)

if state_cursor_value:
state_date = self.parse_date(state_cursor_value)
else:
state_date = None
if state_date:
# If the input_state's date is greater than start_datetime, the start of the time window is the state's next day
next_date = state_date + datetime.timedelta(days=1)
start_datetime = max(start_datetime, next_date)
dates = self._partition_daterange(start_datetime, end_datetime, self._step)
return dates
def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) + datetime.timedelta(days=1)
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime):
return self._parser.format(dt, self.datetime_format)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.12.3",
version="0.12.4",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
timezone = datetime.timezone.utc


class MockedNowDatetime(datetime.datetime):
@classmethod
def now(cls, tz=None):
return FAKE_NOW


@pytest.fixture()
def mock_datetime_now(monkeypatch):
datetime_mock = unittest.mock.MagicMock(wraps=datetime.datetime)
datetime_mock.now.return_value = FAKE_NOW
monkeypatch.setattr(datetime, "datetime", datetime_mock)
monkeypatch.setattr(datetime, "datetime", MockedNowDatetime)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -286,6 +290,22 @@ def mock_datetime_now(monkeypatch):
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_with_lookback_window_from_cursor",
{cursor_field: "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime(datetime="2021-01-01T00:00:00.000000+0000", options={}),
MinMaxDatetime(datetime="2021-01-06T00:00:00.000000+0000", options={}),
"1d",
cursor_field,
"3d",
datetime_format,
[
{"start_time": "2021-01-03T00:00:00.000000+0000", "end_time": "2021-01-03T00:00:00.000000+0000"},
{"start_time": "2021-01-04T00:00:00.000000+0000", "end_time": "2021-01-04T00:00:00.000000+0000"},
{"start_time": "2021-01-05T00:00:00.000000+0000", "end_time": "2021-01-05T00:00:00.000000+0000"},
{"start_time": "2021-01-06T00:00:00.000000+0000", "end_time": "2021-01-06T00:00:00.000000+0000"},
],
),
(
"test_with_lookback_window_defaults_to_0d",
{"date": "2021-01-05"},
Expand Down

0 comments on commit 50f22cd

Please sign in to comment.