
feat: Parallel Arrow file format reading #8897

Merged: 5 commits, Jan 29, 2024
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -91,4 +91,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
159 changes: 146 additions & 13 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -29,13 +29,18 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
};

use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

use super::FileGroupPartitioner;

/// Execution plan for scanning Arrow data source
#[derive(Debug, Clone)]
@@ -117,6 +122,28 @@ impl ExecutionPlan for ArrowExec {
Ok(self)
}

/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_repartition_file_min_size(repartition_file_min_size)
.with_preserve_order_within_groups(self.output_ordering().is_some())
.repartition_file_groups(&self.base_config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut new_plan = self.clone();
new_plan.base_config.file_groups = repartitioned_file_groups;
return Ok(Some(Arc::new(new_plan)));
}
Ok(None)
}

fn execute(
&self,
partition: usize,
@@ -155,19 +182,125 @@ impl FileOpener for ArrowOpener {
let object_store = self.object_store.clone();
let projection = self.projection.clone();
Ok(Box::pin(async move {
let range = file_meta.range.clone();
match range {
None => {
let r = object_store.get(file_meta.location()).await?;
match r.payload {
GetResultPayload::File(file, _) => {
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
file, projection,
)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
GetResultPayload::Stream(_) => {
let bytes = r.bytes().await?;
let cursor = std::io::Cursor::new(bytes);
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
cursor, projection,
)?;
Ok(futures::stream::iter(arrow_reader).boxed())
}
}
}
Some(range) => {
// range is not None: the file may be split into multiple parts to scan in parallel
// get footer_len first from the 10-byte trailer
let get_option = GetOptions {
range: Some(GetRange::Suffix(10)),
..Default::default()
};
let get_result = object_store
.get_opts(file_meta.location(), get_option)
.await?;
let footer_len_buf = get_result.bytes().await?;
let footer_len = arrow_ipc::reader::read_footer_length(
footer_len_buf[..].try_into().unwrap(),
)?;
// read footer according to footer_len
let get_option = GetOptions {
range: Some(GetRange::Suffix(10 + footer_len)),
..Default::default()
};
let get_result = object_store
.get_opts(file_meta.location(), get_option)
.await?;
let footer_buf = get_result.bytes().await?;
let footer = arrow_ipc::root_as_footer(
footer_buf[..footer_len].try_into().unwrap(),
)
.map_err(|err| {
arrow_schema::ArrowError::ParseError(format!(
"Unable to get root as footer: {err:?}"
))
})?;
// build decoder according to footer & projection
let schema =
arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
let mut decoder = FileDecoder::new(schema.into(), footer.version());
if let Some(projection) = projection {
decoder = decoder.with_projection(projection);
}
let dict_ranges = footer
.dictionaries()
.iter()
.flatten()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
block_offset..block_offset + block_len
})
.collect_vec();
let dict_results = object_store
.get_ranges(file_meta.location(), &dict_ranges)
.await?;
for (dict_block, dict_result) in
footer.dictionaries().iter().flatten().zip(dict_results)
{
decoder.read_dictionary(dict_block, &dict_result.into())?;
}

// filter recordbatches according to range
let recordbatches = footer
.recordBatches()
.iter()
.flatten()
.filter(|block| {
let block_offset = block.offset() as usize;
block_offset >= range.start as usize
&& block_offset < range.end as usize
})
.copied()
.collect_vec();

let recordbatch_ranges = recordbatches
.iter()
.map(|block| {
let block_len = block.bodyLength() as usize
+ block.metaDataLength() as usize;
let block_offset = block.offset() as usize;
block_offset..block_offset + block_len
})
.collect_vec();

let recordbatch_results = object_store
.get_ranges(file_meta.location(), &recordbatch_ranges)
.await?;

Ok(futures::stream::iter(
recordbatches
.into_iter()
.zip(recordbatch_results)
.filter_map(move |(block, data)| {
match decoder.read_record_batch(&block, &data.into()) {
Ok(Some(record_batch)) => Some(Ok(record_batch)),
Ok(None) => None,
Err(err) => Some(Err(err)),
}
}),
)
.boxed())
}
}
}))
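For readers unfamiliar with the Arrow IPC file layout, below is a minimal, self-contained sketch of the same footer-driven decoding the opener performs above, applied to a local file instead of an object store. The function name read_ipc_range and its inputs are hypothetical, and the snippet assumes arrow-ipc / arrow-buffer / arrow-array versions compatible with what this PR builds against; the calls themselves (read_footer_length, root_as_footer, fb_to_schema, FileDecoder) mirror the ones in the diff.

    use std::ops::Range;
    use std::sync::Arc;

    use arrow_array::RecordBatch;
    use arrow_buffer::Buffer;
    use arrow_ipc::reader::{read_footer_length, FileDecoder};

    fn read_ipc_range(
        path: &str,
        range: Range<usize>,
    ) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
        let bytes = std::fs::read(path)?;

        // An Arrow IPC file ends with the footer flatbuffer, a 4-byte footer length,
        // and the 6-byte magic "ARROW1" -- hence the 10-byte suffix read above.
        let trailer: [u8; 10] = bytes[bytes.len() - 10..].try_into()?;
        let footer_len = read_footer_length(trailer)?;
        let footer_start = bytes.len() - 10 - footer_len;
        let footer =
            arrow_ipc::root_as_footer(&bytes[footer_start..footer_start + footer_len])?;

        let schema = arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());

        // Dictionaries are always loaded; only record batches are filtered by offset.
        for block in footer.dictionaries().iter().flatten() {
            let start = block.offset() as usize;
            let len = block.metaDataLength() as usize + block.bodyLength() as usize;
            decoder.read_dictionary(block, &Buffer::from_slice_ref(&bytes[start..start + len]))?;
        }

        let mut batches = Vec::new();
        for block in footer.recordBatches().iter().flatten() {
            let start = block.offset() as usize;
            // A batch belongs to this partition iff its starting offset falls inside
            // `range`, mirroring the filter in ArrowOpener.
            if start < range.start || start >= range.end {
                continue;
            }
            let len = block.metaDataLength() as usize + block.bodyLength() as usize;
            if let Some(batch) =
                decoder.read_record_batch(block, &Buffer::from_slice_ref(&bytes[start..start + len]))?
            {
                batches.push(batch);
            }
        }
        Ok(batches)
    }

The key point is that only the blocks whose offsets fall inside the assigned byte range are fetched and decoded, which is what lets several partitions scan a single file concurrently.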
1 change: 1 addition & 0 deletions datafusion/sqllogictest/Cargo.toml
@@ -61,6 +61,7 @@ postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-pro
[dev-dependencies]
env_logger = { workspace = true }
num_cpus = { workspace = true }
tokio = { version = "1.0", features = ["rt-multi-thread"] }
Contributor:

why is this needed?


[[test]]
harness = false
11 changes: 10 additions & 1 deletion datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -253,7 +253,16 @@ query TT
EXPLAIN SELECT * FROM arrow_table
----
logical_plan TableScan: arrow_table projection=[f0, f1, f2]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2]
physical_plan ArrowExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:0..461], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:461..922], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:922..1383], [WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow:1383..1842]]}, projection=[f0, f1, f2]
Contributor:

👍 looks good to me -- though I wonder whether this will actually read in parallel (or do these ranges all end up in the same reader)?
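As a rough sanity check on the byte ranges above (this is not the actual FileGroupPartitioner implementation), the numbers are consistent with splitting the 1842-byte example.arrow file into ceil-sized chunks across 4 target partitions; split_even below is a hypothetical helper written only for this illustration.

    // Hypothetical sketch: split a file of `file_len` bytes into `partitions`
    // ceil-sized chunks, reproducing the ranges printed in the plan above.
    fn split_even(file_len: u64, partitions: u64) -> Vec<std::ops::Range<u64>> {
        let chunk = (file_len + partitions - 1) / partitions; // ceil(1842 / 4) = 461
        (0..partitions)
            .map(|i| (i * chunk)..((i + 1) * chunk).min(file_len))
            .collect()
    }

    // split_even(1842, 4) yields [0..461, 461..922, 922..1383, 1383..1842],
    // matching the four file groups shown above.

Note that these ranges partition raw bytes rather than record-batch boundaries; the opener assigns each batch to the partition whose range contains the batch's starting offset.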


# correct content
query ITB
SELECT * FROM arrow_table
----
1 foo true
2 bar NULL
3 baz false
4 NULL true

# Cleanup
statement ok