From 3083a92f6a017079bbd5dca4c60b1c9a4908773a Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 5 Nov 2024 17:17:13 +0200 Subject: [PATCH] refactor(cubesql): Extract ColumnRemapping in wrapper to separate struct --- .../cubesql/src/compile/engine/df/wrapper.rs | 81 +++++++++++++------ 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 61e4cda323045..02d362466750b 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -260,6 +260,44 @@ fn expr_name(e: &Expr, schema: &DFSchema) -> Result { } } +/// Holds column remapping for generated SQL +/// Can be used to remap expression in logical plans on top, +/// and to generate mapping between schema and Cube load query in wrapper +pub struct ColumnRemapping { + column_remapping: HashMap, +} + +impl ColumnRemapping { + /// Generate member_fields for CubeScanExecutionPlan, which contains SQL with this remapping. + /// Cube will respond with aliases after remapping, which we must use to read response. + /// Schema in DF will stay the same as before remapping. + /// So result would have all aliases after remapping in order derived from `schema`. + pub fn member_fields(&self, schema: &DFSchema) -> Vec { + schema + .fields() + .iter() + .map(|f| { + MemberField::Member( + self.column_remapping + .get(&Column::from_name(f.name().to_string())) + .map(|x| x.name.to_string()) + .unwrap_or(f.name().to_string()), + ) + }) + .collect() + } + + /// Replace every column expression in `expr` according to this remapping. Column expressions + /// not present in `self` will stay the same. + pub fn remap(&self, expr: &Expr) -> result::Result { + replace_col( + expr.clone(), + &self.column_remapping.iter().map(|(k, v)| (k, v)).collect(), + ) + .map_err(|_| CubeError::internal(format!("Can't rename columns for expr: {expr:?}",))) + } +} + /// Builds new column mapping /// One remapper for one context: all unqualified columns with same name are assumed the same column struct Remapper { @@ -365,9 +403,11 @@ impl Remapper { Ok(alias) } - pub fn into_remapping(self) -> Option> { + pub fn into_remapping(self) -> Option { if self.remapping.len() > 0 { - Some(self.remapping) + Some(ColumnRemapping { + column_remapping: self.remapping, + }) } else { None } @@ -377,7 +417,7 @@ impl Remapper { pub struct SqlGenerationResult { pub data_source: Option, pub from_alias: Option, - pub column_remapping: Option>, + pub column_remapping: Option, pub sql: SqlQuery, pub request: TransportLoadRequestQuery, } @@ -446,11 +486,7 @@ impl CubeScanWrapperNode { .await .and_then(|SqlGenerationResult { data_source, mut sql, request, column_remapping, .. }| -> result::Result<_, CubeError> { let member_fields = if let Some(column_remapping) = column_remapping { - schema - .fields() - .iter() - .map(|f| MemberField::Member(column_remapping.get(&Column::from_name(f.name().to_string())).map(|x| x.name.to_string()).unwrap_or(f.name().to_string()))) - .collect() + column_remapping.member_fields(schema) } else { schema .fields() @@ -714,6 +750,8 @@ impl CubeScanWrapperNode { .await? }; + let column_remapping = column_remapping.as_ref(); + let mut subqueries_sql = HashMap::new(); for subquery in subqueries.iter() { let SqlGenerationResult { @@ -759,7 +797,7 @@ impl CubeScanWrapperNode { projection_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -773,7 +811,7 @@ impl CubeScanWrapperNode { flat_group_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -787,7 +825,7 @@ impl CubeScanWrapperNode { aggr_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -801,7 +839,7 @@ impl CubeScanWrapperNode { filter_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -815,7 +853,7 @@ impl CubeScanWrapperNode { window_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -829,7 +867,7 @@ impl CubeScanWrapperNode { order_expr.clone(), sql, generator.clone(), - &column_remapping, + column_remapping, &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), @@ -1060,7 +1098,7 @@ impl CubeScanWrapperNode { exprs: Vec, mut sql: SqlQuery, generator: Arc, - column_remapping: &Option>, + column_remapping: Option<&ColumnRemapping>, next_remapper: &mut Remapper, can_rename_columns: bool, ungrouped_scan_node: Option>, @@ -1068,17 +1106,8 @@ impl CubeScanWrapperNode { ) -> result::Result<(Vec, SqlQuery), CubeError> { let mut aliased_columns = Vec::new(); for original_expr in exprs { - let expr = if let Some(column_remapping) = column_remapping.as_ref() { - let mut expr = replace_col( - original_expr.clone(), - &column_remapping.iter().map(|(k, v)| (k, v)).collect(), - ) - .map_err(|_| { - CubeError::internal(format!( - "Can't rename columns for expr: {:?}", - original_expr - )) - })?; + let expr = if let Some(column_remapping) = column_remapping { + let mut expr = column_remapping.remap(&original_expr)?; if !can_rename_columns { let original_alias = expr_name(&original_expr, &schema)?; if original_alias != expr_name(&expr, &schema)? {