Skip to content

Commit

Permalink
[CHORE]: better subquery handling (#3295)
Browse files Browse the repository at this point in the history
This defers subquery errors to the optimizer instead of erroring out
immediately when a subquery is encountered.
  • Loading branch information
universalmind303 authored Nov 15, 2024
1 parent e18b719 commit 4470192
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 8 deletions.
14 changes: 11 additions & 3 deletions src/daft-dsl/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ pub enum Expr {
Subquery(Subquery),
#[display("{_0}, {_1}")]
InSubquery(ExprRef, Subquery),
#[display("{_0}")]
Exists(Subquery),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -709,7 +711,9 @@ impl Expr {
Self::Agg(agg_expr) => agg_expr.semantic_id(schema),
Self::ScalarFunction(sf) => scalar_function_semantic_id(sf, schema),

Self::Subquery(..) | Self::InSubquery(..) => todo!("semantic_id for subquery"),
Self::Subquery(..) | Self::InSubquery(..) | Self::Exists(..) => {
FieldID::new("__subquery__")
} // todo: better/unique id
}
}

Expand All @@ -719,6 +723,7 @@ impl Expr {
Self::Column(..) => vec![],
Self::Literal(..) => vec![],
Self::Subquery(..) => vec![],
Self::Exists(..) => vec![],

// One child.
Self::Not(expr)
Expand Down Expand Up @@ -753,7 +758,7 @@ impl Expr {
pub fn with_new_children(&self, children: Vec<ExprRef>) -> Self {
match self {
// no children
Self::Column(..) | Self::Literal(..) | Self::Subquery(..) => {
Self::Column(..) | Self::Literal(..) | Self::Subquery(..) | Self::Exists(..) => {
assert!(children.is_empty(), "Should have no children");
self.clone()
}
Expand Down Expand Up @@ -990,6 +995,7 @@ impl Expr {
Ok(first_field.clone())
}
Self::InSubquery(expr, _) => Ok(Field::new(expr.name(), DataType::Boolean)),
Self::Exists(_) => Ok(Field::new("exists", DataType::Boolean)),
}
}

Expand Down Expand Up @@ -1022,6 +1028,7 @@ impl Expr {
Self::IfElse { if_true, .. } => if_true.name(),
Self::Subquery(subquery) => subquery.name(),
Self::InSubquery(expr, _) => expr.name(),
Self::Exists(subquery) => subquery.name(),
}
}

Expand Down Expand Up @@ -1096,7 +1103,8 @@ impl Expr {
| Expr::FillNull(..)
| Expr::ScalarFunction { .. }
| Expr::Subquery(..)
| Expr::InSubquery(..) => Err(io::Error::new(
| Expr::InSubquery(..)
| Expr::Exists(..) => Err(io::Error::new(
io::ErrorKind::Other,
"Unsupported expression for SQL translation",
)),
Expand Down
3 changes: 2 additions & 1 deletion src/daft-dsl/src/optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub fn requires_computation(e: &Expr) -> bool {
| Expr::Between { .. }
| Expr::IfElse { .. }
| Expr::Subquery { .. }
| Expr::InSubquery { .. } => true,
| Expr::InSubquery { .. }
| Expr::Exists(..) => true,
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/daft-logical-plan/src/ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ fn replace_column_with_semantic_id(
Transformed::yes(new_expr.into())
} else {
match e.as_ref() {
Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) => Transformed::no(e),
Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) | Expr::Exists(_) => {
Transformed::no(e)
}
Expr::Agg(agg_expr) => replace_column_with_semantic_id_aggexpr(
agg_expr.clone(),
subexprs_to_replace,
Expand Down
1 change: 1 addition & 0 deletions src/daft-logical-plan/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ fn translate_clustering_spec_expr(
},
Expr::Literal(_) => Ok(clustering_spec_expr.clone()),
Expr::Subquery(_) => Ok(clustering_spec_expr.clone()),
Expr::Exists(_) => Ok(clustering_spec_expr.clone()),
Expr::Alias(child, name) => {
let newchild = translate_clustering_spec_expr(child, old_colname_to_new_colname)?;
Ok(newchild.alias(name.clone()))
Expand Down
25 changes: 22 additions & 3 deletions src/daft-sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Relation {
}
}

#[derive(Clone)]
pub struct SQLPlanner {
catalog: SQLCatalog,
current_relation: Option<Relation>,
Expand Down Expand Up @@ -1135,7 +1136,7 @@ impl SQLPlanner {
negated,
} => {
let expr = self.plan_expr(expr)?;
let mut this = Self::new(self.catalog.clone());
let mut this = self.clone();
let subquery = this.plan_query(subquery)?.build();
let subquery = Subquery { plan: subquery };

Expand Down Expand Up @@ -1295,8 +1296,26 @@ impl SQLPlanner {
},
)
}
SQLExpr::Exists { .. } => unsupported_sql_err!("EXISTS"),
SQLExpr::Subquery(_) => unsupported_sql_err!("SUBQUERY"),
SQLExpr::Exists { subquery, negated } => {
let mut this = self.clone();
let subquery = this.plan_query(subquery)?;
let subquery = Subquery {
plan: subquery.build(),
};
if *negated {
Ok(Expr::Exists(subquery).arced().not())
} else {
Ok(Expr::Exists(subquery).arced())
}
}
SQLExpr::Subquery(subquery) => {
let mut this = self.clone();
let subquery = this.plan_query(subquery)?;
let subquery = Subquery {
plan: subquery.build(),
};
Ok(Expr::Subquery(subquery).arced())
}
SQLExpr::GroupingSets(_) => unsupported_sql_err!("GROUPING SETS"),
SQLExpr::Cube(_) => unsupported_sql_err!("CUBE"),
SQLExpr::Rollup(_) => unsupported_sql_err!("ROLLUP"),
Expand Down
3 changes: 3 additions & 0 deletions src/daft-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ impl Table {
InSubquery(_expr, _subquery) => Err(DaftError::ComputeError(
"IN <SUBQUERY> should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(),
)),
Exists(_subquery) => Err(DaftError::ComputeError(
"EXISTS <SUBQUERY> should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(),
)),
}?;

if expected_field.name != series.field().name {
Expand Down

0 comments on commit 4470192

Please sign in to comment.