
Add start_date to all file-based configs #28845

Merged (8 commits) on Aug 8, 2023

Conversation

brianjlai
Contributor

@brianjlai brianjlai commented Jul 30, 2023

Closes #28136

What

Adds support for the start_date config option for all file-based sources. The S3 connector implements this directly, but a start_date is common enough behavior that all file-based sources should support it.

How

Adds start_date to the Spec model so that it is included in every generated spec and in config validation for all sources. The start_date config is also used by the Cursor to filter which files should be read or used for schema inference.
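For context, here is a minimal sketch (illustrative names, not the exact CDK code) of how a config-provided start_date can flow into the cursor's file filtering:

```python
from datetime import datetime
from typing import Iterable, List, Optional


class RemoteFile:
    """Illustrative stand-in for the CDK's RemoteFile."""

    def __init__(self, uri: str, last_modified: datetime):
        self.uri = uri
        self.last_modified = last_modified


class StartDateAwareCursor:
    """Hypothetical cursor that drops files modified before the configured start_date."""

    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, start_date: Optional[str] = None):
        # Parse the optional config value once; None means "no lower bound".
        # Both timestamps are treated as naive UTC here for simplicity.
        self._start_date = (
            datetime.strptime(start_date, self.DATE_TIME_FORMAT) if start_date else None
        )

    def get_files_to_sync(self, all_files: Iterable[RemoteFile]) -> List[RemoteFile]:
        # Keep only files modified at or after the start date (or everything if unset).
        return [
            f
            for f in all_files
            if self._start_date is None or f.last_modified >= self._start_date
        ]
```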

Recommended reading order

  1. abstract_file_based_spec.py
  2. default_file_based_cursor.py
  3. default_file_based_stream.py

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Jul 30, 2023
@brianjlai brianjlai changed the title add start_date config to abstract spec and apply it in the cursor Add start_date to all file-based configs Jul 30, 2023
files = self.list_files()
total_n_files = len(files)
all_files = self.list_files()
files_to_read = list(self._cursor.get_files_to_sync(all_files, self.logger))
Contributor Author

@brianjlai brianjlai Jul 30, 2023


This part is worth noting. The existing schema inference code never relied on the cursor (although it is defined by default). This modifies the flow a bit to also apply the cursor to filter out records for both discover and read.

A few alternative options I explored:

  • Using start_date in AbstractFileBasedStreamReader.filter_files_by_globs(). This has the nice benefit of always being applied even if the source implements its own FileBasedCursor, but it has the drawback of inserting more modified_at time comparisons across the codebase, which I don't love.
  • We could also add a standalone check to filter out files here or in self.list_files(), but the datetime checks fit more naturally in the cursor.

Up for discussion, but I think incorporating it here feels sensible since we do plan on a stateful discover at some point which will need the cursor.
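To make the option described here concrete, a compressed sketch (illustrative class and method names, not the actual CDK stream) of the stream delegating file selection to the cursor for both discover and read:

```python
# Illustrative only: the stream asks the cursor which files to use, so start_date
# and incremental filtering live in a single place for both code paths.
class FileBasedStreamSketch:
    def __init__(self, stream_reader, cursor, logger):
        self._stream_reader = stream_reader  # stand-in for AbstractFileBasedStreamReader
        self._cursor = cursor                # stand-in for a FileBasedCursor
        self.logger = logger

    def list_files(self):
        # In the real CDK this would apply the configured globs via the stream reader;
        # list_remote_files() is a hypothetical stand-in here.
        return self._stream_reader.list_remote_files()

    def files_for_schema_inference(self):
        return list(self._cursor.get_files_to_sync(self.list_files(), self.logger))

    def files_for_read(self):
        return list(self._cursor.get_files_to_sync(self.list_files(), self.logger))
```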

Contributor


A couple of thoughts -

I think I'd prefer not to restrict schema inference to only files that are supposed to be synced, because it will potentially be a totally normal case for there to just be 0 or 1 files that need to be synced, in which case schema inference will throw an exception (for the 0 files case) or only infer from the one file, which is more error prone.

That said, it makes sense to me to not include files from before start_date in schema inference.

So it's feeling to me like filter_files_by_globs would be a good place for it (although if we do that we'd want to rename it). Also worth noting: if no files are returned from filter_files_by_globs, we currently fail check, because the implication is that we can't do schema inference and that there's a possible misconfiguration. If we incorporate a start_date check in there and there are no files on or after start_date, the failure might be useful to the user (i.e. they don't have to wait until a discover/sync to see that their stream is empty).

Contributor Author


That makes sense! That was the original approach we had discussed, but this redesign is probably over-consolidating how we decide which files to pull from the file store. I will incorporate the feedback and revert to the old approach where we use filter_files_by_globs_and_start_date().

Contributor


Moving to filter_files_by_globs makes sense to me. The function would effectively become a filter on the name and timestamp, which seems reasonable.
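A minimal sketch of that combined filter under the rename mentioned above (globbing is simplified to fnmatch here; the real CDK's matching may differ):

```python
from datetime import datetime
from fnmatch import fnmatch
from typing import Iterable, List, Optional


def filter_files_by_globs_and_start_date(
    files: Iterable["RemoteFile"],
    globs: List[str],
    start_date: Optional[datetime],
) -> List["RemoteFile"]:
    """Keep files whose name matches one of the globs and whose last-modified
    timestamp is not before start_date (when a start_date is configured)."""
    return [
        f
        for f in files
        if any(fnmatch(f.uri, glob) for glob in globs)
        and (start_date is None or f.last_modified >= start_date)
    ]
```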

@brianjlai brianjlai marked this pull request as ready for review July 30, 2023 21:39
@brianjlai brianjlai requested a review from a team as a code owner July 30, 2023 21:39
@brianjlai brianjlai requested review from girarda and clnoll July 30, 2023 21:39
description="UTC date and time in the format 2017-01-25T00:00:00Z. Any file modified before this date will not be replicated.",
examples=["2021-01-01T00:00:00Z"],
format="date-time",
pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
Contributor


Could you use a pattern descriptor whenever you specify a pattern? https://docs.airbyte.com/connector-development/connector-specification-reference#pattern-descriptors
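For illustration, a pattern descriptor could ride along as an extra Field kwarg, which pydantic copies into the generated JSON schema; the descriptor string below is an assumption, not the merged value:

```python
from typing import Optional

from pydantic import BaseModel, Field


class SpecSketch(BaseModel):
    # Sketch of the field with a human-readable pattern descriptor added.
    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00Z. "
        "Any file modified before this date will not be replicated.",
        examples=["2021-01-01T00:00:00Z"],
        format="date-time",
        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
        pattern_descriptor="YYYY-MM-DDTHH:mm:ssZ",  # assumed descriptor string
    )
```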

self.logger.warn(
msg=f"Refusing to infer schema for all {total_n_files} files; using {max_n_files_for_schema_inference} files."
)

inferred_schema = self.infer_schema(files)
inferred_schema = self.infer_schema(files_to_read)
Contributor


If files_to_read is empty, mind raising an EMPTY_STREAM exception before we call self.infer_schema? It's a little more specific than the invalid schema exception.
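A sketch of that guard; the exception class here is a hypothetical stand-in for whatever EMPTY_STREAM error the CDK exposes:

```python
class EmptyStreamError(Exception):
    """Hypothetical stand-in for a more specific EMPTY_STREAM error."""


def infer_schema_or_fail(stream, files_to_read):
    # Fail fast with a targeted message instead of letting schema inference
    # surface a less specific "invalid schema" error on an empty file list.
    if not files_to_read:
        raise EmptyStreamError(
            f"No files available for schema inference in stream {stream.name!r}; "
            "check the configured globs and start_date."
        )
    return stream.infer_schema(files_to_read)
```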

@@ -111,6 +114,8 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:

def _compute_start_time(self) -> datetime:
if not self._file_to_datetime_history:
# if self._config_start_date:
Contributor


Is this needed, or can it be deleted?
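For reference, a sketch of what a real fallback to the configured start date (rather than the commented-out line) might look like inside the cursor; attribute names are illustrative:

```python
from datetime import datetime
from typing import Dict, Optional


class CursorStartTimeSketch:
    """Illustrative only: how the cursor's start time could honor a configured start_date."""

    def __init__(self, config_start_date: Optional[datetime] = None):
        self._start_date = config_start_date
        # Maps file URI -> last-modified datetime for files already synced.
        self._file_to_datetime_history: Dict[str, datetime] = {}

    def _compute_start_time(self) -> datetime:
        if not self._file_to_datetime_history:
            # No sync history yet: begin at the configured start_date,
            # or at the earliest representable time if none was provided.
            return self._start_date or datetime.min
        # With history, the real cursor derives the start time from the files it
        # has already seen (simplified here to the earliest recorded timestamp).
        return min(self._file_to_datetime_history.values())
```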

@@ -14,13 +14,15 @@
class DefaultFileBasedCursor(FileBasedCursor):
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
CONFIG_START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Contributor


why do we need a different format for the config?

Contributor Author


The S3 connector stored them with milli/microsecond precision, so this was to retain backwards compatibility. But actually I think we're better off keeping a consistent time format, and the S3 config transformer can try to support the added precision. Will fix this.
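A small sketch of the "one format, tolerate the extra precision" idea; the normalization shown here is an assumption about how the config transformer could behave:

```python
from datetime import datetime

DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
CONFIG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_start_date(value: str) -> datetime:
    # Accept both "2021-01-01T00:00:00.000000Z" and "2021-01-01T00:00:00Z",
    # so older S3 configs and the new spec format parse to the same datetime.
    for fmt in (DATE_TIME_FORMAT, CONFIG_DATE_FORMAT):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"start_date {value!r} does not match an expected format")
```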

@brianjlai brianjlai requested review from girarda and clnoll August 2, 2023 23:00
@@ -17,6 +17,16 @@ class AbstractFileBasedSpec(BaseModel):
that are needed when users configure a file-based source.
"""

start_date: Optional[str] = Field(
title="Start Date",
description="UTC date and time in the format 2017-01-25T00:00:00Z. Any file modified before this date will not be replicated.",
Contributor


This and the example should be 2017-01-25T00:00:00.000Z, right?

Contributor

@clnoll clnoll left a comment


Looks great @brianjlai! Just one small comment.

@brianjlai brianjlai merged commit 01045d6 into master Aug 8, 2023
@brianjlai brianjlai deleted the brian/file_based_config_start_date branch August 8, 2023 00:43
Labels: CDK Connector Development Kit

Successfully merging this pull request may close these issues: File CDK: add start_date config option