Skip to content

Commit

Permalink
Fix limited statistic collection across files with no stats (#4521)
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical authored Dec 17, 2022
1 parent 8d36529 commit 067d044
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
24 changes: 24 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,30 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn load_table_stats_when_no_stats() -> Result<()> {
    // Regression test for #4521: when statistics collection is disabled,
    // the scan must report `None` for row count and total byte size
    // rather than misleading zeros.
    let ctx = SessionContext::new();
    let state = ctx.state();

    let testdata = crate::test_util::parquet_test_data();
    let path =
        ListingTableUrl::parse(format!("{}/{}", testdata, "alltypes_plain.parquet")).unwrap();

    // Explicitly opt out of statistics collection for this listing table.
    let options = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())))
        .with_collect_stat(false);
    let resolved_schema = options.infer_schema(&state, &path).await?;

    let table = ListingTable::try_new(
        ListingTableConfig::new(path)
            .with_listing_options(options)
            .with_schema(resolved_schema),
    )?;

    // A full scan (no projection, no filters, no limit) should surface
    // absent statistics as `None` on the execution plan.
    let plan = table.scan(&state, None, &[], None).await?;
    let stats = plan.statistics();
    assert_eq!(stats.num_rows, None);
    assert_eq!(stats.total_byte_size, None);

    Ok(())
}

#[tokio::test]
async fn test_try_create_output_ordering() {
let testdata = crate::test_util::parquet_test_data();
Expand Down
34 changes: 26 additions & 8 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,42 @@ use futures::StreamExt;
/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files.
/// Needed to read up to `limit` number of rows.
/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
file_schema: SchemaRef,
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];

let mut total_byte_size = 0;
let mut null_counts = vec![0; file_schema.fields().len()];
let mut has_statistics = false;
let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);

let mut num_rows = 0;
let mut is_exact = true;

// The number of rows and the total byte size can be calculated as long as
// at least one file has them. If none of the files provide them, then they
// will be omitted from the statistics. The missing values will be counted
// as zero.
let mut num_rows = None;
let mut total_byte_size = None;

// fusing the stream allows us to call next safely even once it is finished
let mut all_files = Box::pin(all_files.fuse());
while let Some(res) = all_files.next().await {
let (file, file_stats) = res?;
result_files.push(file);
is_exact &= file_stats.is_exact;
num_rows += file_stats.num_rows.unwrap_or(0);
total_byte_size += file_stats.total_byte_size.unwrap_or(0);
num_rows = if let Some(num_rows) = num_rows {
Some(num_rows + file_stats.num_rows.unwrap_or(0))
} else {
file_stats.num_rows
};
total_byte_size = if let Some(total_byte_size) = total_byte_size {
Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
} else {
file_stats.total_byte_size
};
if let Some(vec) = &file_stats.column_statistics {
has_statistics = true;
for (i, cs) in vec.iter().enumerate() {
Expand Down Expand Up @@ -103,7 +116,12 @@ pub async fn get_statistics_with_limit(
}
}
}
if num_rows > limit.unwrap_or(usize::MAX) {

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
break;
}
}
Expand All @@ -126,8 +144,8 @@ pub async fn get_statistics_with_limit(
};

let statistics = Statistics {
num_rows: Some(num_rows),
total_byte_size: Some(total_byte_size),
num_rows,
total_byte_size,
column_statistics: column_stats,
is_exact,
};
Expand Down
14 changes: 12 additions & 2 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,24 @@ impl TryInto<Statistics> for &protobuf::Statistics {
type Error = DataFusionError;

fn try_into(self) -> Result<Statistics, Self::Error> {
// Keep it sync with Statistics::to_proto
let none_value = -1_i64;
let column_statistics = self
.column_stats
.iter()
.map(|s| s.into())
.collect::<Vec<_>>();
Ok(Statistics {
num_rows: Some(self.num_rows as usize),
total_byte_size: Some(self.total_byte_size as usize),
num_rows: if self.num_rows == none_value {
None
} else {
Some(self.num_rows as usize)
},
total_byte_size: if self.total_byte_size == none_value {
None
} else {
Some(self.total_byte_size as usize)
},
// No column statistic (None) is encoded with empty array
column_statistics: if column_statistics.is_empty() {
None
Expand Down

0 comments on commit 067d044

Please sign in to comment.