8 changes: 6 additions & 2 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -57,7 +57,11 @@ impl FileFormat for AvroFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(
&self,
_reader: Arc<dyn ObjectReader>,
_table_schema: SchemaRef,
) -> Result<Statistics> {
Ok(Statistics::default())
}

@@ -367,7 +371,7 @@ mod tests {
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.clone()))
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
8 changes: 6 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -120,7 +120,11 @@ impl FileFormat for CsvFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(
&self,
_reader: Arc<dyn ObjectReader>,
_table_schema: SchemaRef,
) -> Result<Statistics> {
Ok(Statistics::default())
}

@@ -265,7 +269,7 @@ mod tests {
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.clone()))
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -92,7 +92,11 @@ impl FileFormat for JsonFormat {
Ok(Arc::new(schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(
&self,
_reader: Arc<dyn ObjectReader>,
_table_schema: SchemaRef,
) -> Result<Statistics> {
Ok(Statistics::default())
}

@@ -219,7 +223,7 @@ mod tests {
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.to_owned()))
.infer_stats(
local_object_reader(filename.to_owned()),
file_schema.clone(),
)
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
11 changes: 10 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -56,7 +56,16 @@ pub trait FileFormat: Send + Sync + fmt::Debug {

/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
///
/// `table_schema` is the (combined) schema of the overall table
/// and may be a superset of the schema contained in this file.
///
/// TODO: should the file source return statistics for only columns referred to in the table schema?
async fn infer_stats(
&self,
reader: Arc<dyn ObjectReader>,
table_schema: SchemaRef,
) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
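For orientation, a hedged caller-side sketch of the updated trait method (the helper name and variables below are invented; the types are those already used by this module). The key point, visible in the parquet changes and tests further down, is that `column_statistics`, when present, is indexed by the columns of `table_schema`, even for columns the underlying file does not contain.

// Sketch only: calling the updated infer_stats from async code.
async fn stats_for_file(
    format: &dyn FileFormat,
    reader: Arc<dyn ObjectReader>,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    let stats = format.infer_stats(reader, table_schema.clone()).await?;
    // Per-column statistics (if any) line up with the *table* schema, not the
    // file schema, so a column missing from this file still gets an entry.
    if let Some(col_stats) = &stats.column_statistics {
        assert_eq!(col_stats.len(), table_schema.fields().len());
    }
    Ok(stats)
}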
158 changes: 137 additions & 21 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,6 +25,7 @@ use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use futures::TryStreamExt;
use hashbrown::HashMap;
use parquet::arrow::ArrowReader;
use parquet::arrow::ParquetFileArrowReader;
use parquet::errors::ParquetError;
@@ -46,7 +47,7 @@ use crate::error::Result;
use crate::logical_plan::combine_filters;
use crate::logical_plan::Expr;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::ParquetExec;
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
Review comment (Contributor): 👍
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{Accumulator, Statistics};
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
@@ -99,8 +100,12 @@ impl FileFormat for ParquetFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
let stats = fetch_statistics(reader)?;
async fn infer_stats(
&self,
reader: Arc<dyn ObjectReader>,
table_schema: SchemaRef,
) -> Result<Statistics> {
let stats = fetch_statistics(reader, table_schema)?;
Ok(stats)
}

@@ -279,46 +284,65 @@ fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
}

/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
fn fetch_statistics(
object_reader: Arc<dyn ObjectReader>,
table_schema: SchemaRef,
) -> Result<Statistics> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
let num_fields = schema.fields().len();
let fields = schema.fields().to_vec();
let file_schema = arrow_reader.get_schema()?;
let num_fields = table_schema.fields().len();
let fields = table_schema.fields().to_vec();
let meta_data = arrow_reader.get_metadata();

let mut num_rows = 0;
let mut total_byte_size = 0;
let mut null_counts = vec![0; num_fields];
let mut has_statistics = false;

let (mut max_values, mut min_values) = create_max_min_accs(&schema);
let schema_adapter = SchemaAdapter::new(table_schema.clone());

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);

for row_group_meta in meta_data.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();

let columns_null_counts = row_group_meta
.columns()
.iter()
.flat_map(|c| c.statistics().map(|stats| stats.null_count()));

for (i, cnt) in columns_null_counts.enumerate() {
null_counts[i] += cnt as usize
}
let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();

for (i, column) in row_group_meta.columns().iter().enumerate() {
if let Some(stat) = column.statistics() {
has_statistics = true;
summarize_min_max(&mut max_values, &mut min_values, &fields, i, stat)
column_stats.insert(i, (stat.null_count(), stat));
}
}

if has_statistics {
for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
if let Some(file_idx) =
schema_adapter.map_column_index(table_idx, &file_schema)
{
if let Some((null_count, stats)) = column_stats.get(&file_idx) {
*null_cnt += *null_count as usize;
summarize_min_max(
&mut max_values,
&mut min_values,
&fields,
table_idx,
stats,
)
}
} else {
*null_cnt += num_rows as usize;
}
}
}
}

let column_stats = if has_statistics {
Some(get_col_stats(
&schema,
&table_schema,
null_counts,
&mut max_values,
&mut min_values,
@@ -369,10 +393,102 @@ mod tests {

use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
Int64Array, StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use futures::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;

// Add a new column with the specified field name to the RecordBatch
fn add_to_batch(
batch: &RecordBatch,
field_name: &str,
array: ArrayRef,
) -> RecordBatch {
let mut fields = batch.schema().fields().clone();
fields.push(Field::new(field_name, array.data_type().clone(), true));
let schema = Arc::new(Schema::new(fields));

let mut columns = batch.columns().to_vec();
columns.push(array);
RecordBatch::try_new(schema, columns).expect("error creating record batch")
}

fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
Review comment (Contributor): This looks very similar to / the same as RecordBatch::try_from_iter: https://docs.rs/arrow/11.1.0/arrow/record_batch/struct.RecordBatch.html#method.try_from_iter
columns.into_iter().fold(
RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
|batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
)
}
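    // Hedged sketch (not part of this PR), following the review comment above:
    // the same helper built on RecordBatch::try_from_iter. Note that
    // try_from_iter derives each field's nullability from the array contents,
    // whereas add_to_batch always marks the appended field as nullable.
    fn create_batch_via_try_from_iter(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
        RecordBatch::try_from_iter(columns).expect("error creating record batch")
    }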

async fn create_table(
batches: Vec<RecordBatch>,
) -> Result<(Vec<NamedTempFile>, Schema)> {
let merged_schema =
Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;

let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let output = tempfile::NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let file: std::fs::File = (*output.as_file())
.try_clone()
.expect("cloning file descriptor");
let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

Ok((files, merged_schema))
}

#[tokio::test]
async fn read_merged_batches() -> Result<()> {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let batch1 = create_batch(vec![("c1", c1.clone())]);

let batch2 = create_batch(vec![("c2", c2)]);

let (files, schema) = create_table(vec![batch1, batch2]).await?;
let table_schema = Arc::new(schema);

let reader = local_object_reader(files[0].path().to_string_lossy().to_string());

let stats = fetch_statistics(reader, table_schema.clone())?;

assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));
Review comment (Contributor): this is cool to fill in the null stats for the missing column 👍

let reader = local_object_reader(files[1].path().to_string_lossy().to_string());

let stats = fetch_statistics(reader, table_schema)?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(3));
assert_eq!(c2_stats.null_count, Some(1));
assert_eq!(c2_stats.max_value, Some(ScalarValue::Int64(Some(2))));
assert_eq!(c2_stats.min_value, Some(ScalarValue::Int64(Some(1))));

Ok(())
}

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -645,7 +761,7 @@ mod tests {
.await
.expect("Schema inference");
let statistics = format
.infer_stats(local_object_reader(filename.clone()))
.infer_stats(local_object_reader(filename.clone()), file_schema.clone())
.await
.expect("Stats inference");
let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
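To recap the fetch_statistics change above in plain terms: for each column of the table schema, the SchemaAdapter looks up a column of the same name in the file schema; when a match with parquet statistics exists, its null count and min/max are folded in, and when the column is absent from the file, every row of the row group counts as null for it (provided at least one column in that row group carries statistics). A minimal sketch of that per-column decision, using hypothetical names rather than the PR's exact code:

// Sketch only: the per-row-group, per-table-column null-count decision.
// `file_idx` is SchemaAdapter::map_column_index for the table column,
// `file_null_count` comes from that file column's parquet statistics,
// and `num_rows` is the row group's row count.
fn null_count_contribution(
    file_idx: Option<usize>,
    file_null_count: Option<u64>,
    num_rows: u64,
) -> u64 {
    match (file_idx, file_null_count) {
        // Column present in the file with statistics: use the reported count.
        (Some(_), Some(nulls)) => nulls,
        // Column present but without statistics: nothing can be added here.
        (Some(_), None) => 0,
        // Column missing from this file: every row is null for it.
        (None, _) => num_rows,
    }
}

This is exactly what the read_merged_batches test exercises: with a table schema of {c1, c2} and a file holding only c1 = ["Foo", NULL, "bar"], the reported null counts are 1 for c1 (from the file's statistics) and 3 for c2 (the full row count).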
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -376,7 +376,10 @@ impl ListingTable {
let statistics = if self.options.collect_stat {
let object_reader = object_store
.file_reader(part_file.file_meta.sized_file.clone())?;
self.options.format.infer_stats(object_reader).await?
self.options
.format
.infer_stats(object_reader, self.file_schema.clone())
.await?
} else {
Statistics::default()
};
12 changes: 12 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -212,6 +212,18 @@ impl SchemaAdapter {
Self { table_schema }
}

/// Map a column index in the table schema to a column index in a particular
/// file schema.
///
/// Panics if `index` is not in range for the table schema.
pub(crate) fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
) -> Option<usize> {
let field = self.table_schema.field(index);
file_schema.index_of(field.name()).ok()
}

/// Map projected column indexes to the file schema. This will fail if the table schema
/// and the file schema contain a field with the same name and different types.
pub fn map_projections(
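    // Hedged usage sketch for the new map_column_index helper (illustration
    // only: the schemas below are invented; arrow's Schema, Field, DataType
    // and std::sync::Arc are assumed to be in scope, as elsewhere in this
    // crate). The adapter maps by field name and returns None when the file
    // lacks the column.
    #[test]
    fn map_column_index_sketch() {
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Int64, true),
        ]));
        let file_schema = Schema::new(vec![Field::new("c2", DataType::Int64, true)]);

        let adapter = SchemaAdapter::new(table_schema);
        // "c1" does not exist in the file, so there is nothing to map to.
        assert_eq!(adapter.map_column_index(0, &file_schema), None);
        // "c2" is the first (index 0) column of this file's schema.
        assert_eq!(adapter.map_column_index(1, &file_schema), Some(0));
    }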