Support RelationSubquery PPL #775

Merged
merged 6 commits into from
Oct 17, 2024
Changes from 2 commits
12 changes: 10 additions & 2 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -240,8 +240,7 @@ source = table | where ispresent(a) |
- `source = table1 | cross join left = l right = r table2`
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`

_- **Limitation: sub-searches is unsupported in join right side now**_
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`


#### **Lookup**
@@ -349,6 +348,15 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`

#### **(Relation) Subquery**
[See additional command details](ppl-subquery-command.md)

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. `RelationSubquery`, by contrast, is not a subquery expression: it is a subquery plan, commonly used in a Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

_- **Limitation: the other use of a (relation) subquery is in the `appendcols` command, which is unsupported**_

---
#### Experimental Commands:
8 changes: 4 additions & 4 deletions docs/ppl-lang/ppl-search-command.md
@@ -5,9 +5,9 @@ Using ``search`` command to retrieve document from the index. ``search`` command


### Syntax
`search source=[<remote-cluster>:]<index> [boolean-expression]`
`(search | from)? source=[<remote-cluster>:]<index> [boolean-expression]`

* search: search keywords, which could be ignore.
* search or from: optional keywords; either one can be omitted.
* index: mandatory. The search command must specify which index to query. The index name can be prefixed by "<cluster name>:" for cross-cluster search.
* boolean-expression: optional. Any expression that evaluates to a boolean value.
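
The optional-keyword rule in the syntax line can be illustrated with a small Scala sketch. This is an assumption-laden toy and not the project's grammar: `SearchHeadSketch`, `SearchHead`, and `parse` are hypothetical names, the regular expression only covers the head of a query, and the boolean expression is kept as raw text rather than parsed.

```scala
// Hypothetical sketch: recognizes `(search | from)? source=[<remote-cluster>:]<index> [boolean-expression]`.
// It is NOT the parser used by the project; it only illustrates which parts are optional.
object SearchHeadSketch {
  final case class SearchHead(
      keyword: Option[String], // "search" or "from", lower-cased; None when omitted
      cluster: Option[String], // optional remote-cluster prefix
      index: String,           // mandatory index name
      filter: Option[String])  // optional boolean expression, kept as raw text

  // Groups: 1 = keyword, 2 = cluster, 3 = index, 4 = rest of the line.
  private val Head =
    """(?i)\s*(?:(search|from)\s+)?source\s*=\s*(?:([\w.-]+):)?([\w.*-]+)(?:\s+(.*))?""".r

  def parse(query: String): Option[SearchHead] = query match {
    // Unmatched optional groups come back as null, hence the Option(...) wrappers.
    case Head(kw, cluster, index, rest) =>
      Some(
        SearchHead(
          Option(kw).map(_.toLowerCase),
          Option(cluster),
          index,
          Option(rest).map(_.trim).filter(_.nonEmpty)))
    case _ => None
  }
}
```

Under these assumptions, `source=accounts`, `SEARCH source=accounts`, and `FROM source=accounts` all yield the same index, differing only in the recorded keyword.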

@@ -17,7 +17,7 @@ The example show fetch all the document from accounts index.

PPL query:

os> source=accounts;
os> SEARCH source=accounts;
+------------------+-------------+----------------------+-----------+----------+--------+------------+---------+-------+-----------------------+------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname |
|------------------+-------------+----------------------+-----------+----------+--------+------------+---------+-------+-----------------------+------------|
@@ -32,7 +32,7 @@ The example show fetch all the document from accounts index with .

PPL query:

os> source=accounts account_number=1 or gender="F";
os> FROM source=accounts account_number=1 or gender="F";
+------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname |
|------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------|
63 changes: 57 additions & 6 deletions docs/ppl-lang/ppl-subquery-command.md
@@ -1,6 +1,6 @@
## PPL SubQuery Commands:

**Syntax**
### Syntax
The subquery command should be implemented using a clean, logical syntax that integrates with existing PPL structure.

```sql
@@ -21,7 +21,7 @@ For additional info See [Issue](https://github.com/opensearch-project/opensearch

---

**InSubquery usage**
### InSubquery usage
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
@@ -111,8 +111,9 @@ source = supplier
nation
| sort s_name
```
---

**ExistsSubquery usage**
### ExistsSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

@@ -163,8 +164,9 @@ source = orders
| sort o_orderpriority
| fields o_orderpriority, order_count
```
---

**ScalarSubquery usage**
### ScalarSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested

@@ -240,10 +242,59 @@ source = spark_catalog.default.outer
source = spark_catalog.default.inner | where c = 1 | stats min(d) | sort d
]
```
---

### (Relation) Subquery
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. `RelationSubquery`, by contrast, is not a subquery expression: it is a subquery plan, commonly used in a Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`
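
The expression-versus-plan distinction can be sketched with a toy Scala model. Everything here is hypothetical and simplified for illustration: the `Plan` and `Expr` types below are not the project's AST classes, and only the shape of the first example above is modeled.

```scala
// Hypothetical, simplified model; these are NOT the project's AST classes.
object SubquerySketch {
  // Plans produce rows and can stand wherever a table can.
  sealed trait Plan
  final case class Relation(table: String) extends Plan
  final case class Filter(condition: Expr, child: Plan) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan
  // A relation subquery is itself a plan, so it fits in a join side or a `source = [...]`.
  final case class RelationSubquery(query: Plan, alias: Option[String]) extends Plan

  // Expressions produce values and live inside predicates such as `where`.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class IntLit(value: Int) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  // Subquery expressions wrap a plan but are used as values.
  final case class InSubquery(value: Expr, query: Plan) extends Expr
  final case class ExistsSubquery(query: Plan) extends Expr
  final case class ScalarSubquery(query: Plan) extends Expr

  // source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]
  val joinWithSubquery: Plan = Join(
    Relation("table1"),
    RelationSubquery(
      Limit(5, Filter(GreaterThan(Field("d"), IntLit(10)), Relation("table2"))),
      alias = Some("r")))

  // Counts relation subqueries along the plan tree (expressions are not descended into).
  def countRelationSubqueries(plan: Plan): Int = plan match {
    case Relation(_)            => 0
    case Filter(_, child)       => countRelationSubqueries(child)
    case Limit(_, child)        => countRelationSubqueries(child)
    case Join(l, r)             => countRelationSubqueries(l) + countRelationSubqueries(r)
    case RelationSubquery(q, _) => 1 + countRelationSubqueries(q)
  }
}
```

In this model a `RelationSubquery` type-checks as a join side, whereas an `InSubquery` would not, which is the point the paragraph above makes.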

**_SQL Migration examples with Subquery PPL:_**

tpch q13
```sql
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
```
Rewritten by PPL (Relation) Subquery:
```sql
SEARCH source = [
SEARCH source = customer
| LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey
[
SEARCH source = orders
| WHERE not like(o_comment, '%special%requests%')
]
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
] AS c_orders
| STATS COUNT(1) AS custdist BY c_count
| SORT - custdist, - c_count
```
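
The two-level aggregation that q13 performs can be traced with a small in-memory Scala sketch. The row types and sample data are invented for illustration, and the sketch assumes customer keys are unique; it mirrors the SQL semantics with plain collections and does not run PPL.

```scala
// Hypothetical in-memory model of TPC-H q13; types and data are made up for illustration.
object Q13Sketch {
  final case class Customer(custKey: Int)
  final case class Order(orderKey: Int, custKey: Int, comment: String)

  val customers: Seq[Customer] = Seq(Customer(1), Customer(2), Customer(3), Customer(4))
  val orders: Seq[Order] = Seq(
    Order(10, 1, "fast delivery"),
    Order(11, 1, "special handling requests"), // dropped by the NOT LIKE filter
    Order(12, 2, "ok"),
    Order(13, 2, "fine"))

  // Right side of the join: o_comment NOT LIKE '%special%requests%'.
  val filteredOrders: Seq[Order] =
    orders.filterNot(_.comment.matches("(?s).*special.*requests.*"))

  // Inner query: left outer join, then COUNT(o_orderkey) BY c_custkey.
  // Customers without a matching order still produce a row, with count 0,
  // because COUNT ignores the NULL o_orderkey of the unmatched row.
  val cOrders: Seq[(Int, Int)] = customers.map { c =>
    (c.custKey, filteredOrders.count(_.custKey == c.custKey))
  }

  // Outer query: COUNT(1) AS custdist BY c_count, then sort by custdist desc, c_count desc.
  // Each element is (c_count, custdist).
  val result: Seq[(Int, Int)] = cOrders
    .groupBy { case (_, cCount) => cCount }
    .map { case (cCount, rows) => (cCount, rows.size) }
    .toSeq
    .sortBy { case (cCount, custdist) => (-custdist, -cCount) }
}
```

With this sample data, two customers have no qualifying orders, so the `c_count = 0` group comes first in the result.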
---

### **Additional Context**
### Additional Context

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expression. The common usage of subquery expression is in `where` clause:
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are subquery expressions; they are most commonly used in the `where` clause.

The `where` command syntax is:

Original file line number Diff line number Diff line change
@@ -7,9 +7,9 @@ package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, GreaterThan, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, JoinHint, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, Join, JoinHint, LocalLimit, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLJoinITSuite
@@ -738,4 +738,190 @@ class FlintSparkPPLJoinITSuite
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
}

test("test inner join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, Inner, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test left outer join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | left join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70), Row(null, null, 40))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, LeftOuter, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test multiple joins with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'Canada' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name AND a.year = 2023 AND a.month = 4 AND b.year = 2023 AND b.month = 4
| [
| source = $testTable2
| ]
| | eval a_name = a.name
| | eval a_country = a.country
| | eval b_country = b.country
| | fields a_name, age, state, a_country, occupation, b_country, salary
| | left join left=a, right=b
| ON a.a_name = b.name
| [
| source = $testTable3
| ]
| | eval aa_country = a.a_country
| | eval ab_country = a.b_country
| | eval bb_country = b.country
| | fields a_name, age, state, aa_country, occupation, ab_country, salary, bb_country, hobby, language
| | cross join left=a, right=b
| [
| source = $testTable2
| ]
| | eval new_country = a.aa_country
| | eval new_salary = b.salary
| | stats avg(new_salary) as avg_salary by span(age, 5) as age_span, state
| | left semi join left=a, right=b
| ON a.state = b.state
| [
| source = $testTable1
| ]
| | eval new_avg_salary = floor(avg_salary)
| | fields state, age_span, new_avg_salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("Quebec", 20, 83333), Row("Ontario", 25, 83333))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Cross, None, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, LeftOuter, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.analyzed.collect { case s: SubqueryAlias =>
s
}.size == 13)
}
}