TableScanExec returns exact stats when it contains filters #12416

Open
waruto210 opened this issue Sep 10, 2024 · 3 comments · May be fixed by #12471
Labels
bug Something isn't working

Comments

@waruto210
Contributor

Describe the bug

I'm working on a project based on datafusion's ListingTable and ParquetExec. I've made some modifications to enable exact filter pushdown for parquet tables.

When I executed a statement like select count(*) from table where Age > 10 limit 10, I noticed that in the physical plan TableScanExec was replaced with a placeholder, so the query directly returned the total number of rows in the table instead of the filtered count.

Eventually, I found that AggregateStatistics was optimizing the query plan using stats, and ParquetExec::statistics() was returning stats as follows: Rows=Exact(390616), Bytes=Absent, [(Col[0]: Min=Exact(Int16(0)) Max=Exact(Int16(55)) Null=Exact(0))].
However, ParquetExec contains some filters, so the stats should be inexact or absent.

Currently, datafusion's ListingTable supports inexact filter pushdown, so there would be a FilterExec outside the TableScanExec, which prevents incorrect optimization by AggregateStatistics. But, since filters can still exist within ParquetExec, returning exact stats is semantically incorrect.
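To illustrate why exactness matters here, below is a minimal sketch of the kind of shortcut an optimizer such as AggregateStatistics can take. It does not use DataFusion's real types: Precision is a simplified stand-in and count_from_stats is a hypothetical helper, both introduced only for illustration.

```rust
/// Simplified stand-in for DataFusion's `Precision` type (an assumption for
/// illustration; the real type is generic and has more methods).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical helper: answer `COUNT(*)` from statistics alone.
/// This is only sound when the reported row count is exact.
pub fn count_from_stats(num_rows: Precision) -> Option<usize> {
    match num_rows {
        // An exact count can replace the scan with a constant.
        Precision::Exact(n) => Some(n),
        // An estimate or a missing value means the data must be scanned.
        Precision::Inexact(_) | Precision::Absent => None,
    }
}
```

In this model, a scan that reports Exact(390616) while also applying Age > 10 would let the shortcut return 390616 even if fewer rows pass the filter, which matches the wrong result described above.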

To Reproduce

Use the following code and print the stats in ParquetExec::statistics():

use arrow::util::pretty::print_batches;
use arrow_schema::DataType;
use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::{
    config::{ConfigField, TableParquetOptions},
    datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions},
    prelude::*,
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.state_ref()
        .write()
        .config_mut()
        .options_mut()
        .sql_parser
        .enable_ident_normalization = false;
    ctx.state_ref()
        .write()
        .config_mut()
        .options_mut()
        .execution
        .target_partitions = 1;
    let mut opts = TableParquetOptions::default();
    opts.set("pushdown_filters", "true").unwrap();
    let format = ParquetFormat::new().with_options(opts);
    let options = ListingOptions::new(Arc::new(format))
        .with_table_partition_cols(vec![("A".to_owned(), DataType::Int32)]);
    ctx.register_listing_table(
        "hits",
        "/path",
        options,
        None,
        None,
    )
    .await
    .unwrap();

    let pp = plan(&ctx, "select count(*) from hits where Age > 10")
        .await
        .unwrap();
    let rb = collect(pp, ctx.task_ctx()).await.unwrap();
    print_batches(&rb).unwrap();

    Ok(())
}

async fn plan(ctx: &SessionContext, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
    let now = std::time::Instant::now();
    let lp = Arc::new(ctx.state().create_logical_plan(sql).await?);
    println!("LogicalPlan:\n{:?}", lp);
    let state = ctx.state();
    let lp = state.optimize(&lp).unwrap();
    println!("Optimized LogicalPlan:\n{:?}", lp);
    let pp = state.create_physical_plan(&lp).await.unwrap();
    println!(
        "Plan duration: {}us. Optimized PhysicalPlan:\n{}",
        now.elapsed().as_micros(),
        DisplayableExecutionPlan::new(&*pp).indent(true)
    );
    Ok(pp)
}

You will see that Rows in the stats is Exact(n), but n may exceed the number of rows ParquetExec actually returns, which is semantically incorrect.

Expected behavior

I suggest adding the following code to the statistics method of the affected TableScanExec implementations:

let stats = if self.pushdown_filters() && self.predicate.is_some() {
    self.projected_statistics.clone().into_inexact()
} else {
    self.projected_statistics.clone()
};
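As a rough sketch of the effect of this change, the snippet below models the demotion on a row count only. Precision, ScanModel, and reported_num_rows are hypothetical names for illustration and are not DataFusion's actual API; the real Statistics struct also carries byte sizes and per-column stats, which would be demoted in the same way.

```rust
/// Simplified stand-in for DataFusion's `Precision` type (illustrative only).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    /// Demote an exact value to an estimate; other variants are unchanged.
    pub fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(n) => Precision::Inexact(n),
            other => other,
        }
    }
}

/// Hypothetical model of a scan node with an optional pushed-down predicate.
pub struct ScanModel {
    pub pushdown_filters: bool,
    pub predicate: Option<String>,
    /// Row count taken from file metadata, before any filtering.
    pub file_num_rows: Precision,
}

impl ScanModel {
    /// Row count the scan should report to the optimizer.
    pub fn reported_num_rows(&self) -> Precision {
        if self.pushdown_filters && self.predicate.is_some() {
            // The filter may drop rows, so the file-level count is only
            // an upper bound and must not be reported as exact.
            self.file_num_rows.to_inexact()
        } else {
            self.file_num_rows
        }
    }
}
```

With pushdown_filters set and a predicate present, Exact(390616) is reported as Inexact(390616), so a statistics-based count(*) shortcut no longer applies. Without a predicate, the exact value is passed through unchanged.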

Additional context

No response

@waruto210 waruto210 added the bug Something isn't working label Sep 10, 2024
@waruto210
Contributor Author

@alamb PTAL

@alamb
Contributor

alamb commented Sep 11, 2024

I'm working on a project based on datafusion's ListingTable and ParquetExec.

BTW I would love to learn more about this project, if you can make anything public

@alamb PTAL

I think your description and proposed solution make a lot of sense. Thank you @waruto210 -- I am surprised we haven't hit it before 🤔

@waruto210
Contributor Author

BTW I would love to learn more about this project, if you can make anything public

We're building a log query platform utilizing some components from datafusion. We'll keep the community posted on issues we find, and we'll also submit code back to datafusion if we have any work that can be put into datafusion repo as a generic component.

@waruto210 waruto210 linked a pull request Sep 15, 2024 that will close this issue
2 participants