-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
File-based CDK: add read mode to stream reader interface & parsers #28862
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple of questions around the ownership of the file_type and the stream_reader instantiation then we'll be good to 🚢
@@ -90,6 +91,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: | |||
""" | |||
try: | |||
parsed_config = self.spec_class(**config) | |||
self.stream_reader.config = parsed_config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why set the config as a stream_reader property instead of passing it to the methods that need it?
As it is, the stream_reader isn't fully initialized until we set its config, which is a non-obvious gotcha to keep in mind (similar to how we need to instantiate a stream's state separately from it's instantiation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion of that is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just copying the convo over to continue here
To give some context to other reviewers - the basic background here is that config information is needed by both the FileBasedSource and the StreamReader, but for different reasons; FileBasedSource needs it to configure streams, and the StreamReader needs it so that it knows how to, e.g. authenticate with the storage system.
In this PR, I'm allowing FileBasedSource to read the config from disk and parse it (like normal), and once parsed, the source sets the config on the StreamReader. However, the type of config accepted by FileBasedSource is AbstractFileBasedSpec, and it only cares about keys that are source-agnostic, whereas StreamReader cares about keys that are specific to the 3rd party that it's reading from. For example, S3 will be looking for a aws_access_token_id key, but FileBasedSource won't itself need that.
So that leads us to this situation, where the S3 StreamReader's config setter requires a config of type (S3) Config, but the interface config setter takes an AbstractFileBasedSpec, which doesn't have S3-specific keys. To solve this, we assert that we were given the correct Config type for our type of StreamReader.
One alternative route that I went down involved reading in the config prior to the initialization of the Source, so that it could be given to the StreamReader as an argument. Since the Source requires a StreamReader, it could get the config off of that. This is somewhat undesirable because we end up reading the config twice (because AbstractSource still reads it deep within the CDK code), and it also deviates from the pattern of letting the Source validate the config, which may have error handling behavior that we want to keep. For those two reasons I'd prefer to keep the code as-is. But I'm open to other opinions.
Thanks for adding the comment in the setter. The aspect Alex brought up makes sense, and then there's the drawback that every file based implementation will have to remember to check the instance type in their setter. But a better solution still isn't obvious to me (although I'm not sure if we would end up with a type mismatch that would still pass validation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably remove the property setter by passing the config as a parameter to open_file
, and get_matching_files
. That would however force us to verify the config in both methods instead of only in the setter.
I'm fine with leaving as-is then
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think my preference would be to leave it as-is for now. It's a bit awkward but is not going to lead to an insidious issue - if the stream reader needs the config (which the S3 connector does) there will be a total failure if it's not set.
""" | ||
Utility method for extracting prefixes from the globs. | ||
""" | ||
prefixes = {glob.split("*")[0].rstrip("/") for glob in globs} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was the rstrip just a bug? How do we know/verify this new behavior is the expected one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In S3, if the trailing "/" is removed, the search wont' be restricted to a directory. For instance, the prefix a/b
would match a/bbbb.csv
instead of "everything under directory b". So allowing the "/" just allows us to be more restrictive about which files are requested.
from typing import Optional | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class FileReadMode(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this enum be in file_type_parser
since it's a property of the parser?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me. Moved.
@@ -42,48 +43,61 @@ def __init__( | |||
file_write_options: Mapping[str, Any], | |||
max_history_size: int, | |||
): | |||
# Attributes required for test purposes | |||
self.files = files | |||
self.file_type = file_type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will the S3 source have the same restriction of one file_type
per source?
I assume not, but it's not clear who owns the file_type
now that it isn't part of the RemoteFile
anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file_type
is defined per-stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @girarda - responded to your questions. Since this PR was split off from one that includes changes to source-S3, some of the conversation is happening there.
@@ -90,6 +91,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: | |||
""" | |||
try: | |||
parsed_config = self.spec_class(**config) | |||
self.stream_reader.config = parsed_config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion of that is here.
from typing import Optional | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class FileReadMode(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me. Moved.
@@ -42,48 +43,61 @@ def __init__( | |||
file_write_options: Mapping[str, Any], | |||
max_history_size: int, | |||
): | |||
# Attributes required for test purposes | |||
self.files = files | |||
self.file_type = file_type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file_type
is defined per-stream.
""" | ||
Utility method for extracting prefixes from the globs. | ||
""" | ||
prefixes = {glob.split("*")[0].rstrip("/") for glob in globs} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In S3, if the trailing "/" is removed, the search wont' be restricted to a directory. For instance, the prefix a/b
would match a/bbbb.csv
instead of "everything under directory b". So allowing the "/" just allows us to be more restrictive about which files are requested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the additional context @clnoll
Separates out the file-based CDK portion of #28786.
These changes were made during the development of the source-S3 connector, which is the first real connector to use the file-based CDK and so identified a few changes that were needed to the interfaces.