-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Allow HTTP object store for json/bson etc. #2784
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which part of the original issue remains unresolved?
I think we should have some tests...
// TODO: Download the contents of the file and handle using local | ||
// store (maybe outside of HTTP store). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is what I did in the sqlite
from cloud handler, if I understand things correctly.
Also it seems like, we probably don't want to rely on content-length headers being accurate...
The "content-length" part |
097bc7a
to
be09a6e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are definitely some edge cases around streaming and validation here that we should think about a bit before merging, but the code seems fine.
if u.as_str().contains('*') { | ||
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( | ||
"Unexpected status code '{}' for url: '{}'. \ | ||
Note that globbing is not supported for HTTP.", | ||
status, u, | ||
))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have imagined that we'd do this somewhere other than the check for the content length?
e_tag: None, | ||
version: None, | ||
}, | ||
range: 0..size, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this an array for every byte?
// TODO: Maybe write the byte stream to file? | ||
// Would only be useful when the returned bytes are too many. | ||
let contents = res.bytes().await?; | ||
|
||
let size = contents.len(); | ||
|
||
let stream = async_stream::stream! { | ||
let res: Result<Bytes, object_store::Error> = Ok(contents); | ||
yield res; | ||
}; | ||
|
||
let payload = GetResultPayload::Stream(Box::pin(stream)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my concern here is less that we should write it to a file (or not) but that we actually/unintentionally end up reading the whole stream into memory here... That's the main risk in my mind...
// TODO: Maybe write the byte stream to file? | ||
// Would only be useful when the returned bytes are too many. | ||
let contents = res.bytes().await?; | ||
let download_file_path = std::env::temp_dir().join(format!("{}", Uuid::new_v4())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what cleans this up? maybe use tmpdir package, which cleans up tempdirs when the tempdir object goes out of scope...
Resolves part of #2756 Signed-off-by: Vaibhav <vrongmeal@gmail.com>
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
e5879a2
to
b5edcca
Compare
download_file.seek(SeekFrom::Start(0)).await?; | ||
|
||
let stream = async_stream::stream! { | ||
// Delete when the stream is dropped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to call this with move? I've been tending to use the futures::stream::once()
rather than the macro to make this more clear, but if it's in an Arc
then I think it ends up working ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The macro expands to async move
. Moreover, the compiler would just error if it didn't here.
Fixes: #2756