Use usize rather than Option<usize> to represent Limit::skip and Limit::offset #3374

Merged — 5 commits, merged Sep 7, 2022 (changes shown below are from 1 commit).
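In short, this PR changes `DataFrame::limit` and `LogicalPlanBuilder::limit` to take `skip: usize` instead of `skip: Option<usize>`, with `0` meaning "skip no rows"; `fetch` stays an `Option<usize>`. A minimal before/after sketch, based on the doc examples updated in this diff (assumes the DataFusion API as of this PR and a tokio runtime):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Sample CSV path reused from the doc examples in this PR.
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;

    // Before this PR: df.limit(None, Some(100))?  -- `skip` was Option<usize>.
    // After this PR:  `skip` is a plain usize; pass 0 to skip nothing.
    let df = df.limit(0, Some(100))?;

    let results = df.collect().await?;
    println!("collected {} record batches", results.len());
    Ok(())
}
```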
32 changes: 13 additions & 19 deletions datafusion/core/src/dataframe.rs
@@ -62,7 +62,7 @@ use std::sync::Arc;
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(None, Some(100))?;
/// .limit(0, Some(100))?;
/// let results = df.collect();
/// # Ok(())
/// # }
@@ -217,15 +217,11 @@ impl DataFrame {
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let df = df.limit(None, Some(100))?;
/// let df = df.limit(0, Some(100))?;
/// # Ok(())
/// # }
/// ```
pub fn limit(
&self,
skip: Option<usize>,
fetch: Option<usize>,
) -> Result<Arc<DataFrame>> {
pub fn limit(&self, skip: usize, fetch: Option<usize>) -> Result<Arc<DataFrame>> {
let plan = LogicalPlanBuilder::from(self.plan.clone())
.limit(skip, fetch)?
.build()?;
@@ -438,7 +434,7 @@ impl DataFrame {
/// # }
/// ```
pub async fn show_limit(&self, num: usize) -> Result<()> {
let results = self.limit(None, Some(num))?.collect().await?;
let results = self.limit(0, Some(num))?.collect().await?;
Ok(pretty::print_batches(&results)?)
}

@@ -543,7 +539,7 @@ impl DataFrame {
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let batches = df.limit(None, Some(100))?.explain(false, false)?.collect().await?;
/// let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
@@ -789,7 +785,7 @@ impl TableProvider for DataFrame {
Self::new(
self.session_state.clone(),
&limit
.map_or_else(|| Ok(expr.clone()), |n| expr.limit(None, Some(n)))?
.map_or_else(|| Ok(expr.clone()), |n| expr.limit(0, Some(n)))?
.plan
.clone(),
)
@@ -923,9 +919,7 @@ mod tests {
async fn limit() -> Result<()> {
// build query using Table API
let t = test_table().await?;
let t2 = t
.select_columns(&["c1", "c2", "c11"])?
.limit(None, Some(10))?;
let t2 = t.select_columns(&["c1", "c2", "c11"])?.limit(0, Some(10))?;
let plan = t2.plan.clone();

// build query using SQL
@@ -944,7 +938,7 @@ mod tests {
let df = test_table().await?;
let df = df
.select_columns(&["c1", "c2", "c11"])?
.limit(None, Some(10))?
.limit(0, Some(10))?
.explain(false, false)?;
let plan = df.plan.clone();

@@ -1205,7 +1199,7 @@ mod tests {
.await?
.select_columns(&["c1", "c2", "c3"])?
.filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
.limit(None, Some(1))?
.limit(0, Some(1))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true),
@@ -1248,7 +1242,7 @@ mod tests {
col("t2.c2").sort(true, true),
col("t2.c3").sort(true, true),
])?
.limit(None, Some(1))?;
.limit(0, Some(1))?;

let df_results = df.collect().await?;
assert_batches_sorted_eq!(
@@ -1266,7 +1260,7 @@ mod tests {

assert_eq!("\
Projection: #t1.c1 AS AAA, #t1.c2, #t1.c3, #t2.c1, #t2.c2, #t2.c3\
\n Limit: skip=None, fetch=1\
\n Limit: skip=0, fetch=1\
\n Sort: #t1.c1 ASC NULLS FIRST, #t1.c2 ASC NULLS FIRST, #t1.c3 ASC NULLS FIRST, #t2.c1 ASC NULLS FIRST, #t2.c2 ASC NULLS FIRST, #t2.c3 ASC NULLS FIRST\
\n Inner Join: #t1.c1 = #t2.c1\
\n TableScan: t1\
@@ -1276,7 +1270,7 @@ mod tests {

assert_eq!("\
Projection: #t1.c1 AS AAA, #t1.c2, #t1.c3, #t2.c1, #t2.c2, #t2.c3\
\n Limit: skip=None, fetch=1\
\n Limit: skip=0, fetch=1\
\n Sort: #t1.c1 ASC NULLS FIRST, #t1.c2 ASC NULLS FIRST, #t1.c3 ASC NULLS FIRST, #t2.c1 ASC NULLS FIRST, #t2.c2 ASC NULLS FIRST, #t2.c3 ASC NULLS FIRST\
\n Inner Join: #t1.c1 = #t2.c1\
\n TableScan: t1 projection=[c1, c2, c3]\
@@ -1305,7 +1299,7 @@ mod tests {
let df = test_table()
.await?
.select_columns(&["c2", "c3"])?
.limit(None, Some(1))?
.limit(0, Some(1))?
.with_column("sum", cast(col("c2") + col("c3"), DataType::Int64))?;

let df_results = df.collect().await?;
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
@@ -148,7 +148,7 @@ const DEFAULT_SCHEMA: &str = "public";
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(None, Some(100))?;
/// .limit(0, Some(100))?;
/// let results = df.collect();
/// # Ok(())
/// # }
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
@@ -44,7 +44,7 @@
//! // create a plan
//! let df = df.filter(col("a").lt_eq(col("b")))?
//! .aggregate(vec![col("a")], vec![min(col("b"))])?
//! .limit(None, Some(100))?;
//! .limit(0, Some(100))?;
//!
//! // execute the plan
//! let results: Vec<RecordBatch> = df.collect().await?;
12 changes: 6 additions & 6 deletions datafusion/core/src/physical_plan/planner.rs
@@ -1007,13 +1007,13 @@ impl DefaultPhysicalPlanner {
// Apply a LocalLimitExec to each partition. The optimizer will also insert
// a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
if let Some(fetch) = fetch {
Arc::new(LocalLimitExec::new(input, *fetch + skip.unwrap_or(0)))
Arc::new(LocalLimitExec::new(input, *fetch + skip))
} else {
input
}
};

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
Ok(Arc::new(GlobalLimitExec::new(input, Some(*skip), *fetch)))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
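The `LocalLimitExec` sizing in the hunk above captures the key arithmetic: with `skip` now a plain `usize`, each partition's local limit must keep `fetch + skip` rows so that the `GlobalLimitExec` can still drop `skip` rows and return `fetch`. A standalone illustration of that sizing rule (a sketch, not the planner's actual code):

```rust
// Sketch: how many rows each partition's LocalLimitExec needs to emit so that
// GlobalLimitExec can skip `skip` rows and still return `fetch` rows.
fn local_limit_rows(skip: usize, fetch: Option<usize>) -> Option<usize> {
    // No fetch means no local limit is inserted at all.
    fetch.map(|f| f + skip)
}

fn main() {
    assert_eq!(local_limit_rows(3, Some(10)), Some(13)); // skip 3, fetch 10 -> keep 13 locally
    assert_eq!(local_limit_rows(0, Some(100)), Some(100));
    assert_eq!(local_limit_rows(3, None), None);
}
```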
@@ -1710,7 +1710,7 @@ mod tests {
.project(vec![col("c1"), col("c2")])?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.sort(vec![col("c1").sort(true, true)])?
.limit(Some(3), Some(10))?
.limit(3, Some(10))?
.build()?;

let plan = plan(&logical_plan).await?;
@@ -1802,7 +1802,7 @@ mod tests {
let logical_plan = test_csv_scan()
.await?
.filter(col("c7").lt(col("c12")))?
.limit(Some(3), None)?
.limit(3, None)?
.build()?;

let plan = plan(&logical_plan).await?;
@@ -1818,7 +1818,7 @@

#[tokio::test]
async fn test_with_zero_offset_plan() -> Result<()> {
let logical_plan = test_csv_scan().await?.limit(Some(0), None)?.build()?;
let logical_plan = test_csv_scan().await?.limit(0, None)?.build()?;
let plan = plan(&logical_plan).await?;
assert!(format!("{:?}", plan).contains("GlobalLimitExec"));
assert!(format!("{:?}", plan).contains("skip: Some(0)"));
@@ -1830,7 +1830,7 @@ mod tests {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

let logical_plan = scan_empty_with_partitions(Some("test"), &schema, None, 2)?
.limit(Some(3), Some(5))?
.limit(3, Some(5))?
.build()?;
let plan = plan(&logical_plan).await?;

2 changes: 1 addition & 1 deletion datafusion/core/tests/dataframe_functions.rs
@@ -73,7 +73,7 @@ macro_rules! assert_fn_batches {
};
($EXPR:expr, $EXPECTED: expr, $LIMIT: expr) => {
let df = create_test_table()?;
let df = df.select(vec![$EXPR])?.limit(None, Some($LIMIT))?;
let df = df.select(vec![$EXPR])?.limit(0, Some($LIMIT))?;
let batches = df.collect().await?;

assert_batches_eq!($EXPECTED, &batches);
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -74,7 +74,7 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
"GlobalLimitExec: skip=None, fetch=3, ",
"GlobalLimitExec: skip=0, fetch=3, ",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
@@ -685,7 +685,7 @@ async fn test_physical_plan_display_indent() {

let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let expected = vec![
"GlobalLimitExec: skip=None, fetch=10",
"GlobalLimitExec: skip=0, fetch=10",
" SortExec: [the_min@2 DESC]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -299,7 +299,7 @@ impl LogicalPlanBuilder {
///
/// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
/// if specified.
pub fn limit(&self, skip: Option<usize>, fetch: Option<usize>) -> Result<Self> {
pub fn limit(&self, skip: usize, fetch: Option<usize>) -> Result<Self> {
Ok(Self::from(LogicalPlan::Limit(Limit {
skip,
fetch,
@@ -1057,7 +1057,7 @@ mod tests {
vec![sum(col("salary")).alias("total_salary")],
)?
.project(vec![col("state"), col("total_salary")])?
.limit(Some(2), Some(10))?
.limit(2, Some(10))?
.build()?;

let expected = "Limit: skip=2, fetch=10\
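For the `LogicalPlanBuilder::limit` signature shown above, a minimal end-to-end sketch of the new call and the resulting plan display. The import path and the `empty` helper are assumptions that may vary by DataFusion version:

```rust
use datafusion_expr::logical_plan::LogicalPlanBuilder;

fn main() -> datafusion_common::Result<()> {
    // Start from an empty relation purely to have an input plan.
    let plan = LogicalPlanBuilder::empty(false)
        .limit(2, Some(10))? // skip: usize, fetch: Option<usize>
        .build()?;

    // With this PR, `skip` prints as a plain number, roughly:
    //   Limit: skip=2, fetch=10
    //     EmptyRelation
    println!("{}", plan.display_indent());
    Ok(())
}
```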
7 changes: 4 additions & 3 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -886,7 +886,7 @@ impl LogicalPlan {
write!(
f,
"Limit: skip={}, fetch={}",
skip.map_or("None".to_string(), |x| x.to_string()),
skip,
fetch.map_or_else(|| "None".to_string(), |x| x.to_string())
)
}
@@ -1294,8 +1294,9 @@ pub struct Extension {
#[derive(Clone)]
pub struct Limit {
/// Number of rows to skip before fetch
pub skip: Option<usize>,
/// Maximum number of rows to fetch
pub skip: usize,
/// Maximum number of rows to fetch,
/// None means fetching all rows
Contributor comment: 👍
pub fetch: Option<usize>,
/// The logical plan
pub input: Arc<LogicalPlan>,
39 changes: 20 additions & 19 deletions datafusion/optimizer/src/eliminate_limit.rs
@@ -82,10 +82,11 @@ fn eliminate_limit(
}
}
None => {
match skip {
if *skip == 0 {
// If there is no LIMIT and OFFSET is zero, LIMIT/OFFSET can be removed
Some(skip) if *skip == 0 => return Ok(input.as_ref().clone()),
_ => {}
return Ok(input.as_ref().clone());
} else {
{}
Member comment: redundant part
}
}
}
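The "redundant part" comment above refers to the empty `else {}` branch: with `skip` now a plain `usize`, the `fetch == None` arm reduces to a single `if`. A standalone restatement of the check being made, as a sketch rather than the optimizer's actual code:

```rust
// A Limit with no fetch and skip == 0 neither truncates nor offsets its input,
// so the eliminate_limit rule can drop it entirely.
fn limit_is_noop(skip: usize, fetch: Option<usize>) -> bool {
    fetch.is_none() && skip == 0
}

fn main() {
    assert!(limit_is_noop(0, None));      // no LIMIT, OFFSET 0 -> removable
    assert!(!limit_is_noop(2, None));     // OFFSET 2 must be kept
    assert!(!limit_is_noop(0, Some(10))); // LIMIT 10 must be kept
}
```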
@@ -99,7 +100,7 @@
.map(|plan| {
eliminate_limit(
_optimizer,
&Ancestor::FromLimit { skip: *skip },
&Ancestor::FromLimit { skip: Some(*skip) },
plan,
_optimizer_config,
)
@@ -174,7 +175,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(None, Some(0))
.limit(0, Some(0))
.unwrap()
.build()
.unwrap();
@@ -194,7 +195,7 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(None, Some(0))
.limit(0, Some(0))
.unwrap()
.union(plan1)
.unwrap()
@@ -215,9 +216,9 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(None, Some(2))
.limit(0, Some(2))
.unwrap()
.limit(Some(2), None)
.limit(2, None)
.unwrap()
.build()
.unwrap();
@@ -234,11 +235,11 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(None, Some(2))
.limit(0, Some(2))
.unwrap()
.sort(vec![col("a")])
.unwrap()
.limit(Some(2), Some(1))
.limit(2, Some(1))
.unwrap()
.build()
.unwrap();
@@ -255,18 +256,18 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(None, Some(2))
.limit(0, Some(2))
.unwrap()
.sort(vec![col("a")])
.unwrap()
.limit(None, Some(1))
.limit(0, Some(1))
.unwrap()
.build()
.unwrap();

let expected = "Limit: skip=None, fetch=1\
let expected = "Limit: skip=0, fetch=1\
\n Sort: #test.a\
\n Limit: skip=None, fetch=2\
\n Limit: skip=0, fetch=2\
\n Aggregate: groupBy=[[#test.a]], aggr=[[SUM(#test.b)]]\
\n TableScan: test";
assert_optimized_plan_eq(&plan, expected);
@@ -278,11 +279,11 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(Some(2), Some(1))
.limit(2, Some(1))
.unwrap()
.sort(vec![col("a")])
.unwrap()
.limit(Some(3), Some(1))
.limit(3, Some(1))
.unwrap()
.build()
.unwrap();
@@ -298,15 +299,15 @@
let table_scan = test_table_scan().unwrap();
let table_scan_inner = test_table_scan_with_name("test1").unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
.limit(Some(2), Some(1))
.limit(2, Some(1))
.unwrap()
.join_using(
&table_scan_inner,
JoinType::Inner,
vec![Column::from_name("a".to_string())],
)
.unwrap()
.limit(Some(3), Some(1))
.limit(3, Some(1))
.unwrap()
.build()
.unwrap();
@@ -325,7 +326,7 @@
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a")], vec![sum(col("b"))])
.unwrap()
.limit(Some(0), None)
.limit(0, None)
.unwrap()
.build()
.unwrap();