
[Spark] Improve Delta Protocol Transitions #2848

Merged (35 commits) on Jul 17, 2024

Conversation

@andreaschat-db (Contributor) commented Apr 4, 2024:

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Currently, protocol transitions can be hard to manage. A few examples:

  • It is hard to predict the output of certain operations.
  • Once a legacy protocol transitions to a Table Features protocol, it is quite hard to transition back to a legacy protocol.
  • Adding a feature to a protocol and then removing it might leave the table on a different protocol than it started with.
  • Adding an explicit feature to a legacy protocol always leads to a Table Features protocol, even when that is not necessary.
  • Dropping features from legacy protocols is not supported. As a result, the order in which features are dropped matters.
  • Default protocol versions are ignored in some cases.
  • Enabling table features by default results in feature loss in legacy protocols.
  • CREATE TABLE ignores any legacy protocol versions that are set if the definition also contains a table feature.

This PR proposes several protocol transition improvements in order to simplify user journeys. The high-level proposal is the following:

Two protocol representations with singular operational semantics. This means there are two ways to represent a protocol: a) the legacy representation and b) the Table Features representation. The latter is strictly more powerful than the former, i.e., the Table Features representation can represent all legacy protocols but the opposite is not true. This is paired with three simple rules (a sketch follows the list):

  1. All operations should be allowed to be performed on both protocol representations and should yield equivalent results.
  2. The result should always be represented with the weaker form when possible.
  3. Conversely, if the result of an operation on a legacy protocol cannot be represented with the legacy representation, use the Table Features representation.
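
To make the rules concrete, here is a minimal sketch in the style of the Scala suites this PR touches. The normalized and removeFeature helpers are hypothetical names for the normalisation steps the PR describes; the expected protocols are the ones stated in this description.

import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, InvariantsTableFeature}

// Rule 2: a Table Features protocol whose feature set exactly matches a legacy
// protocol is represented in the weaker legacy form. (`normalized` is a
// hypothetical name for the normalisation step this PR describes.)
assert(Protocol(3, 7)
  .withFeatures(Seq(AppendOnlyTableFeature, InvariantsTableFeature))
  .normalized === Protocol(1, 2))

// Rule 3: dropping Invariants from (1, 2) leaves only AppendOnly, which no
// legacy protocol matches exactly, so the result keeps the Table Features
// form. (`removeFeature` is likewise hypothetical here.)
assert(Protocol(1, 2)
  .removeFeature(InvariantsTableFeature)
  .normalized === Protocol(1, 7).withFeature(AppendOnlyTableFeature))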

The PR introduces the following behavioural changes (changes 3 and 5 are sketched after the list):

  1. All protocol operations are now followed by denormalisation and then normalisation. Up to now, normalisation was only performed after dropping a feature.
  2. Legacy features can now be dropped directly from a legacy protocol. The result is represented with table features if it cannot be represented with a legacy protocol.
  3. Operations on table feature protocols now take into account the default versions. For example, enabling deletion vectors on a table results in protocol (3, 7, AppendOnly, Invariants, DeletionVectors).
  4. Operations on table feature protocols now take into account any protocol versions set on the table. For example, creating a table with protocol (1, 3) and deletion vectors results in protocol (3, 7, AppendOnly, Invariants, CheckConstraints, DeletionVectors).
  5. It is no longer possible to have a table features protocol without table features. For example, creating a table with (3, 7) and no table features is now normalised to (1, 1).
  6. Column Mapping can now be automatically enabled on legacy protocols when the mode is changed explicitly.
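
A minimal sketch of changes 3 and 5, assuming a Spark session with the Delta connector and default protocol versions (1, 2); table names are illustrative.

// Change 3: enabling deletion vectors folds in the default legacy features,
// giving (3, 7, AppendOnly, Invariants, DeletionVectors) as described above.
spark.sql("""
  CREATE TABLE dv_table (id INT) USING delta
  TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')""")

// Change 5: table feature versions without any table features normalise to (1, 1).
spark.sql("""
  CREATE TABLE bare_table (id INT) USING delta
  TBLPROPERTIES ('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')""")
// DESCRIBE DETAIL bare_table should now report minReaderVersion 1, minWriterVersion 1.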

How was this patch tested?

Added DeltaProtocolTransitionsSuite. Also modified existing tests in DeltaProtocolVersionSuite.

Does this PR introduce any user-facing changes?

Yes.

@xupefei (Contributor) commented Apr 4, 2024:

One question. Consider the following command sequence:

CREATE TABLE x TBLPROPERTIES (
    delta.feature.RemovableReaderWriterFeature = 'supported',
    delta.feature.ChangeDataFeedTableFeature = 'supported')
ALTER TABLE x DROP FEATURE RemovableReaderWriterFeature
ALTER TABLE x SET TBLPROPERTIES (
  'delta.minReaderVersion' = 1,
  'delta.minWriterVersion' = 4) 

Should the table get (1, 4) in the end? I think it should, as we automatically add all legacy features in the 3rd command.

@andreaschat-db (Contributor, Author) replied:

> One question. Consider the following command sequence: [quoted above]
> Should the table get (1,4) in the end? I think it should as we automatically add all legacy features in the 3rd command.

Very good point. Command 2 will result in Protocol(3, 7, ChangeDataFeedTableFeature). Then, command 3 will result in Protocol(3, 7, ChangeDataFeedTableFeature + the rest of the features in (1, 4)). Dropping one of the existing legacy features cannot result in (1, 4), because the legacy feature list won't match exactly. To get out of this and downgrade to (1, 4), the user will have to add a feature to the table and then drop it (sketched below)... I could address this in a separate PR in the future, since there is a way out and it is a rare case, i.e., downgrading the protocol versions of a table with NO table features but with table feature versions.
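
A hedged sketch of that escape hatch; the feature used is illustrative (any removable feature should do), and the collapse to (1, 4) is the normalisation behaviour this PR introduces.

// Add a removable feature, then drop it. The DROP triggers normalisation,
// which can now represent the remaining feature set as the legacy protocol (1, 4).
spark.sql("ALTER TABLE x SET TBLPROPERTIES ('delta.feature.vacuumProtocolCheck' = 'supported')")
spark.sql("ALTER TABLE x DROP FEATURE vacuumProtocolCheck")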

@zsxwing (Member) commented Apr 5, 2024:

Asking the user to set delta.minReaderVersion and delta.minWriterVersion is not user-friendly. This means users need to understand how to map a feature to minReaderVersion/minWriterVersion.

@larsk-db (Contributor) left a review comment:


LGTM

@larsk-db (Contributor) commented Apr 5, 2024:

> Asking the user to set delta.minReaderVersion and delta.minWriterVersion is not user-friendly. This means users need to understand how to map a feature to minReaderVersion/minWriterVersion.

True... but it's even worse if they have to do that and add each feature individually. At least this way they can go "ok, my connector supports (x, y), let me set this on the table and then downgrade."
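
A hedged sketch of that journey (table and feature names are illustrative; reader-writer features may additionally require TRUNCATE HISTORY on drop).

// "My connector supports (1, 4)": set those versions so their legacy
// features are folded in, then drop the feature the connector lacks.
spark.sql("""
  ALTER TABLE t SET TBLPROPERTIES (
    'delta.minReaderVersion' = '1',
    'delta.minWriterVersion' = '4')""")
spark.sql("ALTER TABLE t DROP FEATURE deletionVectors")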

@felipepessoto (Contributor) commented:

To me it seems the existing behavior you described is better:

“For example, consider creating a table with Protocol(3, 7, RemovableReaderWriterFeature, ChangeDataFeed) and then dropping RemovableReaderWriterFeature. The resulting protocol will be Protocol(3, 7, ChangeDataFeed) instead of Protocol(1, 4)”

Protocol (3, 7), besides being newer, has fewer requirements. Meaning that if you “downgrade” to (1, 4), you make your table less compatible with clients that don't support all the features of writer v4 but do support v7 + CDC.

@bart-samwel (Collaborator) left a review comment:


Regarding the user journey: this journey works when the user thinks of this ahead of time. Should we also support the reverse order?

  1. DROP FEATURE
  2. Set the protocol versions to (x, y).

I.e., should we "normalize" the protocol versions to a legacy protocol version whenever someone alters the protocol, not just on DROP FEATURE?

@andreaschat-db force-pushed the addLegacyFeature branch 2 times, most recently from b50aa8f to 613fc3c on June 27, 2024 09:17
Comment on lines 577 to 583
      assert(log.update().protocol === Protocol(1, 7)
-       .merge(Protocol(1, 2)).withFeature(TestWriterFeature))
+       .withFeature(TestWriterFeature).merge(Protocol(1, 2)))
      table.addFeatureSupport(TestReaderWriterFeature.name)
      assert(
        log.update().protocol === Protocol(3, 7)
-         .merge(Protocol(1, 2))
-         .withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature)))
+         .withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))
+         .merge(Protocol(1, 2)))
Contributor:

nit: it wasn't clear to me why this change was necessary. It seems a bit error-prone to write the expected result this way. Let's change it to be an explicit Protocol (without merging two protocol objects).

@andreaschat-db (Contributor, Author) replied Jul 10, 2024:

Yes, I agree. I changed it. Unfortunately, we use merge very often in the tests to validate results, and it would be a pain to change all of them.

The order matters because of a check in withFeatures that requires table feature versions when adding a table feature. merge would now normalise (1, 7) + (1, 2) to (1, 1), so (1, 1).withFeature would produce an error.
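
A hedged sketch of the ordering pitfall, using the Protocol helpers that appear in this hunk; the normalisation result is the one stated above.

// merge(...) now runs normalisation, so merging first can collapse the
// protocol into a legacy form that withFeature rejects:
Protocol(1, 7)
  .merge(Protocol(1, 2))           // normalised to (1, 1), per the comment above
  .withFeature(TestWriterFeature)  // error: withFeature requires table feature versions

// Adding the feature first keeps the Table Features representation; the
// trailing merge then folds the legacy features in:
Protocol(1, 7)
  .withFeature(TestWriterFeature)
  .merge(Protocol(1, 2))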

Comment on lines 1898 to 1900
|'$minReaderKey' = '3',
|'$minWriterKey' = '7',
|'${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true'
Contributor:

You shouldn't need this change as part of this PR, right? I initially made the minReaderKey 2 to expose a bug: a655bed

Contributor (Author):

Setting table feature versions without a table feature is now always normalised to (1, 1). I modified the test to capture your initial intention.

Comment on lines +88 to +89
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
Contributor:

Why did we need to update this test? The minReaderVersion and minWriterVersion are "suggestions" and are ignored anyway, since we enable DVs, which will give this table (3, 7) in the end anyway. Are we enforcing users' minReaderVersion and minWriterVersion after this change?

Contributor (Author):

This is changed so the protocol always ends up at (1, 1) in testDroppingColumnMapping, independently of whether we enable DVs or not. As a result, the validation at the end of verifyDropFeatureTruncateHistory does not need to change depending on whether this is a table features protocol or not.

With the new semantics there are 2 cases (sketched below):

  1. Enabling columnMapping on a legacy protocol results in (2, 5).
  2. Enabling columnMapping on a table features protocol results in (2, 7, ColumnMapping).
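
A hedged sketch of the two cases (table names are illustrative; the expected protocols are those stated above).

// Case 1: legacy protocol. Setting the mode auto-upgrades the versions to (2, 5).
spark.sql("CREATE TABLE legacy_cm (id INT) USING delta")
spark.sql("ALTER TABLE legacy_cm SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")

// Case 2: table features protocol (forced here via an unrelated writer feature;
// any table features protocol would do). Column mapping is added as a feature,
// e.g. (2, 7, ..., ColumnMapping), instead of bumping to the legacy (2, 5).
spark.sql("""
  CREATE TABLE tf_cm (id INT) USING delta
  TBLPROPERTIES ('delta.feature.domainMetadata' = 'supported')""")
spark.sql("ALTER TABLE tf_cm SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")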

  }

  test("protocol upgrade compatibility") {
    assert(Protocol(1, 1).canUpgradeTo(Protocol(1, 1)))
    assert(Protocol(1, 1).canUpgradeTo(Protocol(2, 1)))
    assert(!Protocol(1, 2).canUpgradeTo(Protocol(1, 1)))
    assert(!Protocol(2, 2).canUpgradeTo(Protocol(2, 1)))
    assert(
Contributor:

I'm a bit concerned that this needed to be removed in this PR. Intuitively, (1, 2) -> (1, 1) is not an "upgrade", so as a developer I expect this to return false. Have we changed the definition of canUpgradeTo to mean canTransitionTo?

Contributor (Author):

This change is orthogonal and is not needed by anything else in this PR. The idea came up in early iterations of this PR with @bart-samwel, to simplify canUpgradeTo.

@@ -25,6 +25,7 @@ import java.util.Locale
import scala.sys.process.Process

// scalastyle:off import.ordering.noEmptyLine
// scalastyle:off line.size.limit
Contributor:

This seems unnecessary when only deleting stuff?

Contributor (Author):

The issue is that this directive was contained in the piece of code I deleted, and it turns out it is needed by the rest of the code below... I think what happened is that there is a bug below where the style is not turned back on, so the style was not enforced when follow-up code was added.

Contributor:

Ah, but then don't add it at the top; just scope it where it's needed, please? Or is it in sooooo many places that that would be crazy?

Contributor (Author):

I am afraid there are 38 errors :(.

Contributor:

And they are all non-consecutive?

Contributor (Author):

I am afraid they are not.

@scottsand-db (Collaborator) commented:

There are spark master test failures:

https://github.com/delta-io/delta/actions/runs/9891350186/job/27321586977?pr=2848

[info] CreateCheckpointSuite:
[info] - commits containing adds and removes, and no previous checkpoint
[info] - commits containing adds, and no previous checkpoint
[info] - commits containing adds and removes, and a previous checkpoint created using Spark (actions/perfile): 1000000
[info] - commits containing adds and removes, and a previous checkpoint created using Spark (actions/perfile): 3
[info] - commits containing adds, and a previous checkpoint created using Spark (actions/perfile): 1000000
[info] - commits containing adds, and a previous checkpoint created using Spark (actions/perfile): 3
[info] - commits with metadata updates
[info] - commits with protocol updates *** FAILED ***
[info]   == Results ==
[info]   
[info]   == Expected Answer - 1 ==
[info]   ([2,2])
[info]   
[info]   == Result - 1 ==
[info]   ([1,2]) (TestUtils.scala:386)

@andreaschat-db (Contributor, Author) replied:

> There are spark master test failures:
> https://github.com/delta-io/delta/actions/runs/9891350186/job/27321586977?pr=2848
> [test log quoted above]

The PR is not ready to be merged yet. This particular failure is fixed by #3356.

@allisonport-db merged commit 669dca9 into delta-io:master on Jul 17, 2024. 10 checks passed.