Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate PPL dedup Command Part 1: allowedDuplication=1 #521

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,40 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

protected def createDuplicationNullableTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| id INT,
| name STRING,
| category STRING
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, "A", "X"),
| (2, "A", "Y"),
| (3, "A", "Y"),
| (4, "B", "Z"),
| (5, "B", "Z"),
| (6, "B", "Z"),
| (7, "C", "X"),
| (8, null, "Y"),
| (9, "D", "Z"),
| (10, "E", null),
| (11, "A", "X"),
| (12, "A", "Y"),
| (13, null, "X"),
| (14, "B", null),
| (15, "B", "Y"),
| (16, null, "Z"),
| (17, "C", "X"),
| (18, null, null)
| """.stripMargin)
}

protected def createTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLDedupITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createDuplicationNullableTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test dedupe 1 name") {
val frame = sql(s"""
| source = $testTable | dedup 1 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("A"), Row("B"), Row("C"), Row("D"), Row("E"))
implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"))
val dedupKeys = Seq(UnresolvedAttribute("name"))
val filter = Filter(IsNotNull(UnresolvedAttribute("name")), table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"))
implicit val twoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val dedupKeys = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val filter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val isNotNullFilter =
Filter(IsNotNull(UnresolvedAttribute("name")), table)
val deduplicate = Deduplicate(Seq(UnresolvedAttribute("name")), isNotNullFilter)
val isNullFilter = Filter(IsNull(UnresolvedAttribute("name")), table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val isNotNullFilter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val deduplicate = Deduplicate(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")),
isNotNullFilter)
val isNullFilter = Filter(
Or(IsNull(UnresolvedAttribute("name")), IsNull(UnresolvedAttribute("category"))),
table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test 1 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 1 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

ignore("test dedupe 2 name") {
val frame = sql(s"""
| source = $testTable| dedup 2 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] =
Array(Row("A"), Row("A"), Row("B"), Row("B"), Row("C"), Row("C"), Row("D"), Row("E"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name, category") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](row => {
val value = row.getAs[String](0)
if (value == null) String.valueOf(Int.MaxValue) else value
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))
}

ignore("test dedupe 2 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test 2 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 2 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}
}
17 changes: 16 additions & 1 deletion ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,24 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

---
**Dedup**

- `source = table | dedup a | fields a,b,c`
- `source = table | dedup a,b | fields a,b,c`
- `source = table | dedup a keepempty=true | fields a,b,c`
- `source = table | dedup a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a | fields a,b,c`
- `source = table | dedup 1 a,b | fields a,b,c`
- `source = table | dedup 1 a keepempty=true | fields a,b,c`
- `source = table | dedup 1 a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a consecutive=true| fields a,b,c` (Unsupported)
- `source = table | dedup 2 a | fields a,b,c` (Unsupported)


For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst)

---

For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2)
For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408)

Expand All @@ -284,3 +298,4 @@ For additional details on Spark PPL commands support campaign, see [PPL Commands

> This is an experimental command - it may be removed in future versions


Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ commands
| correlateCommand
| fieldsCommand
| statsCommand
| dedupCommand
| sortCommand
| headCommand
| evalCommand
Expand Down
Loading
Loading