
Commit

Source S3: maintain backwards compatibility between V3 & V4 state messages
clnoll committed Aug 11, 2023
1 parent 1a120ec commit 5e32482
Showing 13 changed files with 426 additions and 12 deletions.
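For context, the two state-message formats this commit bridges look roughly as follows (a sketch with illustrative values; the field shapes are taken from the docstrings in the diff below):

# V3 state: history maps a date to the list of files last modified on that date;
# the cursor is a second-precision timestamp.
v3_state = {
    "history": {
        "2022-05-26": ["simple_test.csv"],
        "2022-05-27": ["simple_test_2.csv", "redshift_result.csv"],
    },
    "_ab_source_file_last_modified": "2022-05-27T04:22:20Z",
}

# V4 state: history maps each filename to a microsecond-precision timestamp;
# the cursor appends the filename as a tiebreaker.
v4_state = {
    "history": {
        "simple_test.csv": "2022-05-26T17:49:11.000000Z",
        "simple_test_2.csv": "2022-05-27T01:01:01.000000Z",
        "redshift_result.csv": "2022-05-27T04:22:20.000000Z",
    },
    "_ab_source_file_last_modified": "2022-05-27T04:22:20.000000Z_redshift_result.csv",
}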
@@ -16,6 +16,7 @@ class DefaultFileBasedCursor(AbstractFileBasedCursor):
    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
    DEFAULT_MAX_HISTORY_SIZE = 10_000
    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+    CURSOR_FIELD = "_ab_source_file_last_modified"

    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
        super().__init__(stream_config)
@@ -48,7 +49,7 @@ def add_file(self, file: RemoteFile) -> None:
        )

    def get_state(self) -> StreamState:
-        state = {"history": self._file_to_datetime_history, "_ab_source_file_last_modified": self._get_cursor()}
+        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
        return state

    def _get_cursor(self) -> Optional[str]:
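With the constant in place, get_state builds its dict from CURSOR_FIELD instead of a repeated string literal. A sketch of the emitted state (illustrative values; it assumes _get_cursor returns the v4-style `{timestamp}_{filename}` cursor that Cursor._convert_legacy_state below constructs):

# Sketch of the state emitted by get_state() after this change (values are illustrative).
state = {
    "history": {"simple_test.csv": "2022-05-26T17:49:11.000000Z"},
    "_ab_source_file_last_modified": "2022-05-26T17:49:11.000000Z_simple_test.csv",
}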
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=3.1.2
+LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/source-s3
8 changes: 8 additions & 0 deletions airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
@@ -117,6 +117,14 @@ acceptance_tests:
        future_state:
          future_state_path: integration_tests/abnormal_state.json
        timeout_seconds: 1800
+      - config_path: secrets/config.json
+        configured_catalog_path: integration_tests/configured_catalogs/csv.json
+        cursor_paths:
+          test:
+            - _ab_source_file_last_modified
+        future_state:
+          future_state_path: integration_tests/v4_abnormal_state.json # verify that the v3 connector can handle v4-style state messages
+        timeout_seconds: 1800
      - config_path: secrets/parquet_config.json
        configured_catalog_path: integration_tests/configured_catalogs/parquet.json
        cursor_paths:
18 changes: 18 additions & 0 deletions airbyte-integrations/connectors/source-s3/integration_tests/v4_abnormal_state.json
@@ -0,0 +1,18 @@
+[
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_state": {
+        "history": {
+          "simple_test.csv": "2021-07-25T15:33:04.000000Z",
+          "simple_test_2.csv": "2021-12-20T12:35:05.000000Z",
+          "redshift_result.csv": "2022-05-26T09:17:47.000000Z",
+          "redshift_result_3.csv": "2022-05-26T09:55:15.000000Z",
+          "redshift_result_2.csv": "2022-05-26T09:55:16.000000Z"
+        },
+        "_ab_source_file_last_modified": "2999-01-01T00:00:00.000000Z"
+      },
+      "stream_descriptor": { "name": "test" }
+    }
+  }
+]
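This fixture is deliberately v4-shaped: the history maps filename to a microsecond-precision timestamp, and the cursor parses with the v4 datetime format. A quick check mirroring the two signals that _is_v4_state_format (added to the v3 FileStream below) looks for:

from datetime import datetime

history = {"simple_test.csv": "2021-07-25T15:33:04.000000Z"}
cursor = "2999-01-01T00:00:00.000000Z"

assert isinstance(next(iter(history.values())), str)  # v4 history: filename -> timestamp string
datetime.strptime(cursor, "%Y-%m-%dT%H:%M:%S.%fZ")  # v4 cursor: microsecond precision, parses cleanly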
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
@@ -5,7 +5,7 @@ data:
  connectorSubtype: file
  connectorType: source
  definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
-  dockerImageTag: 3.1.2
+  dockerImageTag: 3.1.3
  dockerRepository: airbyte/source-s3
  githubIssueLabel: source-s3
  icon: s3.svg
@@ -5,6 +5,7 @@

import json
from abc import ABC, abstractmethod
+from collections import defaultdict
from datetime import datetime, timedelta
from functools import cached_property, lru_cache
from traceback import format_exc
@@ -49,6 +50,8 @@ class FileStream(Stream, ABC):
    # In version 2.0.1 the datetime format was changed. Since the state may still store values in the old
    # datetime format, we need to support both of them for a while.
    deprecated_datetime_format_string = "%Y-%m-%dT%H:%M:%S%z"
+    # Also handle the datetime format used in V4, in the event that we need to roll back.
+    v4_datetime_format_string = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, dataset: str, provider: dict, format: dict, path_pattern: str, schema: str = None):
        """
@@ -325,6 +328,7 @@ def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None
        If there is no state, defaults to 1970-01-01 in order to pick up all files present.
        The datetime object is localized to UTC to match the timezone of the last_modified attribute of objects in S3.
        """
+        stream_state = self._get_converted_stream_state(stream_state)
        if stream_state is not None and self.cursor_field in stream_state.keys():
            try:
                state_datetime = datetime.strptime(stream_state[self.cursor_field], self.datetime_format_string)
@@ -416,7 +420,6 @@ def stream_slices(
            yield from super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)

        else:
-
            # logic here is to bundle all files with the exact same last-modified timestamp together in each slice
            prev_file_last_mod: datetime = None  # init variable to hold the previous iteration's last modified
            grouped_files_by_time: List[Dict[str, Any]] = []
@@ -437,3 +440,86 @@
        else:
            # in case we have no files
            yield None

+    def _is_v4_state_format(self, stream_state: Optional[dict]) -> bool:
+        """
+        Returns True if the stream_state is in the v4 format, otherwise False.
+
+        The state is considered v4 if its history maps filename (str) to
+        timestamp (str) rather than date (str) to a list of filenames; when
+        there is no history, the cursor value is checked against the v4
+        datetime format `%Y-%m-%dT%H:%M:%S.%fZ`.
+        """
+        if not stream_state:
+            return False
+        if history := stream_state.get("history"):
+            # v4 history values are timestamp strings; v3 values are lists of filenames.
+            return isinstance(next(iter(history.values())), str)
+        if cursor := stream_state.get(self.cursor_field):
+            try:
+                datetime.strptime(cursor, self.v4_datetime_format_string)
+            except ValueError:
+                return False
+            else:
+                return True
+        return False
+
+    def _get_converted_stream_state(self, stream_state: Optional[dict]) -> dict:
+        """
+        Transform the history from the v4 format back to the v3 format.
+        This will only be used in the event that we roll back from v4.
+        e.g.
+        {
+            "history": {
+                "simple_test.csv": "2022-05-26T17:49:11.000000Z",
+                "simple_test_2.csv": "2022-05-27T01:01:01.000000Z",
+                "redshift_result.csv": "2022-05-27T04:22:20.000000Z",
+                ...
+            },
+            "_ab_source_file_last_modified": "2022-05-27T04:22:20.000000Z_redshift_result.csv"
+        }
+        =>
+        {
+            "history": {
+                "2022-05-26": ["simple_test.csv"],
+                "2022-05-27": ["simple_test_2.csv", "redshift_result.csv"],
+                ...
+            },
+            "_ab_source_file_last_modified": "2022-05-27T04:22:20Z"
+        }
+        """
+        if not self._is_v4_state_format(stream_state):
+            return stream_state
+
+        converted_history = defaultdict(list)
+
+        for filename, timestamp in stream_state.get("history", {}).items():
+            if date_str := self._get_ts_from_millis_ts(timestamp, "%Y-%m-%d"):
+                converted_history[date_str].append(filename)
+
+        converted_state = {}
+        if self.cursor_field in stream_state:
+            # The v4 cursor is f"{timestamp}_{filename}"; keep only the timestamp.
+            timestamp_millis = stream_state[self.cursor_field].split("_")[0]
+            converted_state[self.cursor_field] = self._get_ts_from_millis_ts(timestamp_millis, self.datetime_format_string)
+        if "history" in stream_state:
+            converted_state["history"] = converted_history
+
+        return converted_state
+
+    def _get_ts_from_millis_ts(self, timestamp: Optional[str], output_format: str) -> Optional[str]:
+        if not timestamp:
+            return timestamp
+        try:
+            timestamp_millis = datetime.strptime(timestamp, self.v4_datetime_format_string)
+        except ValueError:
+            self.logger.warning(f"Unable to parse {timestamp} as a v4 timestamp.")
+            return timestamp
+        return timestamp_millis.strftime(output_format)
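To make the conversion concrete, here is a self-contained sketch that reproduces _get_converted_stream_state on the docstring's example. It inlines the class attributes; the v3 datetime_format_string is assumed to be `%Y-%m-%dT%H:%M:%SZ`:

from collections import defaultdict
from datetime import datetime

V4_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
V3_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed value of FileStream.datetime_format_string
CURSOR_FIELD = "_ab_source_file_last_modified"

v4_state = {
    "history": {
        "simple_test.csv": "2022-05-26T17:49:11.000000Z",
        "simple_test_2.csv": "2022-05-27T01:01:01.000000Z",
        "redshift_result.csv": "2022-05-27T04:22:20.000000Z",
    },
    CURSOR_FIELD: "2022-05-27T04:22:20.000000Z_redshift_result.csv",
}

# Re-bucket the history by date, as the v3 connector expects.
converted_history = defaultdict(list)
for filename, ts in v4_state["history"].items():
    date_str = datetime.strptime(ts, V4_FORMAT).strftime("%Y-%m-%d")
    converted_history[date_str].append(filename)

# Strip the filename tiebreaker and downsample the cursor to second precision.
cursor_ts = v4_state[CURSOR_FIELD].split("_")[0]
converted_state = {
    "history": dict(converted_history),
    CURSOR_FIELD: datetime.strptime(cursor_ts, V4_FORMAT).strftime(V3_FORMAT),
}

print(converted_state)
# {'history': {'2022-05-26': ['simple_test.csv'],
#              '2022-05-27': ['simple_test_2.csv', 'redshift_result.csv']},
#  '_ab_source_file_last_modified': '2022-05-27T04:22:20Z'}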
5 changes: 3 additions & 2 deletions airbyte-integrations/connectors/source-s3/source_s3/stream.py
@@ -28,6 +28,7 @@ def filepath_iterator(self, stream_state: Mapping[str, Any] = None) -> Iterator[
"""
:yield: url filepath to use in S3File()
"""
stream_state = self._get_converted_stream_state(stream_state)
prefix = self._provider.get("path_prefix")
if prefix is None:
prefix = ""
@@ -66,7 +67,7 @@ def filepath_iterator(self, stream_state: Mapping[str, Any] = None) -> Iterator[
                pass
            else:
                for file in content:
-                    if self.is_not_folder(file) and self.filter_by_last_modified_date(file, stream_state):
+                    if self.is_not_folder(file) and self._filter_by_last_modified_date(file, stream_state):
                        yield FileInfo(key=file["Key"], last_modified=file["LastModified"], size=file["Size"])
            ctoken = response.get("NextContinuationToken", None)
            if not ctoken:
@@ -76,7 +77,7 @@ def filepath_iterator(self, stream_state: Mapping[str, Any] = None) -> Iterator[
    def is_not_folder(file) -> bool:
        return not file["Key"].endswith("/")

-    def filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None):
+    def _filter_by_last_modified_date(self, file: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None):
        cursor_date = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state else self.start_date

        file_in_history_and_last_modified_is_earlier_than_cursor_value = (
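The ordering matters here: _filter_by_last_modified_date feeds the cursor straight into pendulum.parse, which only succeeds once the state has been converted. A small sketch of why (cursor values taken from the docstrings above):

import pendulum

print(pendulum.parse("2022-05-27T04:22:20Z"))  # a converted v3 cursor parses cleanly
# An unconverted v4 cursor such as "2022-05-27T04:22:20.000000Z_redshift_result.csv"
# would raise a parser error because of the trailing "_<filename>" suffix,
# which is why filepath_iterator converts the state before filtering.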
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-s3/source_s3/v4/__init__.py
@@ -3,8 +3,9 @@
#

from .config import Config
+from .cursor import Cursor
from .legacy_config_transformer import LegacyConfigTransformer
from .source import SourceS3
from .stream_reader import SourceS3StreamReader

__all__ = ["Config", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
__all__ = ["Config", "Cursor", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
68 changes: 68 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/cursor.py
@@ -0,0 +1,68 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from datetime import datetime
+from typing import MutableMapping
+
+from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
+from airbyte_cdk.sources.file_based.types import StreamState
+
+
+class Cursor(DefaultFileBasedCursor):
+    DATE_FORMAT = "%Y-%m-%d"
+    CURSOR_FIELD = "_ab_source_file_last_modified"
+
+    def set_initial_state(self, value: StreamState) -> None:
+        if self._is_legacy_state(value):
+            value = self._convert_legacy_state(value)
+        super().set_initial_state(value)
+
+    @staticmethod
+    def _is_legacy_state(value: StreamState) -> bool:
+        if not value:
+            return False
+        history = value.get("history", {})
+        if not history:
+            return False
+        # Legacy (v3) history is keyed by date strings; v4 history is keyed by filenames.
+        item = next(iter(history))
+        try:
+            datetime.strptime(item, Cursor.DATE_FORMAT)
+        except ValueError:
+            return False
+        return True
+
+    @staticmethod
+    def _convert_legacy_state(legacy_state: StreamState) -> MutableMapping[str, str]:
+        """
+        Transform the history from the old state message format to the new.
+        e.g.
+        {
+            "2022-05-26": ["simple_test.csv", "simple_test_2.csv"],
+            "2022-05-27": ["simple_test_2.csv", "redshift_result.csv"],
+            ...
+        }
+        =>
+        {
+            "simple_test.csv": "2022-05-26T00:00:00.000000Z",
+            "simple_test_2.csv": "2022-05-27T00:00:00.000000Z",
+            "redshift_result.csv": "2022-05-27T00:00:00.000000Z",
+            ...
+        }
+        """
+        converted_history = {}
+
+        for date_str, filenames in legacy_state.get("history", {}).items():
+            date_obj = datetime.strptime(date_str, Cursor.DATE_FORMAT)
+
+            for filename in filenames:
+                # If a file appears under several dates, keep the most recent one.
+                if filename in converted_history:
+                    if date_obj > datetime.strptime(converted_history[filename], DefaultFileBasedCursor.DATE_TIME_FORMAT):
+                        converted_history[filename] = date_obj.strftime(DefaultFileBasedCursor.DATE_TIME_FORMAT)
+                else:
+                    converted_history[filename] = date_obj.strftime(DefaultFileBasedCursor.DATE_TIME_FORMAT)
+
+        if converted_history:
+            # The v4 cursor is the latest timestamp with the filename appended as a tiebreaker.
+            filename, timestamp = max(converted_history.items(), key=lambda x: (x[1], x[0]))
+            cursor = f"{timestamp}_{filename}"
+        else:
+            cursor = None
+        return {"history": converted_history, "_ab_source_file_last_modified": cursor}