[1105] Change Data Feed - MERGE command #1155

Closed
wants to merge 15 commits into from

allisonport-db
Collaborator

See the project plan at #1105.

This PR adds CDF to the MERGE command.

Merge is implemented in two ways.

  • Insert-only merges. For these we don't need to do anything special, since we only write AddFiles with the new rows.

    • However, our current implementation of insert-only merges doesn't correctly update the metric numTargetRowsInserted, which is used to check for data changes in CDCReader. This PR fixes that.
  • For all other merges, we generate CDF rows for inserts, updates, and deletions. We do this by generating expression sequences for the CDF outputs (e.g. preimage, insert) on a clause-by-clause basis. We apply these to the rows in our joinedDF in addition to our existing main data output sequences.

    • Changes made to JoinedRowProcessor make column ROW_DELETED_COL unnecessary, so this PR removes it.
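
As a rough illustration of the clause-by-clause idea above, the following sketch models rows as plain Scala maps instead of Spark expressions. `MergeCdfSketch`, `Output`, `Action`, and `outputsFor` are hypothetical names invented for this example and do not come from the PR; only the change-type labels (`insert`, `update_preimage`, `update_postimage`, `delete`) follow Delta's Change Data Feed. It is a simplified model under those assumptions, not the code in `MergeIntoCommand`.

```scala
// Simplified model of per-clause outputs in a CDF-enabled merge.
// All names here are hypothetical; none are taken from MergeIntoCommand.
object MergeCdfSketch {
  type Row = Map[String, Any]

  // changeType = None marks a row for the main table; Some(t) marks a CDF row.
  final case class Output(row: Row, changeType: Option[String])

  sealed trait Action
  final case class Update(newValues: Row) extends Action
  case object Delete extends Action
  final case class Insert(values: Row) extends Action

  // For one clause, list every output row it yields for one joined input row.
  def outputsFor(action: Action, target: Option[Row]): Seq[Output] = action match {
    case Update(newValues) =>
      val before: Row = target.getOrElse(Map.empty[String, Any])
      val after: Row = before ++ newValues
      Seq(
        Output(after, None),                      // main data row
        Output(before, Some("update_preimage")),  // CDF: row before the update
        Output(after, Some("update_postimage")))  // CDF: row after the update
    case Delete =>
      // In this model nothing reaches the main table; only a change record is emitted.
      val before: Row = target.getOrElse(Map.empty[String, Any])
      Seq(Output(before, Some("delete")))
    case Insert(values) =>
      Seq(Output(values, None), Output(values, Some("insert")))
  }
}
```

Each clause contributes a small list of outputs per input row: one optional main-table row plus zero or more change rows, which is the shape the description refers to when it says the CDF sequences are applied in addition to the main data output sequences.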

Tests are added in MergeCDCSuite.

GitOrigin-RevId: eb88ef7d39632e6559f7b14fd71fc93a40fcf901
GitOrigin-RevId: d9b7f8a1e0844f9ce97f0aaf5533a14c5e4e712b
GitOrigin-RevId: f926e5913b622933ddf2e69b243498d91ce27695
GitOrigin-RevId: 3f03422b890f065efe30b6552adc4f98cb123f8c
GitOrigin-RevId: 2a8d18e0b14177db9418807c3ef5e99ce3042442
GitOrigin-RevId: 3fe9b5daa0ed1c5db004a7f0302c94b7291fc726
GitOrigin-RevId: eb0a83a0fad86e65f8ddbb3adc21e247f2a17820
GitOrigin-RevId: d6a54de385b0a50cc8687d6bf945083aecbfef93
GitOrigin-RevId: cda01c70baa0a3cf0df8040774ea2e3e151cf97a
GitOrigin-RevId: d5233be73d337eab9225e957351d57e3ca49c3a4
// performing the final write, and the increment column will always be dropped after executing
// the metrics UDF.

// We produce both rows for the CDC_TYPE_NOT_CDC partition to be written to the main table,
Contributor

It's hard to understand what "partition" means in the immediate context of this code. Please reword it.

Collaborator Author

Updated. Let me know if it's clearer.

GitOrigin-RevId: 5a9774228ff1c6943483868d8e4ddf5bc27aff45
GitOrigin-RevId: 0d05065e70aeceb537c03c14d85bb56dcce174c1
GitOrigin-RevId: 068f4a35a2266715c9776638eb8220e5976b6324
Comment on lines +879 to +882
row.getBoolean(
  outputRowEncoder.schema.getFieldIndex(ROW_DROPPED_COL)
    .getOrElse(outputRowEncoder.schema.fields.size)
)
Collaborator Author

Note, this is simply mimicking the prior implementation when CDC is disabled.

Another solution is to have outputRowEncoder include ROW_DROPPED_COL when CDC is disabled. It will be dropped on line 684 regardless. I'm not sure about the tradeoff of decoding a column we don't need.

Contributor

@tdas tdas Jun 3, 2022

This is a moot discussion now, right? You have to use ROW_DROPPED_COL to get the metrics right, correct?

Collaborator Author

This is just about how we get the index of ROW_DROPPED_COL. This function could be simplified to

row.getBoolean(outputRowEncoder.schema.fieldIndex(ROW_DROPPED_COL))

if we always include ROW_DROPPED_COL in outputRowEncoder.
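
To make the two lookup styles in this thread concrete, here is a small sketch using a plain-Scala stand-in for a schema. Spark's `StructType` provides both `getFieldIndex` (returns an `Option[Int]`) and `fieldIndex` (throws `IllegalArgumentException` when the field is missing); the `SketchSchema` class below only imitates those two methods so the example runs without Spark. `LookupSketch`, `SketchSchema`, and `indexWithFallback` are hypothetical names for illustration.

```scala
object LookupSketch {
  // Plain-Scala stand-in for a schema; this is not Spark's StructType.
  final case class SketchSchema(fieldNames: Seq[String]) {
    // Option-returning lookup: the caller decides what a missing column means.
    def getFieldIndex(name: String): Option[Int] = {
      val i = fieldNames.indexOf(name)
      if (i >= 0) Some(i) else None
    }

    // Throwing lookup: shorter at the call site, but the column must exist.
    def fieldIndex(name: String): Int =
      getFieldIndex(name).getOrElse(
        throw new IllegalArgumentException(s"$name does not exist"))
  }

  val RowDroppedCol = "_row_dropped_"
  val withCol = SketchSchema(Seq("id", "value", RowDroppedCol))
  val withoutCol = SketchSchema(Seq("id", "value"))

  // Fallback style from the diff: use the position just past the last field
  // when the schema does not list the column.
  def indexWithFallback(s: SketchSchema): Int =
    s.getFieldIndex(RowDroppedCol).getOrElse(s.fieldNames.size)
}
```

With these two toy schemas the fallback resolves to the same position whether or not the column is listed, whereas the throwing lookup only works when the schema always includes the column, which is the condition stated in the comment above.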

notMatchedConditions: Seq[Expression],
- notMatchedOutputs: Seq[Seq[Expression]],
+ notMatchedOutputs: Seq[Seq[Seq[Expression]]],
noopCopyOutput: Seq[Expression],
deleteRowOutput: Seq[Expression],
Collaborator Author

We don't actually need this. It can be done the way it is in https://github.com/allisonport-db/delta/blob/02a238e6666e31cc74ea1dbda12842ce929de4d6/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L860, such that we simply do not create an output row. I'm not sure which is clearer to readers.

Contributor

I don't get what you are referring to here. Did the thread get lost?

Collaborator Author

Sorry, this might be hard to explain in writing.

Basically, since processRow now returns an Iterator[InternalRow] instead of a single InternalRow, we could simply omit that inputRow from the returned iterator, instead of using an expression to create a "deletedRowOutput" that we later delete.

Collaborator Author

It's implemented that way in the commit linked above, before I added back ROW_DROPPED_COL.

Collaborator Author

I think this is more a question of readability. I'm not sure either way is preferable to the other.
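
The two alternatives discussed in this thread can be sketched with ordinary Scala iterators. This is a toy model, assuming rows are simple case classes; `DropRowSketch`, `In`, `Flagged`, `flagThenFilter`, and `omitDirectly` are hypothetical names, and only `processRow` echoes a name used in the discussion (with a simplified signature).

```scala
object DropRowSketch {
  final case class In(id: Int, deleted: Boolean)
  final case class Flagged(id: Int, rowDropped: Boolean)

  // Option A: emit every row with a "dropped" flag, then filter afterwards.
  def flagThenFilter(rows: Iterator[In]): Iterator[Int] =
    rows
      .map(r => Flagged(r.id, r.deleted))
      .filterNot(_.rowDropped)
      .map(_.id)

  // Option B: because processRow returns an iterator, a deleted row can
  // simply yield nothing instead of a flagged placeholder.
  def processRow(r: In): Iterator[Int] =
    if (r.deleted) Iterator.empty else Iterator.single(r.id)

  def omitDirectly(rows: Iterator[In]): Iterator[Int] =
    rows.flatMap(processRow)
}
```

Both variants produce the same surviving rows, so the choice is mostly about readability, as noted above. An earlier reply in this review suggests the flag column was kept in the end because it is needed to get the metrics right.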

GitOrigin-RevId: e9dad844f161fabb6b2f59725eaa6a64c3380b90
@@ -748,14 +837,16 @@ object MergeIntoCommand {
val FILE_NAME_COL = "_file_name_"
val SOURCE_ROW_PRESENT_COL = "_source_row_present_"
val TARGET_ROW_PRESENT_COL = "_target_row_present_"
val ROW_DROPPED_COL = "_row_dropped_"
val INCR_ROW_COUNT_COL = "_incr_row_count_"

class JoinedRowProcessor(
Contributor

I missed this last time. Definitely add param docs; the triple sequence is hella confusing. Honestly, I should have added param docs when I originally implemented this.

Collaborator Author

Added param docs. An overview of what JoinedRowProcessor is doing may also be helpful; what do you think? I can add it tomorrow.

GitOrigin-RevId: 201a8e10b7a1c9fd76a1ad24f4d9312574ad7e5d
@allisonport-db allisonport-db requested a review from tdas June 3, 2022 08:42
Contributor

@tdas tdas left a comment

This looks great now!

@vkorukanti vkorukanti added this to the 2.0 milestone Jun 28, 2022
jbguerraz pushed a commit to jbguerraz/delta that referenced this pull request Jul 6, 2022
See the project plan at delta-io#1105.

This PR adds CDF to the `MERGE` command.

Merge is implemented in two ways.

- Insert-only merges. For these we don't need to do anything special, since we only write `AddFile`s with the new rows.
    - However, our current implementation of insert-only merges doesn't correctly update the metric `numTargetRowsInserted`, which is used to check for data changes in [CDCReader](https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala#L313). This PR fixes that.

- For all other merges, we generate CDF rows for inserts, updates, and deletions. We do this by generating expression sequences for the CDF outputs (e.g. preimage, insert) on a clause-by-clause basis. We apply these to the rows in our joinedDF in addition to our existing main data output sequences.
    - Changes made to `JoinedRowProcessor` make column `ROW_DELETED_COL` unnecessary, so this PR removes it.

Tests are added in `MergeCDCSuite`.

Closes delta-io#1155

GitOrigin-RevId: 0386c6ff811abe433644b5f5f46a3c7d51001740