Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code connectors] Extract datetime parser and handle %s format directive #15429

Merged
merged 13 commits into from
Aug 10, 2022
Merged
5 changes: 4 additions & 1 deletion airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# Changelog

## 0.1.72
## 0.1.73
- Bugfix: Fix bug in DatetimeStreamSlicer's parsing method

## 0.1.72
- Bugfix: Fix bug in DatetimeStreamSlicer's format method

## 0.1.71
- Refactor declarative package to dataclasses
- Bugfix: Requester header always converted to string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Union


class DatetimeParser:
"""
Parses and formats datetime objects according to a specified format.

This class mainly acts as a wrapper to properly handling timestamp formatting through the "%s" directive.

%s is part of the list of format codes required by the 1989 C standard, but it is unreliable because it always return a datetime in the system's timezone.
Instead of using the directive directly, we can use datetime.fromtimestamp and dt.timestamp()
"""

def parse(self, date: Union[str, int], format: str, timezone):
# "%s" is a valid (but unreliable) directive for formatting, but not for parsing
# It is defined as
# The number of seconds since the Epoch, 1970-01-01 00:00:00+0000 (UTC). https://man7.org/linux/man-pages/man3/strptime.3.html
#
# The recommended way to parse a date from its timestamp representation is to use datetime.fromtimestamp
# See https://stackoverflow.com/a/4974930
if format == "%s":
return datetime.datetime.fromtimestamp(int(date), tz=timezone)
else:
return datetime.datetime.strptime(str(date), format).replace(tzinfo=timezone)

def format(self, dt: datetime.datetime, format: str) -> str:
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
# It's safer to use the timestamp() method than the %s directive
# See https://stackoverflow.com/a/4974930
if format == "%s":
return str(int(dt.timestamp()))
else:
return dt.strftime(format)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from dataclasses_jsonschema import JsonSchemaMixin

Expand Down Expand Up @@ -40,6 +41,7 @@ class MinMaxDatetime(JsonSchemaMixin):
def __post_init__(self, options: Mapping[str, Any]):
self.datetime = InterpolatedString.create(self.datetime, options=options or {})
self.timezone = dt.timezone.utc
self._parser = DatetimeParser()
self.min_datetime = InterpolatedString.create(self.min_datetime, options=options) if self.min_datetime else None
self.max_datetime = InterpolatedString.create(self.max_datetime, options=options) if self.max_datetime else None

Expand All @@ -57,17 +59,13 @@ def get_datetime(self, config, **additional_options) -> dt.datetime:
if not datetime_format:
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"

time = dt.datetime.strptime(str(self.datetime.eval(config, **additional_options)), datetime_format).replace(tzinfo=self._timezone)
time = self._parser.parse(str(self.datetime.eval(config, **additional_options)), datetime_format, self.timezone)

if self.min_datetime:
min_time = dt.datetime.strptime(str(self.min_datetime.eval(config, **additional_options)), datetime_format).replace(
tzinfo=self._timezone
)
min_time = self._parser.parse(str(self.min_datetime.eval(config, **additional_options)), datetime_format, self.timezone)
time = max(time, min_time)
if self.max_datetime:
max_time = dt.datetime.strptime(str(self.max_datetime.eval(config, **additional_options)), datetime_format).replace(
tzinfo=self._timezone
)
max_time = self._parser.parse(str(self.max_datetime.eval(config, **additional_options)), datetime_format, self.timezone)
time = min(time, max_time)
return time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
Expand Down Expand Up @@ -77,6 +78,7 @@ def __post_init__(self, options: Mapping[str, Any]):
self.cursor_field = InterpolatedString.create(self.cursor_field, options=options)
self.stream_slice_field_start = InterpolatedString.create(self.stream_state_field_start or "start_time", options=options)
self.stream_slice_field_end = InterpolatedString.create(self.stream_state_field_end or "end_time", options=options)
self._parser = DatetimeParser()

# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
if not self.start_datetime.datetime_format:
Expand Down Expand Up @@ -142,7 +144,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->

start_datetime = max(cursor_datetime, start_datetime)

state_date = self.parse_date(stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this used to call self.parse_date with a potential None value

state_cursor_value = stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state))

if state_cursor_value:
state_date = self.parse_date(state_cursor_value)
else:
state_date = None
if state_date:
# If the input_state's date is greater than start_datetime, the start of the time window is the state's next day
next_date = state_date + datetime.timedelta(days=1)
Expand All @@ -151,13 +158,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
return dates

def _format_datetime(self, dt: datetime.datetime):
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
# It's safer to use the timestamp() method than the %s directive
# See https://stackoverflow.com/a/4974930
if self.datetime_format == "%s":
return str(int(dt.timestamp()))
else:
return dt.strftime(self.datetime_format)
return self._parser.format(dt, self.datetime_format)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract logic to the parser so it can be reused


def _partition_daterange(self, start, end, step: datetime.timedelta):
start_field = self.stream_slice_field_start.eval(self.config)
Expand All @@ -170,14 +171,11 @@ def _partition_daterange(self, start, end, step: datetime.timedelta):
return dates

def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -> datetime.datetime:
cursor_date = self.parse_date(cursor_value or default_date)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to call parse_date because the cursor_value is already a datetime object

cursor_date = cursor_value or default_date
return comparator(cursor_date, default_date)

def parse_date(self, date: Union[str, datetime.datetime]) -> datetime.datetime:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method should not accept a datetime as argument.

if isinstance(date, str):
return datetime.datetime.strptime(str(date), self.datetime_format).replace(tzinfo=self._timezone)
else:
return date
def parse_date(self, date: str) -> datetime.datetime:
return self._parser.parse(date, self.datetime_format, self._timezone)

@classmethod
def _parse_timedelta(cls, time_str):
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.72",
version="0.1.73",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime

import pytest
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser


@pytest.mark.parametrize(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests are pasted from airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py

"test_name, input_date, date_format, expected_output_date",
[
(
"test_parse_date_iso",
"2021-01-01T00:00:00.000000+0000",
"%Y-%m-%dT%H:%M:%S.%f%z",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
(
"test_parse_timestamp",
"1609459200",
"%s",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
parser = DatetimeParser()
output_date = parser.parse(input_date, date_format, datetime.timezone.utc)
assert expected_output_date == output_date


@pytest.mark.parametrize(
"test_name, input_dt, datetimeformat, expected_output",
[
("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"),
("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"),
("test_format_to_number", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y%m%d", "20210101"),
],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
parser = DatetimeParser()
output_date = parser.format(input_dt, datetimeformat)
assert expected_output == output_date
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,13 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
"%Y-%m-%dT%H:%M:%S.%f%z",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
(
"test_parse_date_datetime",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datetime objects are not valid input to the parse method anymore

datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
"%Y%m%d",
"test_parse_timestamp",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a test for parsing a timestamp to datetime

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

am I reading this correctly that we removed a test case? if so why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sherifnada yes. datetime.datetime objects are not valid inputs anymore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah makes sense

"1609459200",
"%s",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
Expand All @@ -483,6 +483,7 @@ def test_parse_date(test_name, input_date, date_format, expected_output_date):
[
("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"),
("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"),
("test_format_to_number", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y%m%d", "20210101"),
],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.72",
"airbyte-cdk~=0.1.73",
]

TEST_REQUIREMENTS = [
Expand Down