-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): add and use file system abstraction in file source #8415
Conversation
@simaov thanks for the PR - I haven't given it a detailed look yet, but overall seems pretty nifty. I am wondering if using smart-open (https://pypi.org/project/smart-open/) or requests-file might yield a similar outcome with less code on our side. I think we have dependencies on both of those libraries in certain places already. |
Hi @hsheth2, thank you for your comment. Sorry, I am not very familiar with python ecosystem. I will have a look. Thank you. |
Hi @hsheth2, you are right, smart_open does what we need, so I used it to read data from different sources. Thank you. |
return [str(self.config.path)] | ||
def get_filenames(self) -> Iterable[FileStatus]: | ||
path_str = str(self.config.path) | ||
fs = FileSystem.get(path_str) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this overall feels a bit complex
is there a reason we can't use smart_open directly without building wrappers for each thing (e.g. http, local, s3)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am not mistaken smart_open does not have methods to list path and get file info. It can open stream only. Currently there are 3 file systems: local, s3 and http. In the future, there can be more. And if someone need to add azure support, for instance, the only one thing that should be done is to implement FileSystem for Azure. Thats it.
@simaov you're right - I think we probably do need this sort of per-system implementation, and the complexity is warranted here @asikowitz will chime in with some more detailed comments, but the main things I'm thinking about here:
|
@hsheth2, thanks for your comments. I agree that we need to avoid hard dependency. Could you please share more details about registry and how it can be used? Maybe some examples or how it is used in project. |
@simaov here's where we're setting up the registry for sources:
In this case, we should just use |
Hi @hsheth2. I tried to address your comments:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall this looks pretty good
had a few questions about naming / config
class FileSystem(metaclass=ABCMeta): | ||
|
||
@classmethod | ||
def create_fs(cls) -> "FileSystem": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be more consistent to just call this create
also should this method be taking kwargs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
|
||
@dataclass | ||
class FileStatus: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should call this FileInfo
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
raise NotImplementedError('File system implementations must implement "create_fs"') | ||
|
||
@abstractmethod | ||
def open(self, path: str, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to add a return type annotation here, or is it too messy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think at this point we don't know actual return type and it depends on underlying implementation. smart_open says that open method returns A file-like object
return S3FileSystem() | ||
|
||
def open(self, path: str, **kwargs): | ||
transport_params = kwargs.update({'client': S3FileSystem._s3}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does an end user configure the s3 client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on boto3 docs there are 3 options. But anyway I added ability to configure S3FileSystem using Config object
@@ -273,15 +273,15 @@ def _iterate_file(self, file_status: FileStatus) -> Iterable[Tuple[int, Any]]: | |||
def iterate_mce_file(self, path: str) -> Iterator[MetadataChangeEvent]: | |||
schema = get_path_schema(path) | |||
fs_class = fs_registry.get(schema) | |||
fs = fs_class.create_fs() | |||
fs = fs_class.create() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we'll probably need to add a mechanism for passing config here from the recipe, but we can leave that for a follow up PR
87f2583
to
c57d12c
Compare
1ee9e09
to
3c2858f
Compare
@simaov looks like there's still a few small lint issues from isort
|
3c2858f
to
a47b5ea
Compare
@simaov I went ahead and fixed up the code here. It looked like the |
Hi @hsheth2. Thanks for fixed up the code. To be honest, the idea was to unify reads, because in general it does not matter what is the source, reading from file can also be stream read and we could avoid read mode. But I am ok with it. There is still one test failed. Is it related to changes that were made? |
@simaov doesn't look related. I just retriggered CI. |
The smoke tests persistently fail with this error, which suggests something is actually broken here. Still need to investigate further, but it seems plausibly related to this change.
|
Warning Rate limit exceeded@hsheth2 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 18 minutes and 41 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe recent updates introduce a modular file system plugin architecture for Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (8)
- metadata-ingestion/setup.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/fs/fs_base.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/fs/http_fs.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/fs/local_fs.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py (1 hunks)
- metadata-ingestion/src/datahub/ingestion/source/file.py (7 hunks)
- metadata-ingestion/tests/unit/test_plugin_system.py (2 hunks)
Files not reviewed due to errors (2)
- metadata-ingestion/src/datahub/ingestion/fs/fs_base.py (no review received)
- metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py (no review received)
Files skipped from review due to trivial changes (2)
- metadata-ingestion/setup.py
- metadata-ingestion/tests/unit/test_plugin_system.py
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
16-16: Use context handler for opening files
(SIM115)
metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
57-57: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)
metadata-ingestion/src/datahub/ingestion/source/file.py
295-296: Replace
yield
overfor
loop withyield from
Replace with
yield from
(UP028)
Additional comments not posted (7)
metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py (1)
1-5
: Registry setup looks clean and efficient.The use of
PluginRegistry
forFileSystem
types and the registration from the entry pointdatahub.fs.plugins
is a standard and effective way to handle plugin architectures in Python. This approach supports extensibility and modular design.metadata-ingestion/src/datahub/ingestion/fs/http_fs.py (1)
14-15
: Use ofsmart_open
for file handling is appropriate.The use of
smart_open
enables efficient handling of different types of streams with a uniform API, which is beneficial for an HTTP file system implementation.metadata-ingestion/src/datahub/ingestion/source/file.py (5)
194-203
: Review: Implementation ofget_filenames
method.This method correctly utilizes the new file system abstraction to list files based on the schema derived from the path. The method is concise and effectively filters files based on the specified extension.
Line range hint
214-232
: Review:get_workunits_internal
method.The method effectively generates work units based on the file content and configuration. The use of
isinstance
for type checks and the conditional logic for aspect filtering are correctly implemented.Tools
Ruff
295-296: Replace
yield
overfor
loop withyield from
Replace with
yield from
(UP028)
266-287
: Suggest performance improvement for_iterate_file_streaming
.While the method is correct, consider optimizing the element counting process to reduce overhead, especially for large files. Current implementation reads through the entire file just to count elements, which might not be efficient.
Line range hint
97-135
: Review:FileSourceReport
class.This class is well-designed to track and report on the progress of file ingestion. The methods for updating times and counts are clear and effectively utilize Python's datetime for precision.
367-371
: Review: Static utility methodclose_if_possible
.This utility method correctly checks if the object has a close method before calling it, which is a good practice to avoid attribute errors.
else: | ||
raise Exception(f"Failed to process {path}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling by providing more specific error information.
- raise Exception(f"Failed to process {path}")
+ raise FileNotFoundError(f"The specified path does not exist: {path}")
This change makes the error message more informative and specific, which aids in troubleshooting.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
else: | |
raise Exception(f"Failed to process {path}") | |
else: | |
raise FileNotFoundError(f"The specified path does not exist: {path}") |
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- metadata-ingestion/src/datahub/ingestion/source/file.py (7 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/file.py
296-297: Replace
yield
overfor
loop withyield from
Replace with
yield from
(UP028)
# TODO: Remove this method, as it appears to be unused. | ||
schema = get_path_schema(path) | ||
fs_class = fs_registry.get(schema) | ||
fs = fs_class.create() | ||
file_status = fs.file_status(path) | ||
for obj in self._iterate_file(file_status): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider removing or refactoring the unused method iterate_mce_file
.
The method is marked with a TODO comment indicating it is unused. If it is indeed unused, it should be removed to clean up the codebase.
- def iterate_mce_file(self, path: str) -> Iterator[MetadataChangeEvent]:
- # TODO: Remove this method, as it appears to be unused.
- schema = get_path_schema(path)
- fs_class = fs_registry.get(schema)
- fs = fs_class.create()
- file_status = fs.file_status(path)
- for obj in self._iterate_file(file_status):
- mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
- yield mce
+ # Method removed as it was unused.
If this method is still needed for future compatibility or other reasons, consider documenting its purpose more clearly or refactoring it to fit the current architecture better.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# TODO: Remove this method, as it appears to be unused. | |
schema = get_path_schema(path) | |
fs_class = fs_registry.get(schema) | |
fs = fs_class.create() | |
file_status = fs.file_status(path) | |
for obj in self._iterate_file(file_status): | |
# Method removed as it was unused. |
def get_filenames(self) -> Iterable[FileInfo]: | ||
path_str = str(self.config.path) | ||
schema = get_path_schema(path_str) | ||
fs_class = fs_registry.get(schema) | ||
fs = fs_class.create() | ||
for file_info in fs.list(path_str): | ||
if file_info.is_file and file_info.path.endswith( | ||
self.config.file_extension | ||
): | ||
yield file_info |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor the get_filenames
method to enhance clarity and performance.
The method is well-implemented but could benefit from a minor refactor to enhance clarity and reduce the number of lines.
- path_str = str(self.config.path)
- schema = get_path_schema(path_str)
- fs_class = fs_registry.get(schema)
- fs = fs_class.create()
- for file_info in fs.list(path_str):
+ schema = get_path_schema(str(self.config.path))
+ fs = fs_registry.get(schema).create()
+ for file_info in fs.list(self.config.path):
This change reduces the redundancy by removing the unnecessary path_str
variable and combines some operations into single lines.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def get_filenames(self) -> Iterable[FileInfo]: | |
path_str = str(self.config.path) | |
schema = get_path_schema(path_str) | |
fs_class = fs_registry.get(schema) | |
fs = fs_class.create() | |
for file_info in fs.list(path_str): | |
if file_info.is_file and file_info.path.endswith( | |
self.config.file_extension | |
): | |
yield file_info | |
def get_filenames(self) -> Iterable[FileInfo]: | |
schema = get_path_schema(str(self.config.path)) | |
fs = fs_registry.get(schema).create() | |
for file_info in fs.list(self.config.path): | |
if file_info.is_file and file_info.path.endswith( | |
self.config.file_extension | |
): | |
yield file_info |
def _iterate_file_batch(self, fp: Any) -> Iterable[Any]: | ||
# Read the file. | ||
contents = json.load(fp) | ||
|
||
# Maintain backwards compatibility with the single-object format. | ||
if isinstance(contents, list): | ||
for row in contents: | ||
yield row | ||
else: | ||
yield contents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Utilize yield from
for improved readability and performance in _iterate_file_batch
.
The method can be optimized by using yield from
when iterating through lists.
- if isinstance(contents, list):
- for row in contents:
- yield row
- else:
- yield contents
+ yield from contents if isinstance(contents, list) else (contents,)
This change leverages yield from
for better performance and readability, as suggested by the static analysis tool.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _iterate_file_batch(self, fp: Any) -> Iterable[Any]: | |
# Read the file. | |
contents = json.load(fp) | |
# Maintain backwards compatibility with the single-object format. | |
if isinstance(contents, list): | |
for row in contents: | |
yield row | |
else: | |
yield contents | |
def _iterate_file_batch(self, fp: Any) -> Iterable[Any]: | |
# Read the file. | |
contents = json.load(fp) | |
# Maintain backwards compatibility with the single-object format. | |
yield from contents if isinstance(contents, list) else (contents,) |
Tools
Ruff
296-297: Replace
yield
overfor
loop withyield from
Replace with
yield from
(UP028)
Co-authored-by: oleksandrsimonchuk <oleksandr.si@appsflyer.com> Co-authored-by: Harshal Sheth <hsheth2@gmail.com> Co-authored-by: Tamas Nemeth <treff7es@gmail.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
…ahub-project#8415) Co-authored-by: oleksandrsimonchuk <oleksandr.si@appsflyer.com> Co-authored-by: Harshal Sheth <hsheth2@gmail.com> Co-authored-by: Tamas Nemeth <treff7es@gmail.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Checklist
Summary by CodeRabbit
New Features
FileInfo
class to improve file management.Improvements
FileInfo
class.close
andclose_if_possible
methods.Bug Fixes
get_filenames
method by returning an iterable ofFileInfo
objects instead of strings.