Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit together with pushdown_filters #13745

Closed
bchalk101 opened this issue Dec 12, 2024 · 5 comments · Fixed by #13788
Closed

Limit together with pushdown_filters #13745

bchalk101 opened this issue Dec 12, 2024 · 5 comments · Fixed by #13788
Assignees
Labels
bug Something isn't working

Comments

@bchalk101
Copy link

bchalk101 commented Dec 12, 2024

Describe the bug

I am trying to load a parquet dataset, using both a limit and filter. When combining this with the pushdown_filters config, no data is found.
If I either remove the limit or remove pushdown_filters than it works.

To Reproduce

The following code reproduces the issues.
The dataset here is a path in S3, which contains 120 parquets, and each parquet has about 7000-8000 rows.

The specific rows, where the match occurs, must be deep inside the dataset and no the first parquet in the dataset.

    let object_store = Arc::new(aws_s3);
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;

    let state = SessionStateBuilder::new().with_config(config).build();
    let ctx = SessionContext::from(state);
    ctx.register_object_store(object_store_url.as_ref(), object_store.clone());

    let mut parquet_options = ParquetReadOptions::new();
    parquet_options = parquet_options.parquet_pruning(true);
    let mut df = ctx
        .read_parquet(path, parquet_options.clone())
        .await
        .unwrap();

    df = df
        .filter(col("a").eq(lit(
            "23asdas23",
        )))
        .unwrap();
    df = df.limit(0, Some(1)).unwrap();
    let batch = df.collect().await.unwrap();

Expected behavior

Expected to apply both limit and filter.

Additional context

No response

@bchalk101 bchalk101 added the bug Something isn't working label Dec 12, 2024
@alamb
Copy link
Contributor

alamb commented Dec 12, 2024

Thanks @bchalk101 -- I agree this definitely seems to be a bug.

Can you possibly provide the explain verbose plan?

Aak run https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.explain with verbose = true?

@bchalk101
Copy link
Author

Hey @alamb,

I've attached the two, one with push down and one without.

It looks like the limit is being applied without the predicate when being used with push down, which results in only a single file being scanned.

report_with_push_down.txt
report_without_push_down.txt

@zhuqi-lucas
Copy link
Contributor

Can i take this issue? Thanks!

@zhuqi-lucas
Copy link
Contributor

take

@zhuqi-lucas
Copy link
Contributor

Hi @alamb @bchalk101

Created a pr try to fix this issue, thanks!

And the testing code to reproduce this issue:

#[tokio::main]
pub async fn main() -> Result<()> {
    let output_dir = "/tmp/test_parquet_data";
    let num_files = 120; // Number of Parquet files
    let rows_per_file = 200; // Number of rows per Parquet file

    // Generate the dataset
    generate_test_data(output_dir, num_files, rows_per_file);
    println!("Generated {} Parquet files with {} rows each", num_files, rows_per_file);

    let file_path = "/tmp/test_parquet_data/part-119.parquet"; // 最后一个文件
    let file = File::open(file_path).unwrap();
    let parquet_reader = SerializedFileReader::new(file).unwrap();
    let mut iter = parquet_reader.get_row_iter(None).unwrap();

    while let Some(record) = iter.next() {
        println!("{:?}", record);
    }


    let mut parquet_options = ParquetReadOptions::new();
    parquet_options = parquet_options.parquet_pruning(true);

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;

    let state = SessionStateBuilder::new().with_config(config).build();
    let ctx = SessionContext::from(state);

    let mut df = ctx
        .read_parquet(output_dir, parquet_options.clone())
        .await
        .unwrap();

    df = df
        .filter(col("a").eq(lit(
            "23asdas23",
        )))
        .unwrap();

    df = df.limit(0, Some(1)).unwrap();

    let batch = df.collect().await.unwrap();



    println!("{:?}", batch);

    Ok(())
}


fn generate_test_data(output_dir: &str, num_files: usize, rows_per_file: usize) {
    // Define the schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));

    for file_index in 0..num_files {
        // Generate data for this file
        let mut a_values = Vec::new();
        let mut b_values = Vec::new();

        for row_index in 0..rows_per_file {
            // Fill in rows
            if file_index == num_files - 1 && row_index == rows_per_file / 2 {
                // Add the target row deep in the dataset
                a_values.push("23asdas23".to_string());
                b_values.push(999);
            } else {
                a_values.push(format!("random_{}_{}", file_index, row_index));
                b_values.push((file_index * rows_per_file + row_index) as i32);
            }
        }

        // Create Arrow arrays
        let a_array = Arc::new(StringArray::from(a_values)) as Arc<dyn arrow::array::Array>;
        let b_array = Arc::new(Int32Array::from(b_values)) as Arc<dyn arrow::array::Array>;

        // Create a record batch
        let batch = RecordBatch::try_new(schema.clone(), vec![a_array, b_array]).unwrap();

        // Write to a Parquet file
        let file_path = format!("{}/part-{}.parquet", output_dir, file_index);
        let file = File::create(file_path).unwrap();
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants