diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs index 1dc42837e6..60fc08fc96 100644 --- a/src/daft-dsl/src/expr/mod.rs +++ b/src/daft-dsl/src/expr/mod.rs @@ -153,6 +153,8 @@ pub enum Expr { Subquery(Subquery), #[display("{_0}, {_1}")] InSubquery(ExprRef, Subquery), + #[display("{_0}")] + Exists(Subquery), } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)] @@ -709,7 +711,9 @@ impl Expr { Self::Agg(agg_expr) => agg_expr.semantic_id(schema), Self::ScalarFunction(sf) => scalar_function_semantic_id(sf, schema), - Self::Subquery(..) | Self::InSubquery(..) => todo!("semantic_id for subquery"), + Self::Subquery(..) | Self::InSubquery(..) | Self::Exists(..) => { + FieldID::new("__subquery__") + } // todo: better/unique id } } @@ -719,6 +723,7 @@ impl Expr { Self::Column(..) => vec![], Self::Literal(..) => vec![], Self::Subquery(..) => vec![], + Self::Exists(..) => vec![], // One child. Self::Not(expr) @@ -753,7 +758,7 @@ impl Expr { pub fn with_new_children(&self, children: Vec) -> Self { match self { // no children - Self::Column(..) | Self::Literal(..) | Self::Subquery(..) => { + Self::Column(..) | Self::Literal(..) | Self::Subquery(..) | Self::Exists(..) => { assert!(children.is_empty(), "Should have no children"); self.clone() } @@ -990,6 +995,7 @@ impl Expr { Ok(first_field.clone()) } Self::InSubquery(expr, _) => Ok(Field::new(expr.name(), DataType::Boolean)), + Self::Exists(_) => Ok(Field::new("exists", DataType::Boolean)), } } @@ -1022,6 +1028,7 @@ impl Expr { Self::IfElse { if_true, .. } => if_true.name(), Self::Subquery(subquery) => subquery.name(), Self::InSubquery(expr, _) => expr.name(), + Self::Exists(subquery) => subquery.name(), } } @@ -1096,7 +1103,8 @@ impl Expr { | Expr::FillNull(..) | Expr::ScalarFunction { .. } | Expr::Subquery(..) - | Expr::InSubquery(..) => Err(io::Error::new( + | Expr::InSubquery(..) + | Expr::Exists(..) => Err(io::Error::new( io::ErrorKind::Other, "Unsupported expression for SQL translation", )), diff --git a/src/daft-dsl/src/optimization.rs b/src/daft-dsl/src/optimization.rs index 68d360cb00..e84035a572 100644 --- a/src/daft-dsl/src/optimization.rs +++ b/src/daft-dsl/src/optimization.rs @@ -35,7 +35,8 @@ pub fn requires_computation(e: &Expr) -> bool { | Expr::Between { .. } | Expr::IfElse { .. } | Expr::Subquery { .. } - | Expr::InSubquery { .. } => true, + | Expr::InSubquery { .. } + | Expr::Exists(..) => true, } } diff --git a/src/daft-logical-plan/src/ops/project.rs b/src/daft-logical-plan/src/ops/project.rs index 633ac9ec12..e2775180f4 100644 --- a/src/daft-logical-plan/src/ops/project.rs +++ b/src/daft-logical-plan/src/ops/project.rs @@ -202,7 +202,9 @@ fn replace_column_with_semantic_id( Transformed::yes(new_expr.into()) } else { match e.as_ref() { - Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) => Transformed::no(e), + Expr::Column(_) | Expr::Literal(_) | Expr::Subquery(_) | Expr::Exists(_) => { + Transformed::no(e) + } Expr::Agg(agg_expr) => replace_column_with_semantic_id_aggexpr( agg_expr.clone(), subexprs_to_replace, diff --git a/src/daft-logical-plan/src/partitioning.rs b/src/daft-logical-plan/src/partitioning.rs index c33a90823d..78ac00b874 100644 --- a/src/daft-logical-plan/src/partitioning.rs +++ b/src/daft-logical-plan/src/partitioning.rs @@ -237,6 +237,7 @@ fn translate_clustering_spec_expr( }, Expr::Literal(_) => Ok(clustering_spec_expr.clone()), Expr::Subquery(_) => Ok(clustering_spec_expr.clone()), + Expr::Exists(_) => Ok(clustering_spec_expr.clone()), Expr::Alias(child, name) => { let newchild = translate_clustering_spec_expr(child, old_colname_to_new_colname)?; Ok(newchild.alias(name.clone())) diff --git a/src/daft-sql/src/planner.rs b/src/daft-sql/src/planner.rs index e3acbc93b4..1827c23147 100644 --- a/src/daft-sql/src/planner.rs +++ b/src/daft-sql/src/planner.rs @@ -64,6 +64,7 @@ impl Relation { } } +#[derive(Clone)] pub struct SQLPlanner { catalog: SQLCatalog, current_relation: Option, @@ -1135,7 +1136,7 @@ impl SQLPlanner { negated, } => { let expr = self.plan_expr(expr)?; - let mut this = Self::new(self.catalog.clone()); + let mut this = self.clone(); let subquery = this.plan_query(subquery)?.build(); let subquery = Subquery { plan: subquery }; @@ -1295,8 +1296,26 @@ impl SQLPlanner { }, ) } - SQLExpr::Exists { .. } => unsupported_sql_err!("EXISTS"), - SQLExpr::Subquery(_) => unsupported_sql_err!("SUBQUERY"), + SQLExpr::Exists { subquery, negated } => { + let mut this = self.clone(); + let subquery = this.plan_query(subquery)?; + let subquery = Subquery { + plan: subquery.build(), + }; + if *negated { + Ok(Expr::Exists(subquery).arced().not()) + } else { + Ok(Expr::Exists(subquery).arced()) + } + } + SQLExpr::Subquery(subquery) => { + let mut this = self.clone(); + let subquery = this.plan_query(subquery)?; + let subquery = Subquery { + plan: subquery.build(), + }; + Ok(Expr::Subquery(subquery).arced()) + } SQLExpr::GroupingSets(_) => unsupported_sql_err!("GROUPING SETS"), SQLExpr::Cube(_) => unsupported_sql_err!("CUBE"), SQLExpr::Rollup(_) => unsupported_sql_err!("ROLLUP"), diff --git a/src/daft-table/src/lib.rs b/src/daft-table/src/lib.rs index adf490e9a5..82003f4e5f 100644 --- a/src/daft-table/src/lib.rs +++ b/src/daft-table/src/lib.rs @@ -596,6 +596,9 @@ impl Table { InSubquery(_expr, _subquery) => Err(DaftError::ComputeError( "IN should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(), )), + Exists(_subquery) => Err(DaftError::ComputeError( + "EXISTS should be optimized away before evaluation. This indicates a bug in the query optimizer.".to_string(), + )), }?; if expected_field.name != series.field().name {