Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jun 28, 2022
1 parent 73066af commit 30f9f8d
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,56 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use crate::prelude::SessionContext;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
);
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
.input()
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.unwrap();
assert_eq!(1234, coalesce.target_batch_size);
Ok(())
}

#[tokio::test]
async fn test_disable_coalesce() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_bool(OPT_COALESCE_BATCHES, false),
);
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
// projection should directly wrap filter with no coalesce step
let _filter = projection
.input()
.as_any()
.downcast_ref::<FilterExec>()
.unwrap();
Ok(())
}

async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let table = MemTable::try_new(schema, vec![partition])?;
ctx.register_table("a", Arc::new(table))?;
let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
ctx.create_physical_plan(&plan).await
}

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
let schema = test_schema();
Expand Down

0 comments on commit 30f9f8d

Please sign in to comment.