Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate PPL dedup Command Part 1: allowedDuplication=1 #521

Merged
merged 4 commits into from
Aug 8, 2024

Conversation

LantaoJin
Copy link
Member

@LantaoJin LantaoJin commented Aug 5, 2024

Description

Syntax

dedup [int] <field-list> keepempty=<bool>] [consecutive=<bool>]

This PR translates following three PPL dedup commands to different Spark Logical Plans.
Note, allowed duplication in this PR is 1. Assuming the <field-list> is a, b.

  1. keepempty=false: | dedup [1] a, b [keepempty=false]
Deduplicate ['a, 'b]
+- Filter (isnotnull('a) AND isnotnull('b))
     +- Project
          +- UnresolvedRelation
  1. keepempty=true : | dedup [1] a, b keepempty=true
Union
:- Deduplicate ['a, 'b]
:   +- Filter (isnotnull('a) AND isnotnull('b))
:        +- Project
:             +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
     +- Project
          +- UnresolvedRelation
  1. consecutive=true: | dedup [1] a, b [keepempty=true] consecutive=true
UnsupportedOperationException("Consecutive deduplication is not supported")

Issues Resolved

Resolves #523 (Subtask of #421)

Check List

  • Updated documentation (ppl-spark-integration/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: Lantao Jin <ltjin@amazon.com>
@LantaoJin
Copy link
Member Author

LantaoJin commented Aug 5, 2024

Part 2

To translate dedup command with allowedDuplication > 1, such as | dedup 2 a,b to Spark plan, the solution in my mind is translating to a plan with Window function (e.g row_number) and a new column row_number_col as Filter.

  • For | dedup 2 a, b keepempty=false
Project ['a, 'b] // ensure the row_number_col is removed. not a and b only, should contain all fields except row_number_col
+- Filter ('row_number_col <= 2) // allowed duplication = 2
   +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
       +- Filter (isnotnull('a) AND isnotnull('b)) // keepempty=false
          +- Project
             +- UnresolvedRelation
  • For | dedup 2 a, b keepempty=true
Union
:- Project ['a, 'b]
:  +- Filter ('row_number_col <= 2)
:     +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
:        +- Filter (isnotnull('a) AND isnotnull('b))
:           +- Project
:              +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
   +- Project
      +- UnresolvedRelation

One un-clarified point is what is the 'order_key in Window node?

Option 1

AFAIK, there always should be a time series field existing for log analysis, such as _time or @timestamp. How to get it in AST?

Option 2

If we cannot point out an existing time series field as order_key, an option is change the the syntax of dedup command as following (make sure sql repo will change first)

| dedup 2 a, b order by c
| dedup 2 a, b orderkey=c

Option 3

partitioning and ordering by the same columns:

Window.partitionBy(dedupColumns).orderBy(dedupColumns)

If that, the results can be non-deterministic for large datasets. This non-determinism arises because the sorting does not have a secondary tie-breaking rule to ensure a consistent order of rows within each partition. If multiple rows have the same values for the partition and order columns, their relative order can vary across different runs

A complex case for option 3 is while we have a sorter before dedup command

| sort c | dedup 2 a,b

Any thoughts? @YANG-DB @penghuo @dai-chen

FYI: In Spark, window function requires window to be ordered. Query fails if order by clause is not specified. For example, SELECT <wf>(expr) OVER ([PARTITION BY window_partition] ORDER BY window_ordering) from table
But in MySQL or opensearch-sql, the order by clause can be omitted.

@LantaoJin LantaoJin marked this pull request as ready for review August 5, 2024 09:45
@LantaoJin
Copy link
Member Author

The second un-clarified point is about node Deduplicate and DeduplicateWithinWatermark
https://issues.apache.org/jira/browse/SPARK-42931 introduced a new LogicalPlan DeduplicateWithinWatermark to deduplicate events within the watermark and was available in 3.5.0+
I am not clear the architecture of Flint, but seems it performs on Spark Streaming. Deduplication on streaming dataset has different semantic with batch dataset:

We document the behavior clearly that the event time column should be a part of the subset columns for deduplication to clean up the state, but it cannot be applied to the customers as timestamps are not exactly the same for duplicated events in their use cases.

Need more guidance on the purpose of this repository and how it should be used in production or in future. For current time-being, I choose Deduplicate in this PR (DeduplicateWithinWatermark required 3.5.0+).

@penghuo
Copy link
Collaborator

penghuo commented Aug 6, 2024

Part 2

To translate dedup command with allowedDuplication > 1, such as | dedup 2 a,b to Spark plan, the solution in my mind is translating to a plan with Window function (e.g row_number) and a new column row_number_col as Filter.

  • For | dedup 2 a, b keepempty=false
Project ['a, 'b] // ensure the row_number_col is removed. not a and b only, should contain all fields except row_number_col
+- Filter ('row_number_col <= 2) // allowed duplication = 2
   +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
       +- Filter (isnotnull('a) AND isnotnull('b)) // keepempty=false
          +- Project
             +- UnresolvedRelation
  • For | dedup 2 a, b keepempty=true
Union
:- Project ['a, 'b]
:  +- Filter ('row_number_col <= 2)
:     +- Window [row_number() windowspecdefinition('a, 'b, 'order_key ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 'row_number_col], ['a, 'b], ['order_key ASC NULLS FIRST]
:        +- Filter (isnotnull('a) AND isnotnull('b))
:           +- Project
:              +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
   +- Project
      +- UnresolvedRelation

One un-clarified point is what is the 'order_key in Window node? AFAIK, there always should be a time series field existing for log analysis, such as _time or @timestamp. How to get it in AST? If we cannot point out an existing time series field as order_key, the syntax of dedup command might change to

| dedup 2 a, b order by c
| dedup 2 a, b orderkey=c

Any thoughts? @YANG-DB @penghuo @dai-chen

FYI: In Spark, window function requires window to be ordered. Query fails if order by clause is not specified. For example, SELECT <wf>(expr) OVER ([PARTITION BY window_partition] ORDER BY window_ordering) from table But in MySQL or opensearch-sql, the order by clause can be omitted.

Agree on Windows function.

Questions

  • What is the difference of Deduplicate and Window? Does Deduplicate will be optimized as WIndows function?
  • Can we windows_partition and window_ordering be same field?

@LantaoJin
Copy link
Member Author

LantaoJin commented Aug 7, 2024

Questions

What is the difference of Deduplicate and Window? Does Deduplicate will be optimized as WIndows function?
Can we windows_partition and window_ordering be same field?

  1. They are totally different. Actually, Deduplicate will be optimized as Aggregation. Using Deduplicate instead of Aggregation in code is more readable. Ref https://github.com/apache/spark/blob/b3877894acb512477d41038269f20158d7bca8f3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2261
  2. Yes we can. But it brings non-deterministic results, order can vary across different runs.

I updated the comment #521 (comment)

@LantaoJin
Copy link
Member Author

LantaoJin commented Aug 7, 2024

Summary

Discussed with @penghuo offline. Here is the conclusions:

  1. For node Deduplicate and DeduplicateWithinWatermark

Deduplicate will be optimized as Aggregation, but DeduplicateWithinWatermark will be transformed to a new physical plan node StreamingDeduplicateWithinWatermarkExec. Considering current usage, #521 will keep using Deduplicate.

  1. For the 'order_key in Window

Will choose the option 3 (partitioning and ordering by the same columns). Ref #521 (comment)

CC @YANG-DB and @dai-chen

Again, all above shouldn't block code review for this PR.

@@ -271,7 +273,105 @@ public LogicalPlan visitWindowFunction(WindowFunction node, CatalystPlanContext

@Override
public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : dedupe ");
node.getChild().get(0).accept(this, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not mandatory but for future - lets extract the concrete login into a dedicated strategy class and inject that into the QueryPlanner
this would reduce the complexity and simplify the testing and seperation of concerns

@LantaoJin
Copy link
Member Author

@penghuo @dai-chen @YANG-DB Any other concern on it? Would this PR be merged now?

Signed-off-by: YANGDB <yang.db.dev@gmail.com>
@YANG-DB YANG-DB merged commit 7c4244f into opensearch-project:main Aug 8, 2024
4 checks passed
@YANG-DB YANG-DB added Lang:PPL Pipe Processing Language support backport 0.5 labels Aug 14, 2024
opensearch-trigger-bot bot pushed a commit that referenced this pull request Aug 14, 2024
* Translate PPL Dedup Command: only one duplication allowd

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* add document

Signed-off-by: Lantao Jin <ltjin@amazon.com>

---------

Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: YANGDB <yang.db.dev@gmail.com>
Co-authored-by: YANGDB <yang.db.dev@gmail.com>
(cherry picked from commit 7c4244f)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
YANG-DB added a commit that referenced this pull request Aug 14, 2024
* Translate PPL Dedup Command: only one duplication allowd



* add document



---------




(cherry picked from commit 7c4244f)

Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: YANGDB <yang.db.dev@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: YANGDB <yang.db.dev@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport 0.5 Lang:PPL Pipe Processing Language support
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Support dedup Command with allowedDuplication=1
3 participants