Change compound column field name rules (#952)
* change physical name semantic

* replace expect output in context.rs

* replace expect output in sql & dataframe_impl

* add spec entry

* replace expect output in lib doc & planner
waynexia authored Aug 31, 2021
Parent: bef41bc · Commit: 7932cb9
Showing 6 changed files with 412 additions and 395 deletions.
418 changes: 209 additions & 209 deletions datafusion/src/execution/context.rs

Large diffs are not rendered by default.
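The context.rs diff (not rendered above) and the diffs below all follow from one rule change: when a column reference appears nested inside a compound expression (an aggregate, scalar function, cast, binary expression, and so on), its generated physical field name now uses the relation-qualified form via Column::flat_name(), while a bare top-level column keeps its short name. Hence `MIN(c12)` over table aggregate_test_100 is now named `MIN(aggregate_test_100.c12)`. The following is a simplified, self-contained model of that rule using toy types rather than the real DataFusion `Expr`/`Column`, not the actual implementation:

```rust
// Toy model of the new naming rule.
#[derive(Clone)]
struct Column {
    relation: Option<String>,
    name: String,
}

impl Column {
    fn flat_name(&self) -> String {
        match &self.relation {
            Some(r) => format!("{}.{}", r, self.name),
            None => self.name.clone(),
        }
    }
}

enum Expr {
    Column(Column),
    AggregateFunction { fun: String, arg: Box<Expr> },
}

fn physical_name(e: &Expr) -> String {
    // The public entry point treats its argument as the outermost expression.
    create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> String {
    match e {
        // A bare column at the top level keeps its short name ...
        Expr::Column(c) if is_first_expr => c.name.clone(),
        // ... but a column nested inside a compound expression is qualified.
        Expr::Column(c) => c.flat_name(),
        Expr::AggregateFunction { fun, arg } => {
            format!("{}({})", fun, create_physical_name(arg, false))
        }
    }
}

fn main() {
    let c12 = Column {
        relation: Some("aggregate_test_100".to_string()),
        name: "c12".to_string(),
    };
    assert_eq!(physical_name(&Expr::Column(c12.clone())), "c12");

    let min = Expr::AggregateFunction {
        fun: "MIN".to_string(),
        arg: Box::new(Expr::Column(c12)),
    };
    assert_eq!(physical_name(&min), "MIN(aggregate_test_100.c12)");
}
```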

18 changes: 9 additions & 9 deletions datafusion/src/execution/dataframe_impl.rs
@@ -277,15 +277,15 @@ mod tests {

assert_batches_sorted_eq!(
vec![
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| c1 | MIN(c12) | MAX(c12) | AVG(c12) | SUM(c12) | COUNT(c12) | COUNT(DISTINCT c12) |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | SUM(aggregate_test_100.c12) | COUNT(aggregate_test_100.c12) | COUNT(DISTINCT aggregate_test_100.c12) |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
],
&df
);
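Because the generated header now embeds the table qualifier, code that previously looked up a result column by a name like "MIN(c12)" must either switch to the qualified form or alias the expression. The sketch below (not part of this commit) pins a stable output name by aliasing, reusing the LogicalPlanBuilder API that appears in the planner.rs test further down; the alias name `min_c12` is purely illustrative:

```rust
use datafusion::error::Result;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::*;

// Alias the aggregate so the output field is named "min_c12" instead of the
// generated compound name "MIN(aggregate_test_100.c12)".
fn min_c12_plan(path: String) -> Result<LogicalPlan> {
    LogicalPlanBuilder::scan_csv_with_name(
        path,
        CsvReadOptions::new(),
        None,
        "aggregate_test_100",
    )?
    .aggregate(vec![col("c1")], vec![min(col("c12")).alias("min_c12")])?
    .build()
}
```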
20 changes: 10 additions & 10 deletions datafusion/src/lib.rs
@@ -60,11 +60,11 @@
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//!
//! let expected = vec![
//! "+---+--------+",
//! "| a | MIN(b) |",
//! "+---+--------+",
//! "| 1 | 2 |",
//! "+---+--------+"
//! "+---+--------------------------+",
//! "| a | MIN(tests/example.csv.b) |",
//! "+---+--------------------------+",
//! "| 1 | 2 |",
//! "+---+--------------------------+"
//! ];
//!
//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
@@ -95,11 +95,11 @@
//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
//!
//! let expected = vec![
//! "+---+--------+",
//! "| a | MIN(b) |",
//! "+---+--------+",
//! "| 1 | 2 |",
//! "+---+--------+"
//! "+---+----------------+",
//! "| a | MIN(example.b) |",
//! "+---+----------------+",
//! "| 1 | 2 |",
//! "+---+----------------+"
//! ];
//!
//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
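The two doc examples now print different qualifiers: the DataFrame example reads the CSV without an explicit table name, so the file path apparently serves as the relation qualifier ("MIN(tests/example.csv.b)"), while the SQL example registers the table as "example" and gets "MIN(example.b)". A sketch of that contrast, under the assumption that an unnamed scan keeps the path as its qualifier:

```rust
use datafusion::error::Result;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::*;

// Field name is expected to be "MIN(tests/example.csv.b)".
fn min_b_by_path() -> Result<LogicalPlan> {
    LogicalPlanBuilder::scan_csv("tests/example.csv".to_string(), CsvReadOptions::new(), None)?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .build()
}

// Field name is expected to be "MIN(example.b)".
fn min_b_by_table_name() -> Result<LogicalPlan> {
    LogicalPlanBuilder::scan_csv_with_name(
        "tests/example.csv".to_string(),
        CsvReadOptions::new(),
        None,
        "example",
    )?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .build()
}
```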
75 changes: 45 additions & 30 deletions datafusion/src/physical_plan/planner.rs
@@ -62,11 +62,10 @@ fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
input_schema: &DFSchema,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| physical_name(e, input_schema))
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
@@ -76,15 +75,25 @@
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
}

fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Column(c) => Ok(c.name.clone()),
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(_, name) => Ok(name.clone()),
Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{:?}", value)),
Expr::BinaryExpr { left, op, right } => {
let left = physical_name(left, input_schema)?;
let right = physical_name(right, input_schema)?;
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} {}", left, op, right))
}
Expr::Case {
Expand All @@ -106,50 +115,48 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Ok(name)
}
Expr::Cast { expr, data_type } => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("CAST({} AS {:?})", expr, data_type))
}
Expr::TryCast { expr, data_type } => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
}
Expr::Not(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {}", expr))
}
Expr::Negative(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {})", expr))
}
Expr::IsNull(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NULL", expr))
}
Expr::IsNotNull(expr) => {
let expr = physical_name(expr, input_schema)?;
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NOT NULL", expr))
}
Expr::ScalarFunction { fun, args, .. } => {
create_function_physical_name(&fun.to_string(), false, args, input_schema)
create_function_physical_name(&fun.to_string(), false, args)
}
Expr::ScalarUDF { fun, args, .. } => {
create_function_physical_name(&fun.name, false, args, input_schema)
create_function_physical_name(&fun.name, false, args)
}
Expr::WindowFunction { fun, args, .. } => {
create_function_physical_name(&fun.to_string(), false, args, input_schema)
create_function_physical_name(&fun.to_string(), false, args)
}
Expr::AggregateFunction {
fun,
distinct,
args,
..
} => {
create_function_physical_name(&fun.to_string(), *distinct, args, input_schema)
}
} => create_function_physical_name(&fun.to_string(), *distinct, args),
Expr::AggregateUDF { fun, args } => {
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(physical_name(e, input_schema)?);
names.push(create_physical_name(e, false)?);
}
Ok(format!("{}({})", fun.name, names.join(",")))
}
@@ -158,8 +165,8 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
list,
negated,
} => {
let expr = physical_name(expr, input_schema)?;
let list = list.iter().map(|expr| physical_name(expr, input_schema));
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{} NOT IN ({:?})", expr, list))
} else {
@@ -444,7 +451,7 @@ impl DefaultPhysicalPlanner {
&physical_input_schema,
ctx_state,
),
physical_name(e, logical_input_schema),
physical_name(e),
))
})
.collect::<Result<Vec<_>>>()?;
@@ -545,10 +552,10 @@ impl DefaultPhysicalPlanner {
}
// logical column is not a derived column, safe to pass along to
// physical_name
Err(_) => physical_name(e, input_schema),
Err(_) => physical_name(e),
}
} else {
physical_name(e, input_schema)
physical_name(e)
};

tuple_err((
@@ -1192,7 +1199,7 @@ impl DefaultPhysicalPlanner {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (physical_name(e, logical_input_schema)?, e),
_ => (physical_name(e)?, e),
};
self.create_window_expr_with_name(
e,
@@ -1271,7 +1278,7 @@ impl DefaultPhysicalPlanner {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (physical_name(e, logical_input_schema)?, e),
_ => (physical_name(e)?, e),
};

self.create_aggregate_expr_with_name(
@@ -1629,16 +1636,24 @@ mod tests {
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;
let logical_plan = LogicalPlanBuilder::scan_csv_with_name(
path,
options,
None,
"aggregate_test_100",
)?
.aggregate(vec![col("c1")], vec![sum(col("c2"))])?
.build()?;

let execution_plan = plan(&logical_plan)?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<HashAggregateExec>()
.expect("hash aggregate");
assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name());
assert_eq!(
"SUM(aggregate_test_100.c2)",
final_hash_agg.schema().field(1).name()
);
// we need access to the input to the partial aggregate so that other projects can
// implement serde
assert_eq!("c2", final_hash_agg.input_schema().field(1).name());
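The updated planner test pins down both sides of the rule: the final HashAggregateExec's output field carries the qualified name "SUM(aggregate_test_100.c2)", while the partial aggregate's input schema still exposes the bare "c2". A minimal sketch of how downstream code would resolve the output column by its new name, assuming `schema` is the output schema of that final aggregate:

```rust
use arrow::datatypes::Schema;
use arrow::error::Result;

// Look up the aggregate output by its qualified, generated name.
fn sum_c2_index(schema: &Schema) -> Result<usize> {
    schema.index_of("SUM(aggregate_test_100.c2)")
}
```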
