Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File cdk parser and cursor updates #28900

Merged
merged 4 commits into from
Aug 2, 2023
Merged

Conversation

clnoll
Copy link
Contributor

@clnoll clnoll commented Aug 1, 2023

What

This PR consists of a few updates to handle some issues encountered during integration testing.

  1. Updates the parquet parser so that we're returning all partitions. Unfortunately I haven't found a great way to add local test coverage for the exact but that was encountered, but it is covered by S3 CATs. I created an issue to add it at a later time.
  2. Updates the DefaultFileBasedCursor datetime format to agree with the format expected by CATs.
  3. Returns the cursor in the state message, as required by CATs.

Recommended reading order

  1. Parquet parser
  2. File-based cursor

@clnoll clnoll requested a review from a team as a code owner August 1, 2023 02:19
@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Aug 1, 2023
@clnoll clnoll requested review from girarda, brianjlai and maxi297 August 1, 2023 02:19
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly non-blocking comments!

But wanted to discuss the switch from milliseconds to seconds precision. Are we converting the entire file based CDK to the less precise type to satisfy the S3 CAT tests? Because if so it does feel a little bit backwards that we're reverting precision in the CDK to support a specific connector. Not taking CAT backwards compatibility into account, we do have the ability to add more transformations from a legacy config to the new config since we already have to do other ones.

@@ -13,7 +13,7 @@

class DefaultFileBasedCursor(FileBasedCursor):
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, okay i'll revert my start_date PR back to this then. I was originally converting back out to milliseconds

So do our CAT tests in general just not support anything more granular than seconds? I'm curious because I do see some configs where we use more granular precisions. Or is this specifically to retain the precision being used by S3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey sorry, I took a closer look at where the granularity came from that was causing CATs to fail, and realized that we just need to be internally consistent between the cursor granularity and the records that we're emitting. I updated the code & tests to use %f with everyting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. thanks for checking!

schema = {field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format) for field in parquet_schema}
# Inferred partition schema
partition_columns = {x.split("=")[0]: {"type": "string"} for x in self._extract_partitions(file.uri)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit can we use a more descriptive variable name for the current partition than x

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done.


@staticmethod
def _extract_partitions(filepath: str) -> List[str]:
return [unquote(x) for x in filepath.split(os.sep) if "=" in x]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done.

state = {
"history": self._file_to_datetime_history,
}
state = {"history": self._file_to_datetime_history, "_ab_source_file_last_modified": self._get_cursor()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In your CAT testing that had discrepencies was _ab_source_file_last_modified always present in the state message even if the cursor was None? Or should it be omitted if so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that isn't actually covered by the CATs. Do you know what we usually do in that situation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about setting it to datetime.min?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think I'll leave it as-is for now, since we don't actually use the cursor field for deciding what to sync; it's just being used as a tool for integration tests.

@clnoll clnoll force-pushed the file-cdk-parser-and-cursor-updates branch from bb2ff24 to fef04b8 Compare August 1, 2023 22:14
@clnoll clnoll requested a review from brianjlai August 1, 2023 22:18
@clnoll clnoll force-pushed the file-cdk-parser-and-cursor-updates branch from fef04b8 to d921f11 Compare August 1, 2023 22:32
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me!

@@ -78,7 +78,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
parser = self.get_parser(self.config.file_type)
for file in stream_slice["files"]:
# only serialize the datetime once
file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%SZ")
file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: lets put this as a constant on the class just in case we do end up using time in multiple places

@clnoll clnoll merged commit 09ebb47 into master Aug 2, 2023
@clnoll clnoll deleted the file-cdk-parser-and-cursor-updates branch August 2, 2023 01:47
@clnoll clnoll restored the file-cdk-parser-and-cursor-updates branch August 2, 2023 01:48
bnchrch pushed a commit that referenced this pull request Aug 3, 2023
* File-based CDK: update parquet parser to handle partitions

* File-based CDK: make the record output & cursor date time format consistent
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants