Skip to content

Commit

Permalink
refactor(cubesql): Extract ColumnRemapping in wrapper to separate struct
Browse files Browse the repository at this point in the history
  • Loading branch information
mcheshkov committed Nov 5, 2024
1 parent 3af1287 commit 3083a92
Showing 1 changed file with 55 additions and 26 deletions.
81 changes: 55 additions & 26 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,44 @@ fn expr_name(e: &Expr, schema: &DFSchema) -> Result<String> {
}
}

/// Holds the column remapping for generated SQL.
///
/// Can be used to remap expressions in logical plans on top,
/// and to generate the mapping between a schema and the Cube load query in the wrapper.
pub struct ColumnRemapping {
    // Key is the column as referenced before remapping, value is the column that replaces it.
    // `member_fields` looks keys up by unqualified name (`Column::from_name`).
    column_remapping: HashMap<Column, Column>,
}

impl ColumnRemapping {
    /// Generate member_fields for CubeScanExecutionPlan, which contains SQL with this remapping.
    ///
    /// Cube will respond with aliases after remapping, which we must use to read the response.
    /// The schema in DF stays the same as before remapping, so the result contains one
    /// `MemberField::Member` per field of `schema`, in schema order: the remapped column name
    /// when the (unqualified) field name is present in this remapping, and the field's own name
    /// otherwise.
    pub fn member_fields(&self, schema: &DFSchema) -> Vec<MemberField> {
        schema
            .fields()
            .iter()
            .map(|field| {
                let member_name = self
                    .column_remapping
                    .get(&Column::from_name(field.name().to_string()))
                    // Build the fallback lazily: the field name is only copied
                    // when there is no remapping entry for it.
                    .map_or_else(
                        || field.name().to_string(),
                        |remapped| remapped.name.clone(),
                    );
                MemberField::Member(member_name)
            })
            .collect()
    }

    /// Replace every column expression in `expr` according to this remapping. Column expressions
    /// not present in `self` will stay the same.
    ///
    /// `expr` itself is left untouched; the replacement is applied to a clone.
    ///
    /// # Errors
    ///
    /// Returns `CubeError::internal` when `replace_col` fails; the underlying error is discarded
    /// and the message carries the debug representation of `expr` instead.
    pub fn remap(&self, expr: &Expr) -> result::Result<Expr, CubeError> {
        // `HashMap::iter` already yields `(&Column, &Column)` pairs, which is exactly the
        // borrowed map shape `replace_col` expects, so they can be collected directly.
        replace_col(expr.clone(), &self.column_remapping.iter().collect())
            .map_err(|_| CubeError::internal(format!("Can't rename columns for expr: {expr:?}")))
    }
}

/// Builds new column mapping
/// One remapper for one context: all unqualified columns with same name are assumed the same column
struct Remapper {
Expand Down Expand Up @@ -365,9 +403,11 @@ impl Remapper {
Ok(alias)
}

pub fn into_remapping(self) -> Option<HashMap<Column, Column>> {
pub fn into_remapping(self) -> Option<ColumnRemapping> {
if self.remapping.len() > 0 {
Some(self.remapping)
Some(ColumnRemapping {
column_remapping: self.remapping,
})
} else {
None
}
Expand All @@ -377,7 +417,7 @@ impl Remapper {
pub struct SqlGenerationResult {
pub data_source: Option<String>,
pub from_alias: Option<String>,
pub column_remapping: Option<HashMap<Column, Column>>,
pub column_remapping: Option<ColumnRemapping>,
pub sql: SqlQuery,
pub request: TransportLoadRequestQuery,
}
Expand Down Expand Up @@ -446,11 +486,7 @@ impl CubeScanWrapperNode {
.await
.and_then(|SqlGenerationResult { data_source, mut sql, request, column_remapping, .. }| -> result::Result<_, CubeError> {
let member_fields = if let Some(column_remapping) = column_remapping {
schema
.fields()
.iter()
.map(|f| MemberField::Member(column_remapping.get(&Column::from_name(f.name().to_string())).map(|x| x.name.to_string()).unwrap_or(f.name().to_string())))
.collect()
column_remapping.member_fields(schema)
} else {
schema
.fields()
Expand Down Expand Up @@ -714,6 +750,8 @@ impl CubeScanWrapperNode {
.await?
};

let column_remapping = column_remapping.as_ref();

let mut subqueries_sql = HashMap::new();
for subquery in subqueries.iter() {
let SqlGenerationResult {
Expand Down Expand Up @@ -759,7 +797,7 @@ impl CubeScanWrapperNode {
projection_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand All @@ -773,7 +811,7 @@ impl CubeScanWrapperNode {
flat_group_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand All @@ -787,7 +825,7 @@ impl CubeScanWrapperNode {
aggr_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand All @@ -801,7 +839,7 @@ impl CubeScanWrapperNode {
filter_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand All @@ -815,7 +853,7 @@ impl CubeScanWrapperNode {
window_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand All @@ -829,7 +867,7 @@ impl CubeScanWrapperNode {
order_expr.clone(),
sql,
generator.clone(),
&column_remapping,
column_remapping,
&mut next_remapper,
can_rename_columns,
ungrouped_scan_node.clone(),
Expand Down Expand Up @@ -1060,25 +1098,16 @@ impl CubeScanWrapperNode {
exprs: Vec<Expr>,
mut sql: SqlQuery,
generator: Arc<dyn SqlGenerator>,
column_remapping: &Option<HashMap<Column, Column>>,
column_remapping: Option<&ColumnRemapping>,
next_remapper: &mut Remapper,
can_rename_columns: bool,
ungrouped_scan_node: Option<Arc<CubeScanNode>>,
subqueries: Arc<HashMap<String, String>>,
) -> result::Result<(Vec<AliasedColumn>, SqlQuery), CubeError> {
let mut aliased_columns = Vec::new();
for original_expr in exprs {
let expr = if let Some(column_remapping) = column_remapping.as_ref() {
let mut expr = replace_col(
original_expr.clone(),
&column_remapping.iter().map(|(k, v)| (k, v)).collect(),
)
.map_err(|_| {
CubeError::internal(format!(
"Can't rename columns for expr: {:?}",
original_expr
))
})?;
let expr = if let Some(column_remapping) = column_remapping {
let mut expr = column_remapping.remap(&original_expr)?;
if !can_rename_columns {
let original_alias = expr_name(&original_expr, &schema)?;
if original_alias != expr_name(&expr, &schema)? {
Expand Down

0 comments on commit 3083a92

Please sign in to comment.