Avoid reading the entire file in ChunkedStore #4525

Merged 1 commit on Dec 7, 2022
12 changes: 8 additions & 4 deletions datafusion/core/src/datasource/listing/url.rs
@@ -99,10 +99,14 @@ impl ListingTableUrl {
};

let path = std::path::Path::new(prefix).canonicalize()?;
let url = match path.is_file() {
true => Url::from_file_path(path).unwrap(),
false => Url::from_directory_path(path).unwrap(),
};
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {}", s)))?;
Contributor:
Why is this change necessary? The canonicalize call above will fail if the path doesn't exist.

ozankabak (Contributor), Dec 6, 2022:
This is a generalization of the old checking logic. When you check with is_file(), you cannot read certain file types such as FIFO files. To support all kinds of readable files, one typically checks with !is_dir(). You can get more context in this clippy discussion.

So this change simply applies the modification above and turns the boolean match expression into an if expression, which is slightly more idiomatic.

Are you asking about map_err vs unwrap? We weren't sure whether an error is truly impossible here, so we added the map_err just in case. I typically prefer doing this as a matter of defensive programming:

  • If what is now impossible becomes possible in the future because the code calling canonicalize changes for any reason, we would still be producing a sensible error instead of panicking.
  • It also guards against cases where the path is valid at the time you call canonicalize, but then gets deleted in between two function calls. This is not a realistic case, but it is possible in theory.

However, if you believe it reduces readability, we can go back to unwrap.
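A minimal, self-contained sketch of the pattern being discussed, with a made-up helper name and a plain String error standing in for DataFusionError (illustration only, not the PR's code):

use std::path::Path;
use url::Url;

// Sketch only: `!is_dir()` admits any readable, non-directory path (regular
// files, FIFOs, device files), and `map_err` turns the "should be impossible"
// URL conversion failure into an error instead of a panic.
fn prefix_to_url(prefix: &str) -> Result<Url, String> {
    let path = Path::new(prefix).canonicalize().map_err(|e| e.to_string())?;
    if path.is_dir() {
        Url::from_directory_path(&path)
    } else {
        Url::from_file_path(&path)
    }
    .map_err(|_| format!("Can not open path: {}", prefix))
}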

Contributor:

> To support all kinds of readable files, one typically checks with !is_dir(). You can get more context in this clippy discussion.

Makes sense, thank you

> However, if you believe it reduces readability, we can go back to unwrap.

I think it does, but I don't feel especially strongly 😅. There are long-term plans to revisit the error handling to use something like anyhow, which will clean this all up.
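For reference only, a hypothetical sketch of how anyhow-style context could express the same conversion; the helper name is made up and this is not part of the PR:

use anyhow::{Context, Result};
use std::path::Path;
use url::Url;

fn prefix_to_url(prefix: &str) -> Result<Url> {
    let path = Path::new(prefix)
        .canonicalize()
        .with_context(|| format!("failed to canonicalize {}", prefix))?;
    let url = if path.is_dir() {
        Url::from_directory_path(&path)
    } else {
        Url::from_file_path(&path)
    };
    // Url's conversion error type is (), so go through Option to attach context.
    url.ok()
        .with_context(|| format!("can not open path: {}", prefix))
}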

// TODO: Currently we do not have an IO-related error variant that accepts ()
// or a string. Once we have such a variant, change the error type above.

Ok(Self::new(url, glob))
}
26 changes: 23 additions & 3 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,21 +208,43 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use rstest::*;

use super::*;

#[tokio::test]
async fn avro_exec_without_partition() -> Result<()> {
test_with_stores(Arc::new(LocalFileSystem::new())).await
}

#[rstest]
#[tokio::test]
async fn test_chunked_avro(
#[values(10, 20, 30, 40)] chunk_size: usize,
) -> Result<()> {
test_with_stores(Arc::new(ChunkedStore::new(
Arc::new(LocalFileSystem::new()),
chunk_size,
)))
.await
}

async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
let ctx = SessionContext::new();
ctx.runtime_env()
.register_object_store("file", "", store.clone());

let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let store = Arc::new(LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(filename);

let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
@@ -239,8 +261,6 @@ mod tests {
output_ordering: None,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let ctx = SessionContext::new();
let mut results = avro_exec
.execute(0, ctx.task_ctx())
.expect("plan execution failed");
96 changes: 76 additions & 20 deletions datafusion/core/src/physical_plan/file_format/chunked_store.rs
@@ -16,7 +16,7 @@
// under the License.

use async_trait::async_trait;
use bytes::Bytes;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::path::Path;
@@ -25,7 +25,7 @@ use object_store::{MultipartId, Result};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;
use tokio::io::{AsyncReadExt, AsyncWrite, BufReader};

/// Wraps a [`ObjectStore`] and makes its get response return chunks
///
@@ -70,24 +70,78 @@ impl ObjectStore for ChunkedStore {
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let bytes = self.inner.get(location).await?.bytes().await?;
let mut offset = 0;
let chunk_size = self.chunk_size;

Ok(GetResult::Stream(
futures::stream::iter(std::iter::from_fn(move || {
let remaining = bytes.len() - offset;
if remaining == 0 {
return None;
}
let to_read = remaining.min(chunk_size);
let next_offset = offset + to_read;
let slice = bytes.slice(offset..next_offset);
offset = next_offset;
Some(Ok(slice))
}))
.boxed(),
))
match self.inner.get(location).await? {
GetResult::File(std_file, ..) => {
let file = tokio::fs::File::from_std(std_file);
let reader = BufReader::new(file);
Ok(GetResult::Stream(
futures::stream::unfold(
(reader, self.chunk_size),
|(mut reader, chunk_size)| async move {
let mut buffer = BytesMut::zeroed(chunk_size);
let size = reader.read(&mut buffer).await.map_err(|e| {
object_store::Error::Generic {
store: "ChunkedStore",
source: Box::new(e),
}
});
match size {
Ok(0) => None,
Ok(value) => Some((
Ok(buffer.split_to(value).freeze()),
(reader, chunk_size),
)),
Err(e) => Some((Err(e), (reader, chunk_size))),
}
},
)
.boxed(),
))
}
GetResult::Stream(stream) => {
let buffer = BytesMut::new();
Ok(GetResult::Stream(
futures::stream::unfold(
(stream, buffer, false, self.chunk_size),
|(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
// Keep accumulating bytes until we reach capacity as long as
// the stream can provide them:
if exhausted {
return None;
}
while buffer.len() < chunk_size {
match stream.next().await {
None => {
exhausted = true;
let slice = buffer.split_off(0).freeze();
return Some((
Ok(slice),
(stream, buffer, exhausted, chunk_size),
));
}
Some(Ok(bytes)) => {
buffer.put(bytes);
}
Some(Err(e)) => {
return Some((
Err(object_store::Error::Generic {
store: "ChunkedStore",
source: Box::new(e),
}),
(stream, buffer, exhausted, chunk_size),
))
}
};
}
// Return the chunked values as the next value in the stream
let slice = buffer.split_to(chunk_size).freeze();
Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
},
)
.boxed(),
))
}
}
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -125,7 +179,9 @@ impl ObjectStore for ChunkedStore {
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;

#[tokio::test]
async fn test_chunked() {
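The core of the get implementation above is re-chunking a byte stream with futures::stream::unfold: accumulate incoming bytes in a BytesMut buffer, emit fixed-size chunks, and flush whatever remains once the input is exhausted. A standalone sketch of that idea, independent of the object_store API (the rechunk helper and the sizes here are illustrative only):

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::{self, BoxStream};
use futures::StreamExt;

fn rechunk(input: BoxStream<'static, Bytes>, chunk_size: usize) -> BoxStream<'static, Bytes> {
    stream::unfold(
        (input, BytesMut::new(), false),
        move |(mut input, mut buffer, mut exhausted)| async move {
            // Fill the buffer up to `chunk_size` while the input has bytes left.
            while buffer.len() < chunk_size && !exhausted {
                match input.next().await {
                    Some(bytes) => buffer.put(bytes),
                    None => exhausted = true,
                }
            }
            if buffer.is_empty() {
                // Input exhausted and nothing left to flush.
                return None;
            }
            let split = buffer.len().min(chunk_size);
            Some((buffer.split_to(split).freeze(), (input, buffer, exhausted)))
        },
    )
    .boxed()
}

#[tokio::main]
async fn main() {
    let input = stream::iter([
        Bytes::from_static(b"hello world"),
        Bytes::from_static(b"!"),
    ])
    .boxed();
    let chunks: Vec<Bytes> = rechunk(input, 4).collect().await;
    assert_eq!(
        chunks,
        vec![
            Bytes::from_static(b"hell"),
            Bytes::from_static(b"o wo"),
            Bytes::from_static(b"rld!"),
        ]
    );
}

Unlike the previous implementation, nothing here materializes the whole payload at once; memory usage stays bounded by chunk_size plus the size of the largest incoming item.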
85 changes: 46 additions & 39 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -559,6 +559,40 @@ mod tests {
Ok(schema)
}

async fn test_additional_stores(
file_compression_type: FileCompressionType,
store: Arc<dyn ObjectStore>,
) {
let ctx = SessionContext::new();
ctx.runtime_env()
.register_object_store("file", "", store.clone());

let task_ctx = ctx.task_ctx();

let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";

let file_groups = partitioned_file_groups(
path.as_str(),
filename,
1,
FileType::CSV,
file_compression_type.to_owned(),
)
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups).unwrap();
let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned());

let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();

let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();

assert_eq!(total_rows, 100);
}

#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
@@ -567,45 +601,18 @@
case(FileCompressionType::XZ)
)]
#[tokio::test]
async fn test_chunked(file_compression_type: FileCompressionType) {
let ctx = SessionContext::new();
let chunk_sizes = [10, 20, 30, 40];

for chunk_size in chunk_sizes {
ctx.runtime_env().register_object_store(
"file",
"",
Arc::new(ChunkedStore::new(
Arc::new(LocalFileSystem::new()),
chunk_size,
)),
);

let task_ctx = ctx.task_ctx();

let file_schema = aggr_test_schema();
let path = format!("{}/csv", arrow_test_data());
let filename = "aggregate_test_100.csv";

let file_groups = partitioned_file_groups(
path.as_str(),
filename,
1,
FileType::CSV,
file_compression_type.to_owned(),
)
.unwrap();

let config = partitioned_csv_config(file_schema, file_groups).unwrap();
let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned());

let it = csv.execute(0, task_ctx).unwrap();
let batches: Vec<_> = it.try_collect().await.unwrap();

let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();

assert_eq!(total_rows, 100);
}
async fn test_chunked_csv(
file_compression_type: FileCompressionType,
#[values(10, 20, 30, 40)] chunk_size: usize,
) {
test_additional_stores(
file_compression_type,
Arc::new(ChunkedStore::new(
Arc::new(LocalFileSystem::new()),
chunk_size,
)),
)
.await;
}

#[tokio::test]
Expand Down
65 changes: 49 additions & 16 deletions datafusion/core/src/physical_plan/file_format/delimited_stream.rs
@@ -138,29 +138,37 @@
{
let delimiter = LineDelimiter::new();

futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
loop {
if let Some(next) = delimiter.next() {
return Some((Ok(next), (s, delimiter)));
}
futures::stream::unfold(
(s, delimiter, false),
|(mut s, mut delimiter, mut exhausted)| async move {
loop {
if let Some(next) = delimiter.next() {
return Some((Ok(next), (s, delimiter, exhausted)));
} else if exhausted {
return None;
}

match s.next().await {
Some(Ok(bytes)) => delimiter.push(bytes),
Some(Err(e)) => return Some((Err(e), (s, delimiter))),
None => match delimiter.finish() {
Ok(true) => return None,
Ok(false) => continue,
Err(e) => return Some((Err(e), (s, delimiter))),
},
match s.next().await {
Some(Ok(bytes)) => delimiter.push(bytes),
Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
None => {
exhausted = true;
Contributor:
I see this is to handle the case of a missing trailing newline, for which there is a test added below 👍

match delimiter.finish() {
Ok(true) => return None,
Ok(false) => continue,
Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
}
}
}
}
}
})
},
)
}

#[cfg(test)]
mod tests {
use super::*;
use futures::stream::TryStreamExt;
use futures::stream::{BoxStream, TryStreamExt};

#[test]
fn test_delimiter() {
@@ -209,6 +217,31 @@ mod tests {
futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
let stream = newline_delimited_stream(input_stream);

let results: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(
results,
vec![
Bytes::from("hello\nworld\n"),
Bytes::from("bingo\n"),
Bytes::from("cupcakes")
Contributor: 👍

]
)
}
#[tokio::test]
async fn test_delimiter_unfold_stream() {
let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
|mut input| async move {
if !input.is_empty() {
Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
} else {
None
}
},
)
.boxed();
let stream = newline_delimited_stream(input_stream);

let results: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(
results,