Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Protocol version downgrade in the presence of table features #2841

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ trait TableFeatureSupport { this: Protocol =>
lazy val readerAndWriterFeatures: Seq[TableFeature] =
readerAndWriterFeatureNames.toSeq.flatMap(TableFeature.featureNameToFeature)

/**
 * The native (non-legacy) [[TableFeature]]s of this protocol: every entry of
 * [[readerAndWriterFeatures]] whose `isLegacyFeature` flag is not set.
 */
@JsonIgnore
lazy val nativeReaderAndWriterFeatures: Seq[TableFeature] =
  readerAndWriterFeatures.filter(feature => !feature.isLegacyFeature)

/**
* Get all features that are implicitly supported by this protocol, for example, `Protocol(1,2)`
* implicitly supports `appendOnly` and `invariants`. When this protocol is capable of requiring
Expand Down Expand Up @@ -242,43 +250,16 @@ trait TableFeatureSupport { this: Protocol =>
}

/**
* Determine whether this protocol can be safely downgraded to a new protocol `to`. This
* includes the following:
* - The current protocol needs to support at least writer features. This is because protocol
* downgrade is only supported with table features.
* - The protocol version can only be downgraded when there are no non-legacy table features.
* - We can only remove one feature at a time.
* - When downgrading protocol versions, the resulting versions must support exactly the same
* set of legacy features supported by the current protocol.
*
* Note, this is not an exhaustive list of downgrade rules. Rather, we check the most important
* downgrade invariants. We also perform checks during feature removal at
* [[AlterTableDropFeatureDeltaCommand]].
* Determine whether this protocol can be safely downgraded to a new protocol `to`.
* All we need is the implicit and explicit features between the two protocols to match,
* excluding the dropped feature. Note, this accounts for cases where we downgrade
* from table features to legacy protocol versions.
*/
def canDowngradeTo(to: Protocol, droppedFeatureName: String): Boolean = {
if (!supportsWriterFeatures) return false

// When the `to` protocol does not have any features, version downgrades are possible. However,
// the current protocol needs to contain one non-legacy feature. We also allow downgrade when
// there are only legacy features. This is to accommodate the case when the user attempts to
// remove a legacy feature in a table that only contains legacy features.
if (to.readerAndWriterFeatureNames.isEmpty) {
val featureNames = readerAndWriterFeatureNames - droppedFeatureName
val sameLegacyFeaturesSupported = featureNames == to.implicitlySupportedFeatures.map(_.name)
val minRequiredVersions = TableFeatureProtocolUtils.minimumRequiredVersions(
featureNames.flatMap(TableFeature.featureNameToFeature).toSeq)

return sameLegacyFeaturesSupported &&
(to.minReaderVersion, to.minWriterVersion) == minRequiredVersions &&
readerAndWriterFeatures.filterNot(_.isLegacyFeature).size <= 1
}

// When `to` protocol contains table features we cannot downgrade the protocol version.
if (to.minReaderVersion != this.minReaderVersion) return false
if (to.minWriterVersion != this.minWriterVersion) return false

// Can only remove a maximum of one feature at a time.
(this.readerAndWriterFeatureNames -- to.readerAndWriterFeatureNames).size == 1
val thisFeatures = this.implicitlyAndExplicitlySupportedFeatures
val toFeatures = to.implicitlyAndExplicitlySupportedFeatures
val droppedFeature = Seq(droppedFeatureName).flatMap(TableFeature.featureNameToFeature)
(thisFeatures -- droppedFeature) == toFeatures
}

/**
Expand Down Expand Up @@ -368,13 +349,25 @@ trait TableFeatureSupport { this: Protocol =>
* features. After we remove the last native feature we downgrade the protocol to (1, 1).
*/
def downgradeProtocolVersionsIfNeeded: Protocol = {
if (!readerAndWriterFeatures.forall(_.isLegacyFeature)) return this
if (nativeReaderAndWriterFeatures.nonEmpty) {
val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
// It is guaranteed by the definitions of WriterFeature and ReaderFeature, that we cannot
// end up with invalid protocol versions such as (3, 3). Nevertheless,
// we double check it here.
val newProtocol =
Protocol(minReaderVersion, minWriterVersion).withFeatures(readerAndWriterFeatures)
assert(
newProtocol.supportsWriterFeatures,
s"Downgraded protocol should at least support writer features, but got $newProtocol.")
return newProtocol
}

val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
val newProtocol = Protocol(minReaderVersion, minWriterVersion)

require(
assert(
!newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures,
s"Downgraded protocol should not support table features, but got $newProtocol.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3204,7 +3204,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
initialMinWriterVersion: Int,
featuresToAdd: Seq[TableFeature],
featuresToRemove: Seq[TableFeature],
expectedDowngradedProtocol: Protocol): Unit = {
expectedDowngradedProtocol: Protocol,
truncateHistory: Boolean = false): Unit = {
withTempDir { dir =>
val deltaLog = DeltaLog.forTable(spark, dir)

Expand All @@ -3225,8 +3226,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
|)""".stripMargin)

for (feature <- featuresToRemove) {
AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), feature.name)
.run(spark)
AlterTableDropFeatureDeltaCommand(
table = DeltaTableV2(spark, deltaLog.dataPath),
featureName = feature.name,
truncateHistory = truncateHistory).run(spark)
}
assert(deltaLog.update().protocol === expectedDowngradedProtocol)
}
Expand Down Expand Up @@ -3273,7 +3276,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = Protocol(1, 1))
}

test("Downgrade protocol version on table created with table features") {
test("Downgrade protocol version on table created with (3, 7)") {
// When the table is initialized with table features there are no active (implicit) legacy
// features. After removing the last table feature we downgrade back to (1, 1).
testProtocolVersionDowngrade(
Expand All @@ -3284,7 +3287,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = Protocol(1, 1))
}

test("Downgrade protocol version on table created with writer features") {
test("Downgrade protocol version on table created with (1, 7)") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 1,
initialMinWriterVersion = 7,
Expand Down Expand Up @@ -3346,7 +3349,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = protocolWithWriterFeature(DomainMetadataTableFeature))
}

test("Protocol version is not downgraded when reader+writer features exist") {
test("Protocol version is not downgraded when multiple reader+writer features exist") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
Expand All @@ -3355,15 +3358,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = protocolWithReaderFeature(DeletionVectorsTableFeature))
}

test("Protocol version is not downgraded when both reader+writer and writer features exist") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableReaderWriterFeature, TestRemovableWriterFeature),
featuresToRemove = Seq(TestRemovableReaderWriterFeature),
expectedDowngradedProtocol =
Protocol(3, 7, Some(Set.empty), Some(Set(TestRemovableWriterFeature.name))))

test("Protocol version is not downgraded when reader+writer features exist") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
Expand Down Expand Up @@ -3401,6 +3396,50 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
}
}

// Registered once per truncateHistory setting. The table is created at (3, 7) with the retained
// features plus TestRemovableReaderWriterFeature; once that feature is dropped, the protocol is
// expected to be Protocol(1, 7) carrying only the retained features.
BOOLEAN_DOMAIN.foreach { truncateHistory =>
  val testName =
    "Protocol version downgrade with Table Features - Basic test " +
      s"truncateHistory: ${truncateHistory}"

  test(testName) {
    val retainedFeatures = Seq(RowTrackingFeature, DomainMetadataTableFeature)
    val featureToDrop = TestRemovableReaderWriterFeature

    testProtocolVersionDowngrade(
      initialMinReaderVersion = 3,
      initialMinWriterVersion = 7,
      featuresToAdd = retainedFeatures :+ featureToDrop,
      featuresToRemove = Seq(featureToDrop),
      expectedDowngradedProtocol = Protocol(1, 7).withFeatures(retainedFeatures),
      truncateHistory = truncateHistory)
  }
}

// Registered once per truncateHistory setting. Same scenario as a plain feature drop on a (3, 7)
// table, but the features that stay behind include ChangeDataFeedTableFeature and
// AppendOnlyTableFeature (the "legacy writer features" of the test name). The expected result is
// Protocol(1, 7) with exactly the features that were not dropped.
BOOLEAN_DOMAIN.foreach { truncateHistory =>
  val testName =
    "Protocol version downgrade with Table Features - include legacy writer features: " +
      s"truncateHistory: ${truncateHistory}"

  test(testName) {
    val remainingFeatures =
      Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, AppendOnlyTableFeature)
    val droppedFeature = TestRemovableReaderWriterFeature

    testProtocolVersionDowngrade(
      initialMinReaderVersion = 3,
      initialMinWriterVersion = 7,
      featuresToAdd = remainingFeatures :+ droppedFeature,
      featuresToRemove = Seq(droppedFeature),
      expectedDowngradedProtocol = Protocol(1, 7).withFeatures(remainingFeatures),
      truncateHistory = truncateHistory)
  }
}

// Registered once per truncateHistory setting. The features that stay behind include
// ColumnMappingTableFeature (the "legacy reader feature" of the test name), so after dropping
// TestRemovableReaderWriterFeature from the (3, 7) table the expected protocol is
// Protocol(2, 7) with the surviving features rather than Protocol(1, 7).
BOOLEAN_DOMAIN.foreach { truncateHistory =>
  val testName =
    "Protocol version downgrade with Table Features - include legacy reader features: " +
      s"truncateHistory: ${truncateHistory}"

  test(testName) {
    val survivingFeatures =
      Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, ColumnMappingTableFeature)
    val removedFeature = TestRemovableReaderWriterFeature

    testProtocolVersionDowngrade(
      initialMinReaderVersion = 3,
      initialMinWriterVersion = 7,
      featuresToAdd = survivingFeatures :+ removedFeature,
      featuresToRemove = Seq(removedFeature),
      expectedDowngradedProtocol = Protocol(2, 7).withFeatures(survivingFeatures),
      truncateHistory = truncateHistory)
  }
}

private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
s"`${V2CheckpointTableFeature.name}`")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,11 @@ class DeltaTableFeatureSuite
test("protocol downgrade compatibility") {
val tableFeatureProtocol =
Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
// Cannot downgrade when the original protocol does not support, at a minimum, writer features.
assert(!Protocol(1, 6).canDowngradeTo(Protocol(1, 6), droppedFeatureName = ""))
assert(tableFeatureProtocol.withFeature(TestWriterFeature)
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name))
assert(Protocol(1, 7).withFeature(TestWriterFeature)
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name))
.canDowngradeTo(Protocol(1, 7), droppedFeatureName = TestWriterFeature.name))
// When there are no explicit features the protocol versions need to be downgraded
// below table features. The new protocol versions need to match exactly the supported
// legacy features.
for (n <- 1 to 3) {
assert(
!Protocol(n, 7)
Expand All @@ -291,31 +290,13 @@ class DeltaTableFeatureSuite
.withFeatures(Seq(TestWriterFeature, AppendOnlyTableFeature, InvariantsTableFeature))
.canDowngradeTo(Protocol(1, 2), droppedFeatureName = TestWriterFeature.name))
}
// When there are no explicit features the protocol versions need to be downgraded
// below table features.
assert(!tableFeatureProtocol.withFeature(TestWriterFeature)
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
assert(!tableFeatureProtocol.withFeature(TestWriterFeature)
.canDowngradeTo(Protocol(2, 7), droppedFeatureName = TestWriterFeature.name))
// Only one non-legacy writer feature at a time.
assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestRemovableWriterFeature))
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
// Remove reader+writer feature.
assert(tableFeatureProtocol.withFeatures(Seq(TestReaderWriterFeature))
.canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestReaderWriterFeature.name))
// Only one non-legacy feature at a time - multiple reader+writer features.
assert(
!tableFeatureProtocol
.withFeatures(Seq(TestReaderWriterFeature, TestReaderWriterMetadataAutoUpdateFeature))
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = ""))
assert(
tableFeatureProtocol
.merge(Protocol(2, 5))
.withFeatures(Seq(TestReaderWriterFeature, TestRemovableLegacyReaderWriterFeature))
.canDowngradeTo(Protocol(2, 5), droppedFeatureName = TestReaderWriterFeature.name))
// Only one feature at a time - mix of reader+writer and writer features.
assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))
.canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name))
// Downgraded protocol must be able to support all legacy table features.
assert(
!tableFeatureProtocol
Expand Down
Loading