Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File cdk parser and cursor updates #28900

Merged
merged 4 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ class DatetimeBasedCursor(BaseModel):
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field",
title="Cursor Datetime Format",
title="Cursor Datetime Formats",
)
cursor_granularity: Optional[str] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import json
import logging
from typing import Any, Dict, Iterable, Mapping
import os
from typing import Any, Dict, Iterable, List, Mapping
from urllib.parse import unquote

import pyarrow as pa
import pyarrow.parquet as pq
Expand All @@ -27,11 +29,16 @@ async def infer_schema(
if not isinstance(parquet_format, ParquetFormat):
raise ValueError(f"Expected ParquetFormat, got {parquet_format}")

# Pyarrow can detect the schema of a parquet file by reading only its metadata.
# https://github.com/apache/arrow/blob/main/python/pyarrow/_parquet.pyx#L1168-L1243
parquet_file = pq.ParquetFile(stream_reader.open_file(file, self.file_read_mode, logger))
parquet_schema = parquet_file.schema_arrow
with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
parquet_file = pq.ParquetFile(fp)
parquet_schema = parquet_file.schema_arrow

# Inferred non-partition schema
schema = {field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format) for field in parquet_schema}
# Inferred partition schema
partition_columns = {partition.split("=")[0]: {"type": "string"} for partition in self._extract_partitions(file.uri)}

schema.update(partition_columns)
return schema

def parse_records(
Expand All @@ -45,13 +52,18 @@ def parse_records(
if not isinstance(parquet_format, ParquetFormat):
raise ValueError(f"Expected ParquetFormat, got {parquet_format}") # FIXME test this branch!
with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
table = pq.read_table(fp)
for batch in table.to_batches():
for i in range(batch.num_rows):
row_dict = {
column: ParquetParser._to_output_value(batch.column(column)[i], parquet_format) for column in table.column_names
}
yield row_dict
reader = pq.ParquetFile(fp)
partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
for row_group in range(reader.num_row_groups):
batch_dict = reader.read_row_group(row_group).to_pydict()
for record_values in zip(*batch_dict.values()):
record = dict(zip(batch_dict.keys(), record_values))
record.update(partition_columns)
yield record

@staticmethod
def _extract_partitions(filepath: str) -> List[str]:
return [unquote(partition) for partition in filepath.split(os.sep) if "=" in partition]

@property
def file_read_mode(self) -> FileReadMode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,21 @@ def add_file(self, file: RemoteFile) -> None:
)

def get_state(self) -> StreamState:
state = {
"history": self._file_to_datetime_history,
}
state = {"history": self._file_to_datetime_history, "_ab_source_file_last_modified": self._get_cursor()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In your CAT testing that had discrepencies was _ab_source_file_last_modified always present in the state message even if the cursor was None? Or should it be omitted if so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that isn't actually covered by the CATs. Do you know what we usually do in that situation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about setting it to datetime.min?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think I'll leave it as-is for now, since we don't actually use the cursor field for deciding what to sync; it's just being used as a tool for integration tests.

return state

def _get_cursor(self) -> Optional[str]:
"""
Returns the cursor value.

Files are synced in order of last-modified with secondary sort on filename, so the cursor value is
a string joining the last-modified timestamp of the last synced file and the name of the file.
"""
if self._file_to_datetime_history.items():
filename, timestamp = max(self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0]))
return f"{timestamp}_{filename}"
return None

def _is_history_full(self) -> bool:
"""
Returns true if the state's history is full, meaning new entries will start to replace old entries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
The default file-based stream.
"""

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
ab_file_name_col = "_ab_source_file_url"
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
Expand Down Expand Up @@ -78,7 +79,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
parser = self.get_parser(self.config.file_type)
for file in stream_slice["files"]:
# only serialize the datetime once
file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%SZ")
file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
n_skipped = line_no = 0

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
"data": {
"col1": "val11",
"col2": 12,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -228,7 +228,7 @@
"data": {
"col1": "val21",
"col2": 22,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -282,7 +282,7 @@
"col_double": "20.02",
"col_string": "Robbers",
"col_album": {"album": "The 1975"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -292,7 +292,7 @@
"col_double": "20.23",
"col_string": "Somebody Else",
"col_album": {"album": "I Like It When You Sleep, for You Are So Beautiful yet So Unaware of It"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -302,7 +302,7 @@
"col_double": "1975.1975",
"col_string": "It's Not Living (If It's Not with You)",
"col_song": {"title": "Love It If We Made It"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand All @@ -312,7 +312,7 @@
"col_double": "5791.5791",
"col_string": "The 1975",
"col_song": {"title": "About You"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -407,7 +407,7 @@
"col_timestamp_micros": "2022-05-30T00:00:00.456789+00:00",
"col_local_timestamp_millis": "2022-05-29T00:00:00.456000",
"col_local_timestamp_micros": "2022-05-30T00:00:00.456789",
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -488,7 +488,7 @@
"col_album": "A_MOMENT_APART",
"col_year": 2017,
"col_vocals": False,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -499,7 +499,7 @@
"col_album": "IN_RETURN",
"col_year": 2014,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -510,7 +510,7 @@
"col_album": "THE_LAST_GOODBYE",
"col_year": 2022,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -521,7 +521,7 @@
"col_album": "SUMMERS_GONE",
"col_year": 2012,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -532,7 +532,7 @@
"col_album": "IN_RETURN",
"col_year": 2014,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -542,7 +542,7 @@
"col_name": "Coachella",
"col_location": {"country": "USA", "state": "California", "city": "Indio"},
"col_attendance": 250000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -552,7 +552,7 @@
"col_name": "CRSSD",
"col_location": {"country": "USA", "state": "California", "city": "San Diego"},
"col_attendance": 30000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -562,7 +562,7 @@
"col_name": "Lightning in a Bottle",
"col_location": {"country": "USA", "state": "California", "city": "Buena Vista Lake"},
"col_attendance": 18000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -572,7 +572,7 @@
"col_name": "Outside Lands",
"col_location": {"country": "USA", "state": "California", "city": "San Francisco"},
"col_attendance": 220000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand Down Expand Up @@ -653,7 +653,7 @@
"col_double": 20.02,
"col_string": "Robbers",
"col_album": {"album": "The 1975"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -663,7 +663,7 @@
"col_double": 20.23,
"col_string": "Somebody Else",
"col_album": {"album": "I Like It When You Sleep, for You Are So Beautiful yet So Unaware of It"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -673,7 +673,7 @@
"col_double": 1975.1975,
"col_string": "It's Not Living (If It's Not with You)",
"col_song": {"title": "Love It If We Made It"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand All @@ -683,7 +683,7 @@
"col_double": 5791.5791,
"col_string": "The 1975",
"col_song": {"title": "About You"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand Down
Loading