Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #20322] add datetime_granularity logic to DatetimeStreamSlicer… #20717

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ definitions:
- cursor_field
- end_datetime
- datetime_format
- datetime_format_granularity
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
- start_datetime
- step
properties:
Expand All @@ -340,6 +341,8 @@ definitions:
type: string
datetime_format:
type: string
datetime_format_granularity:
type: string
end_datetime:
anyOf:
- type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union

Expand All @@ -16,7 +15,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta
from isodate import parse_duration


@dataclass
Expand All @@ -27,16 +26,7 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
Given a start time, end time, a step function, and an optional lookback window,
the stream slicer will partition the date range from start time - lookback window to end time.

The step function is defined as a string of the form:
`"<number><unit>"`

where unit can be one of
- years, y
- months, m
- weeks, w
- days, d

For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks.
The step is defined as a string in ISO 8601 duration format (for example, "P1D" for a one-day window).

The timestamp format accepts the same format codes as datetime.strptime, which are
all the format codes required by the 1989 C standard.
Expand All @@ -45,22 +35,24 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
Attributes:
start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
end_datetime (Union[MinMaxDatetime, str]): the datetime that determines the last record that should be synced
step (str): size of the timewindow
step (str): size of the timewindow (ISO8601 duration)
cursor_field (Union[InterpolatedString, str]): record's cursor field
datetime_format (str): format of the datetime
datetime_format_granularity (str): smallest increment the datetime_format has (ISO8601 duration)
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
config (Config): connection config
start_time_option (Optional[RequestOption]): request option for start time
end_time_option (Optional[RequestOption]): request option for end time
stream_state_field_start (Optional[str]): stream slice start time field
stream_state_field_end (Optional[str]): stream slice end time field
lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for
lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for (ISO8601 duration)
"""

start_datetime: Union[MinMaxDatetime, str]
end_datetime: Union[MinMaxDatetime, str]
step: str
cursor_field: Union[InterpolatedString, str]
datetime_format: str
datetime_format_granularity: str
config: Config
options: InitVar[Mapping[str, Any]]
_cursor: dict = field(repr=False, default=None) # tracks current datetime
Expand All @@ -71,10 +63,6 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(
r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
self.start_datetime = MinMaxDatetime(self.start_datetime, options)
Expand All @@ -85,6 +73,7 @@ def __post_init__(self, options: Mapping[str, Any]):
self._interpolation = JinjaInterpolation()

self._step = self._parse_timedelta(self.step)
self._datetime_format_granularity = self._parse_timedelta(self.datetime_format_granularity)
self.cursor_field = InterpolatedString.create(self.cursor_field, options=options)
self.stream_slice_field_start = InterpolatedString.create(self.stream_state_field_start or "start_time", options=options)
self.stream_slice_field_end = InterpolatedString.create(self.stream_state_field_end or "end_time", options=options)
Expand Down Expand Up @@ -144,7 +133,10 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
stream_state = stream_state or {}
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
if self.lookback_window and (evaluated_lookback_window := self.lookback_window.eval(self.config, **kwargs)):
lookback_delta = self._parse_timedelta(evaluated_lookback_window)
else:
lookback_delta = self._parse_timedelta("P0D")
maxi297 marked this conversation as resolved.
Show resolved Hide resolved

earliest_possible_start_datetime = min(self.start_datetime.get_datetime(self.config, **kwargs), end_datetime)
cursor_datetime = self._calculate_cursor_datetime_from_state(stream_state)
Expand All @@ -154,7 +146,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) + datetime.timedelta(days=1)
return self.parse_date(stream_state[self.cursor_field.eval(self.config)])
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime):
Expand All @@ -165,7 +157,7 @@ def _partition_daterange(self, start, end, step: datetime.timedelta):
end_field = self.stream_slice_field_end.eval(self.config)
dates = []
while start <= end:
end_date = self._get_date(start + step - datetime.timedelta(days=1), end, min)
end_date = self._get_date(start + step - self._datetime_format_granularity, end, min)
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)})
start += step
return dates
Expand All @@ -180,17 +172,9 @@ def parse_date(self, date: str) -> datetime.datetime:
@classmethod
def _parse_timedelta(cls, time_str):
"""
Parse a time string e.g. (2h13m) into a timedelta object.
Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
:param time_str: A string identifying a duration. (eg. 2h13m)
:return relativedelta: A relativedelta object
:return: The ISO 8601 duration parsed into a datetime.timedelta or Duration object.
"""
parts = cls.timedelta_regex.match(time_str)

assert parts is not None

time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return relativedelta(**time_params)
return parse_duration(time_str)
maxi297 marked this conversation as resolved.
Show resolved Hide resolved

def get_request_params(
self,
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
# pinned to the last working version for us temporarily while we fix
"dataclasses-jsonschema==2.15.1",
"dpath~=2.0.1",
"isodate~=0.6.1",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
Expand Down
Loading