feat: Parallel Arrow file format reading (#8897)
* feat: Parallel Arrow file format reading

* update slt for arrow scan.

* fix tomlfmt

* fix tomlfmt

* update configs.md
my-vegetable-has-exploded authored Jan 29, 2024
1 parent af0e8a9 commit 9bf0f68
Showing 4 changed files with 158 additions and 15 deletions.
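At a high level, this change lets `ArrowExec` split each Arrow IPC file into contiguous byte ranges and assign one range per partition, going through the same `FileGroupPartitioner` machinery used by the other repartitionable scans. The sketch below is only an illustration of the resulting split, not the `FileGroupPartitioner` implementation: dividing a file length into roughly equal ranges is what turns the single-group plan in `repartition_scan.slt` into the four byte-range groups shown further down (the ranges imply `example.arrow` is 1842 bytes).

```rust
/// Minimal illustration (not the actual FileGroupPartitioner logic):
/// split a file of `len` bytes into at most `n` contiguous byte ranges.
fn split_into_ranges(len: u64, n: u64) -> Vec<std::ops::Range<u64>> {
    let chunk = (len + n - 1) / n; // ceiling division so every byte is covered
    (0..n)
        .map(|i| (i * chunk)..((i + 1) * chunk).min(len))
        .filter(|r| r.start < r.end)
        .collect()
}

fn main() {
    // 1842 bytes split 4 ways -> [0..461, 461..922, 922..1383, 1383..1842],
    // matching the plan in the updated repartition_scan.slt below.
    println!("{:?}", split_into_ranges(1842, 4));
}
```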
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -91,4 +91,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
159 changes: 146 additions & 13 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -29,13 +29,18 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
};

use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

use futures::StreamExt;
use object_store::{GetResultPayload, ObjectStore};
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

use super::FileGroupPartitioner;

/// Execution plan for scanning Arrow data source
#[derive(Debug, Clone)]
@@ -117,6 +122,28 @@ impl ExecutionPlan for ArrowExec {
Ok(self)
}

/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(self.output_ordering().is_some())
.repartition_file_groups(&self.base_config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut new_plan = self.clone();
new_plan.base_config.file_groups = repartitioned_file_groups;
return Ok(Some(Arc::new(new_plan)));
}
Ok(None)
}

fn execute(
&self,
partition: usize,
@@ -155,19 +182,125 @@ impl FileOpener for ArrowOpener {
let object_store = self.object_store.clone();
let projection = self.projection.clone();
Ok(Box::pin(async move {
let r = object_store.get(file_meta.location()).await?;
match r.payload {
GetResultPayload::File(file, _) => {
let arrow_reader =
arrow::ipc::reader::FileReader::try_new(file, projection)?;
Ok(futures::stream::iter(arrow_reader).boxed())
let range = file_meta.range.clone();
match range {
None => {
let r = object_store.get(file_meta.location()).await?;
match r.payload {
GetResultPayload::File(file, _) => {
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
file, projection,
)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
GetResultPayload::Stream(_) => {
let bytes = r.bytes().await?;
let cursor = std::io::Cursor::new(bytes);
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
cursor, projection,
)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
}
}
GetResultPayload::Stream(_) => {
let bytes = r.bytes().await?;
let cursor = std::io::Cursor::new(bytes);
let arrow_reader =
arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
Ok(futures::stream::iter(arrow_reader).boxed())
Some(range) => {
// range is not none: the file may have been split into multiple parts to scan in parallel
// read the footer length first (the last 10 bytes are the 4-byte footer length plus the 6-byte "ARROW1" magic)
let get_option = GetOptions {
range: Some(GetRange::Suffix(10)),
..Default::default()
};
let get_result = object_store
.get_opts(file_meta.location(), get_option)
.await?;
let footer_len_buf = get_result.bytes().await?;
let footer_len = arrow_ipc::reader::read_footer_length(
footer_len_buf[..].try_into().unwrap(),
)?;
// read footer according to footer_len
let get_option = GetOptions {
range: Some(GetRange::Suffix(10 + footer_len)),
..Default::default()
};
let get_result = object_store
.get_opts(file_meta.location(), get_option)
.await?;
let footer_buf = get_result.bytes().await?;
let footer = arrow_ipc::root_as_footer(
footer_buf[..footer_len].try_into().unwrap(),
)
.map_err(|err| {
arrow_schema::ArrowError::ParseError(format!(
"Unable to get root as footer: {err:?}"
))
})?;
// build decoder according to footer & projection
let schema =
arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
let mut decoder = FileDecoder::new(schema.into(), footer.version());
if let Some(projection) = projection {
decoder = decoder.with_projection(projection);
}
let dict_ranges = footer
.dictionaries()
.iter()
.flatten()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
block_offset..block_offset + block_len
})
.collect_vec();
let dict_results = object_store
.get_ranges(file_meta.location(), &dict_ranges)
.await?;
for (dict_block, dict_result) in
footer.dictionaries().iter().flatten().zip(dict_results)
{
decoder.read_dictionary(dict_block, &dict_result.into())?;
}

// keep only the record batches whose offsets fall within this partition's byte range
let recordbatches = footer
.recordBatches()
.iter()
.flatten()
.filter(|block| {
let block_offset = block.offset() as usize;
block_offset >= range.start as usize
&& block_offset < range.end as usize
})
.copied()
.collect_vec();

let recordbatch_ranges = recordbatches
.iter()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
block_offset..block_offset + block_len
})
.collect_vec();

let recordbatch_results = object_store
.get_ranges(file_meta.location(), &recordbatch_ranges)
.await?;

Ok(futures::stream::iter(
recordbatches
.into_iter()
.zip(recordbatch_results)
.filter_map(move |(block, data)| {
match decoder.read_record_batch(&block, &data.into()) {
Ok(Some(record_batch)) => Some(Ok(record_batch)),
Ok(None) => None,
Err(err) => Some(Err(err)),
}
}),
)
.boxed())
}
}
}))
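When a byte range is present, the opener above never downloads the whole file: two small suffix reads recover the IPC footer, and the footer's block list tells each partition which dictionary and record-batch blocks to fetch via `get_ranges`. The following sketch walks the same footer bookkeeping against a local file using only `arrow_ipc`; the `example.arrow` path is hypothetical, and this is an illustration of the file layout rather than the opener code.

```rust
// A minimal local-file sketch of the footer bookkeeping the opener performs
// with two object-store suffix reads. Assumes an Arrow IPC file exists at
// "example.arrow" (hypothetical path).
use arrow_ipc::{reader::read_footer_length, root_as_footer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bytes = std::fs::read("example.arrow")?;

    // The IPC file trailer is: footer, 4-byte little-endian footer length,
    // 6-byte "ARROW1" magic -- so the last 10 bytes yield the footer length.
    let suffix: [u8; 10] = bytes[bytes.len() - 10..].try_into()?;
    let footer_len = read_footer_length(suffix)?;

    // The footer itself sits immediately before those 10 trailing bytes.
    let footer_bytes = &bytes[bytes.len() - 10 - footer_len..bytes.len() - 10];
    let footer = root_as_footer(footer_bytes)
        .map_err(|e| format!("unable to parse footer: {e:?}"))?;

    // Each record-batch block records its offset and length, which is what
    // lets a partition fetch only the blocks inside its assigned byte range.
    for block in footer.recordBatches().iter().flatten() {
        let len = block.metaDataLength() as i64 + block.bodyLength();
        println!("record batch at offset {} ({} bytes)", block.offset(), len);
    }
    Ok(())
}
```

Because block offsets are absolute file offsets, the range filter in the diff can compare `block.offset()` directly against `range.start` and `range.end` to decide which batches belong to a partition.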
1 change: 1 addition & 0 deletions datafusion/sqllogictest/Cargo.toml
@@ -61,6 +61,7 @@ postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-pro
[dev-dependencies]
env_logger = { workspace = true }
num_cpus = { workspace = true }
tokio = { version = "1.0", features = ["rt-multi-thread"] }

[[test]]
harness = false
11 changes: 10 additions & 1 deletion datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -253,7 +253,16 @@ query TT
EXPLAIN SELECT * FROM arrow_table
----
logical_plan TableScan: arrow_table projection=[f0, f1, f2]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2]
physical_plan ArrowExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:0..461], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:461..922], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:922..1383], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:1383..1842]]}, projection=[f0, f1, f2]

# correct content
query ITB
SELECT * FROM arrow_table
----
1 foo true
2 bar NULL
3 baz false
4 NULL true

# Cleanup
statement ok
