-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid reading the entire file in ChunkedStore #4525
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,29 +138,37 @@ where | |
{ | ||
let delimiter = LineDelimiter::new(); | ||
|
||
futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move { | ||
loop { | ||
if let Some(next) = delimiter.next() { | ||
return Some((Ok(next), (s, delimiter))); | ||
} | ||
futures::stream::unfold( | ||
(s, delimiter, false), | ||
|(mut s, mut delimiter, mut exhausted)| async move { | ||
loop { | ||
if let Some(next) = delimiter.next() { | ||
return Some((Ok(next), (s, delimiter, exhausted))); | ||
} else if exhausted { | ||
return None; | ||
} | ||
|
||
match s.next().await { | ||
Some(Ok(bytes)) => delimiter.push(bytes), | ||
Some(Err(e)) => return Some((Err(e), (s, delimiter))), | ||
None => match delimiter.finish() { | ||
Ok(true) => return None, | ||
Ok(false) => continue, | ||
Err(e) => return Some((Err(e), (s, delimiter))), | ||
}, | ||
match s.next().await { | ||
Some(Ok(bytes)) => delimiter.push(bytes), | ||
Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))), | ||
None => { | ||
exhausted = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see this is to handle the case of a missing null terminator, for which there is a test added below 👍 |
||
match delimiter.finish() { | ||
Ok(true) => return None, | ||
Ok(false) => continue, | ||
Err(e) => return Some((Err(e), (s, delimiter, exhausted))), | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}) | ||
}, | ||
) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use futures::stream::TryStreamExt; | ||
use futures::stream::{BoxStream, TryStreamExt}; | ||
|
||
#[test] | ||
fn test_delimiter() { | ||
|
@@ -209,6 +217,31 @@ mod tests { | |
futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s)))); | ||
let stream = newline_delimited_stream(input_stream); | ||
|
||
let results: Vec<_> = stream.try_collect().await.unwrap(); | ||
assert_eq!( | ||
results, | ||
vec![ | ||
Bytes::from("hello\nworld\n"), | ||
Bytes::from("bingo\n"), | ||
Bytes::from("cupcakes") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
] | ||
) | ||
} | ||
#[tokio::test] | ||
async fn test_delimiter_unfold_stream() { | ||
let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold( | ||
VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]), | ||
|mut input| async move { | ||
if !input.is_empty() { | ||
Some((Ok(Bytes::from(input.pop_front().unwrap())), input)) | ||
} else { | ||
None | ||
} | ||
}, | ||
) | ||
.boxed(); | ||
let stream = newline_delimited_stream(input_stream); | ||
|
||
let results: Vec<_> = stream.try_collect().await.unwrap(); | ||
assert_eq!( | ||
results, | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change necessary, canonicalize above will fail if the path doesn't exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a generalization of the old checking logic. When you check with
is_file()
, you can not read certain file types such as FIFO files. To support all kinds of readable files, one typically checks with!is_dir()
. You can get more context here at this clippy discussion.So this change is simply applying the modification above and turning the boolean
match
expression to anif
expression, which is slightly more idiomatic.Are you asking about
map_err
vsunwrap
? We weren't sure if an error is impossible here, so we added themap_err
just in case. I typically prefer doing this for the purposes of defensive programming:canonicalize
changes for any reason, we would still be producing a sensible error instead of panicking.canonicalize
, but then gets deleted in between two function calls. This is not a realistic case, but it is possible in theory.However, if you believe it reduces readability, we can go back to
unwrap
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, thank you
I think it does, but I don't feel especially strongly 😅 . There are long term plans to revisit the error handling to use something like anyhow, which will clean this all up