Add streaming JSON and CSV reading, `NewlineDelimitedStream` (#2935) (#2936)

* Add streaming JSON and CSV (#2935)

* Add license header

* Review feedback

* Add license header

* Review feedback
tustvold authored Jul 18, 2022
1 parent b772c6d commit 944ef3d
Showing 5 changed files with 463 additions and 12 deletions.
140 changes: 140 additions & 0 deletions datafusion/core/src/physical_plan/file_format/chunked_store.rs
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::path::Path;
use object_store::Result;
use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;

/// Wraps a [`ObjectStore`] and makes its get response return chunks
///
/// TODO: Upstream into object_store_rs
#[derive(Debug)]
pub struct ChunkedStore {
inner: Arc<dyn ObjectStore>,
chunk_size: usize,
}

impl ChunkedStore {
pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
Self { inner, chunk_size }
}
}

impl Display for ChunkedStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ChunkedStore({})", self.inner)
}
}

#[async_trait]
impl ObjectStore for ChunkedStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.inner.put(location, bytes).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let bytes = self.inner.get(location).await?.bytes().await?;
let mut offset = 0;
let chunk_size = self.chunk_size;

Ok(GetResult::Stream(
futures::stream::iter(std::iter::from_fn(move || {
let remaining = bytes.len() - offset;
if remaining == 0 {
return None;
}
let to_read = remaining.min(chunk_size);
let next_offset = offset + to_read;
let slice = bytes.slice(offset..next_offset);
offset = next_offset;
Some(Ok(slice))
}))
.boxed(),
))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
self.inner.get_range(location, range).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.inner.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.inner.list(prefix).await
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;

#[tokio::test]
async fn test_chunked() {
let location = Path::parse("test").unwrap();
let store = Arc::new(InMemory::new());
store
.put(&location, Bytes::from(vec![0; 1001]))
.await
.unwrap();

for chunk_size in [10, 20, 31] {
let store = ChunkedStore::new(store.clone(), chunk_size);
let mut s = match store.get(&location).await.unwrap() {
GetResult::Stream(s) => s,
_ => unreachable!(),
};

let mut remaining = 1001;
while let Some(next) = s.next().await {
let size = next.unwrap().len();
let expected = remaining.min(chunk_size);
assert_eq!(size, expected);
remaining -= expected;
}
assert_eq!(remaining, 0);
}
}
}
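The csv.rs changes below route the streamed bytes through `newline_delimited_stream`, which re-chunks an arbitrary byte stream on newline boundaries so that every emitted chunk contains only complete lines; its implementation lives in the new `delimited_stream.rs` module, apparently one of the changed files not shown on this page. For orientation only, here is a minimal sketch of that splitting step, assuming a simple buffer-and-carry approach; the function name and signature are invented for this example and are not DataFusion APIs:

```rust
// Illustrative sketch only; not DataFusion's delimited_stream.rs.
// Append incoming bytes to a carry-over buffer and emit the longest prefix
// that ends on a newline; the trailing partial line stays buffered for the
// next chunk.
fn take_complete_lines(buffer: &mut Vec<u8>, incoming: &[u8]) -> Option<Vec<u8>> {
    buffer.extend_from_slice(incoming);
    // Index just past the last newline currently buffered, if there is one.
    let split = buffer.iter().rposition(|b| *b == b'\n')? + 1;
    Some(buffer.drain(..split).collect())
}

fn main() {
    let mut buffer = Vec::new();
    // Chunk boundaries deliberately fall mid-record, as ChunkedStore arranges.
    let chunks: [&[u8]; 3] = [b"a,1\nb,", b"2\nc,3", b"\n"];
    for chunk in chunks {
        if let Some(complete) = take_complete_lines(&mut buffer, chunk) {
            // Each emitted block holds only whole lines: "a,1\n", then "b,2\n", then "c,3\n".
            print!("{}", String::from_utf8(complete).unwrap());
        }
    }
    assert!(buffer.is_empty()); // no trailing partial record left over
}
```

`ChunkedStore` above exists precisely to force chunk boundaries like these to fall in the middle of a record during tests.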
54 changes: 48 additions & 6 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -25,6 +25,7 @@ use crate::physical_plan::{
};

use crate::datasource::listing::FileRange;
+ use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
};
@@ -167,12 +168,12 @@ struct CsvConfig {
}

impl CsvConfig {
- fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
+ fn open<R: std::io::Read>(&self, reader: R, first_chunk: bool) -> csv::Reader<R> {
let datetime_format = None;
csv::Reader::new(
reader,
Arc::clone(&self.file_schema),
- self.has_header,
+ self.has_header && first_chunk,
Some(self.delimiter),
self.batch_size,
None,
@@ -197,11 +198,18 @@ impl FormatReader for CsvOpener {
Box::pin(async move {
match store.get(&file.location).await? {
GetResult::File(file, _) => {
- Ok(futures::stream::iter(config.open(file)).boxed())
+ Ok(futures::stream::iter(config.open(file, true)).boxed())
}
- r @ GetResult::Stream(_) => {
- let bytes = r.bytes().await?;
- Ok(futures::stream::iter(config.open(bytes.reader())).boxed())
+ GetResult::Stream(s) => {
+ let mut first_chunk = true;
+ Ok(newline_delimited_stream(s.map_err(Into::into))
+ .map_ok(move |bytes| {
+ let reader = config.open(bytes.reader(), first_chunk);
+ first_chunk = false;
+ futures::stream::iter(reader)
+ })
+ .try_flatten()
+ .boxed())
}
}
})
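To spell out the shape of the new `GetResult::Stream` arm: `newline_delimited_stream` yields chunks of complete lines, `map_ok` builds one CSV reader per chunk (passing the header flag only for the first chunk, since only that chunk contains the header row), and `try_flatten` splices the per-chunk record-batch iterators into a single stream. Below is a self-contained sketch of the same `map_ok`/`try_flatten` composition, with plain strings standing in for record batches; every name in it is illustrative rather than taken from the commit:

```rust
use futures::stream::{self, TryStreamExt};

// Illustrative sketch of the map_ok / try_flatten shape used above; strings
// stand in for Arrow record batches and io::Error for ArrowError.
#[tokio::main]
async fn main() {
    // Stand-in for the newline-delimited stream: each item is one chunk of
    // complete rows, and only the first chunk carries the header line.
    let chunks = stream::iter(vec![
        Ok::<_, std::io::Error>(vec!["col,val", "a,1", "b,2"]),
        Ok(vec!["c,3"]),
    ]);

    let mut first_chunk = true;
    let rows: Vec<&str> = chunks
        .map_ok(move |chunk| {
            // In csv.rs this is `config.open(bytes.reader(), first_chunk)`:
            // only the first chunk is parsed with `has_header` enabled.
            let skip = usize::from(first_chunk);
            first_chunk = false;
            stream::iter(chunk.into_iter().skip(skip).map(Ok::<_, std::io::Error>))
        })
        .try_flatten()
        .try_collect()
        .await
        .unwrap();

    assert_eq!(rows, vec!["a,1", "b,2", "c,3"]);
}
```

Flipping `first_chunk` inside the closure works because `map_ok` takes an `FnMut` that owns the flag, which is the same pattern as the `move` closure in the diff above.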
@@ -249,12 +257,14 @@ pub async fn plan_to_csv(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::*;
use crate::test::partitioned_csv_config;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
use arrow::datatypes::*;
use futures::StreamExt;
+ use object_store::local::LocalFileSystem;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
@@ -441,6 +451,38 @@ mod tests {
Ok(schema)
}

+ #[tokio::test]
+ async fn test_chunked() {
+ let ctx = SessionContext::new();
+ let chunk_sizes = [10, 20, 30, 40];
+
+ for chunk_size in chunk_sizes {
+ ctx.runtime_env().register_object_store(
+ "file",
+ "",
+ Arc::new(ChunkedStore::new(
+ Arc::new(LocalFileSystem::new()),
+ chunk_size,
+ )),
+ );
+
+ let task_ctx = ctx.task_ctx();
+
+ let filename = "aggregate_test_100.csv";
+ let file_schema = aggr_test_schema();
+ let config =
+ partitioned_csv_config(filename, file_schema.clone(), 1).unwrap();
+ let csv = CsvExec::new(config, true, b',');
+
+ let it = csv.execute(0, task_ctx).unwrap();
+ let batches: Vec<_> = it.try_collect().await.unwrap();
+
+ let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+
+ assert_eq!(total_rows, 100);
+ }
+ }
+
#[tokio::test]
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context