Support RelationSubquery PPL #775

Merged: 6 commits merged on Oct 17, 2024. Changes shown from 4 commits.
12 changes: 10 additions & 2 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -240,8 +240,7 @@ source = table | where ispresent(a) |
- `source = table1 | cross join left = l right = r table2`
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`

_- **Limitation: sub-searches is unsupported in join right side now**_
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`


#### **Lookup**
@@ -349,6 +348,15 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`

#### **(Relation) Subquery**
[See additional command details](ppl-subquery-command.md)

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. `RelationSubquery`, by contrast, is not a subquery expression but a subquery plan, commonly used in a Join or Search clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

_- **Limitation: the other command that uses a (relation) subquery is `appendcols`, which is currently unsupported**_
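To illustrate, the bracketed sub-search pattern above can also feed the search clause directly; a minimal sketch reusing the hypothetical `table1` and fields `a`, `b` from the examples above:

```sql
source = [ source = table1 | where a > 0 | fields a, b ] as t
| stats count(a) by b
```

Here the bracketed sub-search becomes the source relation and `t` is its alias, mirroring the second bullet above.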

---
#### Experimental Commands:
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-search-command.md
@@ -32,7 +32,7 @@ The example shows how to fetch all the documents from the accounts index with .

PPL query:

os> source=accounts account_number=1 or gender="F";
os> SEARCH source=accounts account_number=1 or gender="F";
> **Member Author:** There are two queries as examples. One example query omits the `SEARCH` keyword; keep the `SEARCH` keyword in this example query.

+------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname |
|------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------|
63 changes: 57 additions & 6 deletions docs/ppl-lang/ppl-subquery-command.md
@@ -1,6 +1,6 @@
## PPL SubQuery Commands:

**Syntax**
### Syntax
The subquery command should be implemented using a clean, logical syntax that integrates with existing PPL structure.

```sql
@@ -21,7 +21,7 @@ For additional info see [Issue](https://github.com/opensearch-project/opensearch

---

**InSubquery usage**
### InSubquery usage
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
@@ -111,8 +111,9 @@ source = supplier
nation
| sort s_name
```
---

**ExistsSubquery usage**
### ExistsSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

@@ -163,8 +164,9 @@ source = orders
| sort o_orderpriority
| fields o_orderpriority, order_count
```
---

**ScalarSubquery usage**
### ScalarSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested

@@ -240,10 +242,59 @@ source = spark_catalog.default.outer
source = spark_catalog.default.inner | where c = 1 | stats min(d) | sort d
]
```
---

### (Relation) Subquery
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. `RelationSubquery`, by contrast, is not a subquery expression but a subquery plan, commonly used in a Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`
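As a further sketch (hypothetical tables and fields, following the syntax above), a relation subquery can appear in the search clause and on a join right side at the same time:

```sql
source = [ source = table1 | where a > 0 ] as t1
| inner join left = l right = r on l.a = r.d
  [ source = table2 | where d > 10 | head 5 ]
| stats count(a) by b
```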

**_SQL Migration examples with Subquery PPL:_**

tpch q13
```sql
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
```
Rewritten by PPL (Relation) Subquery:
```sql
SEARCH source = [
SEARCH source = customer
| LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey
[
SEARCH source = orders
| WHERE not like(o_comment, '%special%requests%')
]
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
] AS c_orders
| STATS COUNT(1) AS custdist BY c_count
| SORT - custdist, - c_count
```
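For a smaller illustration of the same migration pattern, a plain SQL derived table maps directly onto a relation subquery (a sketch with a hypothetical table `t` and fields `b`, `d`):

```sql
select b, count(d) as cnt
from (
    select * from t where d > 10
) as filtered
group by b
```

Rewritten by PPL (Relation) Subquery:

```sql
SEARCH source = [
    SEARCH source = t | WHERE d > 10
] AS filtered
| STATS COUNT(d) AS cnt BY b
```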
---

### **Additional Context**
### Additional Context

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expression. The common usage of subquery expression is in `where` clause:
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are subquery expressions; their common usage is in the `where` clause.

The `where` command syntax is:

@@ -7,9 +7,9 @@ package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, GreaterThan, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, JoinHint, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, Join, JoinHint, LocalLimit, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLJoinITSuite
@@ -738,4 +738,190 @@ class FlintSparkPPLJoinITSuite
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
}

test("test inner join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, Inner, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test left outer join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | left join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70), Row(null, null, 40))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, LeftOuter, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test multiple joins with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'Canada' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name AND a.year = 2023 AND a.month = 4 AND b.year = 2023 AND b.month = 4
| [
| source = $testTable2
| ]
| | eval a_name = a.name
| | eval a_country = a.country
| | eval b_country = b.country
| | fields a_name, age, state, a_country, occupation, b_country, salary
| | left join left=a, right=b
| ON a.a_name = b.name
| [
| source = $testTable3
| ]
| | eval aa_country = a.a_country
| | eval ab_country = a.b_country
| | eval bb_country = b.country
| | fields a_name, age, state, aa_country, occupation, ab_country, salary, bb_country, hobby, language
| | cross join left=a, right=b
| [
| source = $testTable2
| ]
| | eval new_country = a.aa_country
| | eval new_salary = b.salary
| | stats avg(new_salary) as avg_salary by span(age, 5) as age_span, state
| | left semi join left=a, right=b
| ON a.state = b.state
| [
| source = $testTable1
| ]
| | eval new_avg_salary = floor(avg_salary)
| | fields state, age_span, new_avg_salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("Quebec", 20, 83333), Row("Ontario", 25, 83333))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Cross, None, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, LeftOuter, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.analyzed.collect { case s: SubqueryAlias =>
s
}.size == 13)
}
}
28 changes: 19 additions & 9 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -247,17 +247,27 @@ mlArg

// clauses
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
: SOURCE EQUAL tableOrSubqueryClause
| INDEX EQUAL tableOrSubqueryClause
;

tableOrSubqueryClause
: LT_SQR_PRTHS subSearch RT_SQR_PRTHS (AS alias = qualifiedName)?
| tableSourceClause
;

// One tableSourceClause will generate one Relation node, with or without an alias,
// even if the relation contains more than one table source.
// The table sources in one relation will be read one by one in OpenSearch.
> **Collaborator (@penghuo):** Do we support `source = tb1, tb2, tb3 as tbl`? Should we treat `tb1, tb2, tb3` as a single table and let the datasource connector handle it? For instance: ``source=`tb1, tb2, tb3` as tbl``

> **Member Author (@LantaoJin):** Yes. The current valid syntax is `source = tb1, tb2, tb3 as tbl` or ``source = `tb1`, `tb2`, `tb3` as tbl``. Indexes tb1, tb2 and tb3 will be converted to one relation in OpenSearch, and tbl is the alias of this relation. Spark doesn't support comma-separated tables as one relation: Catalyst will throw "Table not found" for an `UnresolvedRelation` with a comma-named table identifier. ``source=`tb1, tb2, tb3` as tbl`` equals `source = tb1, tb2, tb3 as tbl` for Spark and fails in resolution for the reason above. For an OpenSearch index, ``source=`tb1, tb2, tb3` as tbl`` cannot work either, since "tb1, tb2, tb3" is not a valid index name.

> **Member Author (@LantaoJin, Oct 16, 2024):** `source = tb1, tb2, tb3 as tbl` and ``source = `tb1`, `tb2`, `tb3` as tbl`` are valid because we never need the pattern `source = tb1 as t1, tb2 as t2, tb3 as t3`; it would not be meaningful, since tb1, tb2 and tb3 are combined into one relation in the plan. That's why ``source=`tb1, tb2, tb3` as tbl`` will be treated as the relation name "tb1, tb2, tb3" and should fail with table not found.

> **Collaborator (@penghuo, Oct 21, 2024):** Is `source = tb1, tb2, tb3 as tbl` valid grammar in spark-sql? If not, does it confuse users? As for ``source=`tb1, tb2, tb3` as tbl``: PPL on OpenSearch supports it; it is multiple OpenSearch indexes. Should we let the Catalog handle table name resolution? The OpenSearch catalog can resolve a table name as multiple indexes properly, for instance `catalog.namespace.index-2024*,index-2023-12`.

> **Member Author (@LantaoJin, Oct 23, 2024):** Yes, it is valid grammar in opensearch-spark. For example, `search source=test1, test2` or ``search source=`test1`, `test2` `` generates a Spark plan with a Union:
>
> ```
> 'Union
> :- 'Project [*]
> :  +- 'UnresolvedRelation [spark_catalog, default, flint_ppl_test1], [], false
> +- 'Project [*]
>    +- 'UnresolvedRelation [spark_catalog, default, flint_ppl_test2], [], false
> ```
>
> As for PPL on OpenSearch treating ``source=`tb1, tb2, tb3` `` as multiple indexes: that is the key difference. opensearch-spark can't handle it, since "tb1, tb2, tb3" in backticks is handled as a whole, and a name with a comma is invalid in Spark.

> **Member Author (@LantaoJin, Oct 23, 2024):** PPL on OpenSearch supports:
>
> 1. `source=accounts, account2`
> 2. ``source=`accounts`,`account2` ``
> 3. ``source=`accounts, account2` ``
>
> But PPL on Spark supports only the first two. I would suggest marking the third as invalid, since users will, as usual, treat the content in backticks as a whole; `` `accounts, account2` `` seems specific to the OpenSearch domain. For the instance you provided above, my suggestion is to treat content in backticks as a whole @penghuo:
>
> * √ ``source=`catalog`.`namespace`.`index-2024*`, `catalog`.`namespace`.`index-2023-12` ``
> * √ ``source=`catalog`.`namespace`.index-2024*, index-2023-12``
> * × ``source=`catalog`.`namespace`.`index-2024*, index-2023-12` ``
>
> Any different thoughts? I think it's worth opening a meta issue in the sql repo for further discussion if we can't get aligned here. This context in a closed PR could easily be lost.

// But it may have different behaviours in different execution backends.
// For example, a Spark UnresolvedRelation node only accepts one data source.
tableSourceClause
: tableSource (COMMA tableSource)*
: tableSource (COMMA tableSource)* (AS alias = qualifiedName)?
> **Member Author (@LantaoJin, Oct 15, 2024):** A table alias is useful in a query which contains a subquery, for example:
>
> ```sql
> select a, (
>              select sum(b)
>              from catalog.schema.table1 as t1
>              where t1.a = t2.a
>           ) sum_b
> from catalog.schema.table2 as t2
> ```
>
> `t1` and `t2` are table aliases used in the correlated subquery; `sum_b` is the subquery alias.

> **Member:** Thanks for the detailed review. Can you also add this explanation to the ppl-subquery-command doc? Thanks.

> **Member Author:** Yes. I will give more examples in the doc.
;

// join
joinCommand
: (joinType) JOIN sideAlias joinHintList? joinCriteria? right = tableSource
: (joinType) JOIN sideAlias joinHintList? joinCriteria? right = tableOrSubqueryClause
;

joinType
@@ -279,13 +289,13 @@
;

joinHintList
: hintPair (COMMA? hintPair)*
;
: hintPair (COMMA? hintPair)*
;

hintPair
: leftHintKey = LEFT_HINT DOT ID EQUAL leftHintValue = ident #leftHint
| rightHintKey = RIGHT_HINT DOT ID EQUAL rightHintValue = ident #rightHint
;
: leftHintKey = LEFT_HINT DOT ID EQUAL leftHintValue = ident #leftHint
| rightHintKey = RIGHT_HINT DOT ID EQUAL rightHintValue = ident #rightHint
;

renameClasue
: orignalField = wcFieldExpression AS renamedField = wcFieldExpression