Skip to content

Commit

Permalink
File cdk parser and cursor updates (#28900)
Browse files Browse the repository at this point in the history
* File-based CDK: update parquet parser to handle partitions

* File-based CDK: make the record output & cursor date time format consistent
  • Loading branch information
clnoll authored Aug 2, 2023
1 parent 3365015 commit 09ebb47
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ class DatetimeBasedCursor(BaseModel):
cursor_datetime_formats: Optional[List[str]] = Field(
None,
description="The possible formats for the cursor field",
title="Cursor Datetime Format",
title="Cursor Datetime Formats",
)
cursor_granularity: Optional[str] = Field(
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import json
import logging
from typing import Any, Dict, Iterable, Mapping
import os
from typing import Any, Dict, Iterable, List, Mapping
from urllib.parse import unquote

import pyarrow as pa
import pyarrow.parquet as pq
Expand All @@ -27,11 +29,16 @@ async def infer_schema(
if not isinstance(parquet_format, ParquetFormat):
raise ValueError(f"Expected ParquetFormat, got {parquet_format}")

# Pyarrow can detect the schema of a parquet file by reading only its metadata.
# https://github.com/apache/arrow/blob/main/python/pyarrow/_parquet.pyx#L1168-L1243
parquet_file = pq.ParquetFile(stream_reader.open_file(file, self.file_read_mode, logger))
parquet_schema = parquet_file.schema_arrow
with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
parquet_file = pq.ParquetFile(fp)
parquet_schema = parquet_file.schema_arrow

# Inferred non-partition schema
schema = {field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format) for field in parquet_schema}
# Inferred partition schema
partition_columns = {partition.split("=")[0]: {"type": "string"} for partition in self._extract_partitions(file.uri)}

schema.update(partition_columns)
return schema

def parse_records(
Expand All @@ -45,13 +52,18 @@ def parse_records(
if not isinstance(parquet_format, ParquetFormat):
raise ValueError(f"Expected ParquetFormat, got {parquet_format}") # FIXME test this branch!
with stream_reader.open_file(file, self.file_read_mode, logger) as fp:
table = pq.read_table(fp)
for batch in table.to_batches():
for i in range(batch.num_rows):
row_dict = {
column: ParquetParser._to_output_value(batch.column(column)[i], parquet_format) for column in table.column_names
}
yield row_dict
reader = pq.ParquetFile(fp)
partition_columns = {x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)}
for row_group in range(reader.num_row_groups):
batch_dict = reader.read_row_group(row_group).to_pydict()
for record_values in zip(*batch_dict.values()):
record = dict(zip(batch_dict.keys(), record_values))
record.update(partition_columns)
yield record

@staticmethod
def _extract_partitions(filepath: str) -> List[str]:
    """
    Return the partition segments of a file path, URL-decoded.

    A partition segment is any path component that contains "=" (for example
    "year=2023"). Segments are returned in the order they appear in the path.

    :param filepath: path or URI of the file; components are separated by "/"
        (and additionally by os.sep on platforms where it differs from "/").
    :return: the unquoted "key=value" components, or an empty list if there are none.
    """
    # URIs always use "/" regardless of the host OS, so splitting on os.sep alone
    # finds no partitions where os.sep is "\\". Normalizing first keeps the
    # behavior identical on POSIX while handling both separators elsewhere.
    normalized = filepath if os.sep == "/" else filepath.replace(os.sep, "/")
    # The "=" check is done on the raw segment, before decoding, so an encoded
    # "%3D" does not turn an ordinary component into a partition.
    return [unquote(segment) for segment in normalized.split("/") if "=" in segment]

@property
def file_read_mode(self) -> FileReadMode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,21 @@ def add_file(self, file: RemoteFile) -> None:
)

def get_state(self) -> StreamState:
    """
    Return the current state of the cursor.

    The state holds two entries:
      - "history": the file-to-datetime history kept by this cursor (the same
        object as self._file_to_datetime_history, not a copy).
      - "_ab_source_file_last_modified": the value returned by self._get_cursor().
    """
    # Built in a single expression; there is no intermediate state that is
    # assigned and then immediately overwritten.
    return {
        "history": self._file_to_datetime_history,
        "_ab_source_file_last_modified": self._get_cursor(),
    }

def _get_cursor(self) -> Optional[str]:
    """
    Compute the cursor value from the file history.

    The entry with the greatest (last-modified, filename) pair is selected, and
    the cursor is the string "<last-modified>_<filename>" built from that entry.

    :return: the cursor string, or None when the history has no entries.
    """
    history_entries = self._file_to_datetime_history.items()
    if not history_entries:
        return None
    # Order by timestamp first and break ties on the file name.
    latest_file, latest_modified = max(history_entries, key=lambda entry: (entry[1], entry[0]))
    return f"{latest_modified}_{latest_file}"

def _is_history_full(self) -> bool:
"""
Returns true if the state's history is full, meaning new entries will start to replace old entries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
The default file-based stream.
"""

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
ab_last_mod_col = "_ab_source_file_last_modified"
ab_file_name_col = "_ab_source_file_url"
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
Expand Down Expand Up @@ -78,7 +79,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
parser = self.get_parser(self.config.file_type)
for file in stream_slice["files"]:
# only serialize the datetime once
file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%SZ")
file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
n_skipped = line_no = 0

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
"data": {
"col1": "val11",
"col2": 12,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -228,7 +228,7 @@
"data": {
"col1": "val21",
"col2": 22,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -282,7 +282,7 @@
"col_double": "20.02",
"col_string": "Robbers",
"col_album": {"album": "The 1975"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -292,7 +292,7 @@
"col_double": "20.23",
"col_string": "Somebody Else",
"col_album": {"album": "I Like It When You Sleep, for You Are So Beautiful yet So Unaware of It"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -302,7 +302,7 @@
"col_double": "1975.1975",
"col_string": "It's Not Living (If It's Not with You)",
"col_song": {"title": "Love It If We Made It"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand All @@ -312,7 +312,7 @@
"col_double": "5791.5791",
"col_string": "The 1975",
"col_song": {"title": "About You"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -407,7 +407,7 @@
"col_timestamp_micros": "2022-05-30T00:00:00.456789+00:00",
"col_local_timestamp_millis": "2022-05-29T00:00:00.456000",
"col_local_timestamp_micros": "2022-05-30T00:00:00.456789",
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand Down Expand Up @@ -488,7 +488,7 @@
"col_album": "A_MOMENT_APART",
"col_year": 2017,
"col_vocals": False,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -499,7 +499,7 @@
"col_album": "IN_RETURN",
"col_year": 2014,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -510,7 +510,7 @@
"col_album": "THE_LAST_GOODBYE",
"col_year": 2022,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -521,7 +521,7 @@
"col_album": "SUMMERS_GONE",
"col_year": 2012,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -532,7 +532,7 @@
"col_album": "IN_RETURN",
"col_year": 2014,
"col_vocals": True,
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "odesza_songs.avro",
},
"stream": "songs_stream",
Expand All @@ -542,7 +542,7 @@
"col_name": "Coachella",
"col_location": {"country": "USA", "state": "California", "city": "Indio"},
"col_attendance": 250000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -552,7 +552,7 @@
"col_name": "CRSSD",
"col_location": {"country": "USA", "state": "California", "city": "San Diego"},
"col_attendance": 30000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -562,7 +562,7 @@
"col_name": "Lightning in a Bottle",
"col_location": {"country": "USA", "state": "California", "city": "Buena Vista Lake"},
"col_attendance": 18000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand All @@ -572,7 +572,7 @@
"col_name": "Outside Lands",
"col_location": {"country": "USA", "state": "California", "city": "San Francisco"},
"col_attendance": 220000,
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "california_festivals.avro",
},
"stream": "festivals_stream",
Expand Down Expand Up @@ -653,7 +653,7 @@
"col_double": 20.02,
"col_string": "Robbers",
"col_album": {"album": "The 1975"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -663,7 +663,7 @@
"col_double": 20.23,
"col_string": "Somebody Else",
"col_album": {"album": "I Like It When You Sleep, for You Are So Beautiful yet So Unaware of It"},
"_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.avro",
},
"stream": "stream1",
Expand All @@ -673,7 +673,7 @@
"col_double": 1975.1975,
"col_string": "It's Not Living (If It's Not with You)",
"col_song": {"title": "Love It If We Made It"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand All @@ -683,7 +683,7 @@
"col_double": 5791.5791,
"col_string": "The 1975",
"col_song": {"title": "About You"},
"_ab_source_file_last_modified": "2023-06-06T03:54:07Z",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "b.avro",
},
"stream": "stream1",
Expand Down
Loading

0 comments on commit 09ebb47

Please sign in to comment.