[SPARK-51771][SQL] Add DSv2 APIs for ALTER TABLE ADD/DROP CONSTRAINT #50561
base: master
Conversation
cc @aokolnychyi as well
@@ -536,6 +536,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat

    case a: AlterTableCommand if a.table.resolved =>
      val table = a.table.asInstanceOf[ResolvedTable]
      ResolveTableConstraints.validateCatalogForTableChange(
Note: there will be new tests for this one when the alter table with constraint
is supported end-to-end.
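To illustrate what this catalog check guards against, here is a minimal, self-contained sketch. It is not Spark's actual implementation: `TableChange`, `AddConstraintChange`, `TableCatalog`, `SupportsConstraints`, and the `ResolveTableConstraintsSketch` class are all hypothetical stand-ins for the DSv2 types discussed in this thread.

```java
import java.util.List;

interface TableChange {}

// Hypothetical stand-in for the AddConstraint table change added by this PR.
final class AddConstraintChange implements TableChange {
    final String name;
    AddConstraintChange(String name) { this.name = name; }
}

interface TableCatalog { String name(); }

// A marker capability: only catalogs implementing it accept constraint changes.
interface SupportsConstraints extends TableCatalog {}

final class ResolveTableConstraintsSketch {
    // Fail early, at planning time, if the target catalog cannot handle
    // constraint-related table changes.
    static void validateCatalogForTableChange(List<TableChange> changes, TableCatalog catalog) {
        boolean needsConstraints = changes.stream().anyMatch(c -> c instanceof AddConstraintChange);
        if (needsConstraints && !(catalog instanceof SupportsConstraints)) {
            throw new IllegalArgumentException(
                "Catalog " + catalog.name() + " does not support table constraints");
        }
    }
}
```

The design choice sketched here is the usual DSv2 pattern: capabilities are expressed as mix-in interfaces on the catalog, so the planner can reject unsupported operations with an analysis-time error instead of a runtime failure.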
      val newConstraint = add.getConstraint
      val existingConstraint = findExistingConstraint(newConstraint.name)
      if (existingConstraint.isDefined) {
        throw new AnalysisException(
When do we define dedicated error methods in classes like QueryCompilationErrors? Do we encourage external connectors to use built-in error classes in Spark?
All Spark errors are defined with error classes. This is a utility method for external developers to use directly.
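The duplicate-constraint check in the diff above can be sketched as follows. This is a simplified, self-contained version: `ConstraintSketch`, `TableConstraints`, and the use of a plain `IllegalStateException` in place of Spark's `AnalysisException` with the `CONSTRAINT_ALREADY_EXISTS` error class are all assumptions for illustration.

```java
import java.util.*;

// Hypothetical stand-in for the DSv2 Constraint type.
final class ConstraintSketch {
    final String name;
    ConstraintSketch(String name) { this.name = name; }
}

final class TableConstraints {
    private final Map<String, ConstraintSketch> byName = new HashMap<>();

    // Case-insensitive lookup, mirroring how SQL identifiers usually compare.
    Optional<ConstraintSketch> findExistingConstraint(String name) {
        return Optional.ofNullable(byName.get(name.toLowerCase(Locale.ROOT)));
    }

    void addConstraint(ConstraintSketch c) {
        if (findExistingConstraint(c.name).isPresent()) {
            // Spark would raise CONSTRAINT_ALREADY_EXISTS via an error class here.
            throw new IllegalStateException("Constraint '" + c.name + "' already exists. "
                + "Please delete the existing constraint first.");
        }
        byName.put(c.name.toLowerCase(Locale.ROOT), c);
    }
}
```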
/** A TableChange to alter table and add a constraint. */
final class AddConstraint implements TableChange {
  private final Constraint constraint;
  private final boolean validate;
During the discussion, the SPIP design evolved to be like this:

class AddConstraint implements TableChange {
  private final Constraint constraint;
  private final String validatedTableVersion; // if known and was validated
  …
}

If Spark validated the constraint, it should set the validation status in the constraint to VALID. The table version will be there, if known.

interface Table {
  …
  // reports the current table version, if supported
  default String currentVersion() { return null; }
  …
}
TBH I am a bit confused on this one. Shall we make it simple and let the external table catalog track the validatedTableVersion?
The problem is that Spark will construct a scan to do the validation. We need a way to pass which version of the table was scanned/validated back to the connector so that the connector can check changes that happened concurrently.
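The version hand-off described here can be sketched as a round trip: Spark records which table version it scanned during validation, and the connector compares that against the current version at commit time. This is a self-contained illustration under the SPIP design above, not Spark's actual API; `AddConstraintV2`, `ConnectorTable`, and the exception used are hypothetical.

```java
// Carries the version Spark validated against (null if unknown / not validated).
final class AddConstraintV2 {
    final String constraintName;
    final String validatedTableVersion;
    AddConstraintV2(String constraintName, String validatedTableVersion) {
        this.constraintName = constraintName;
        this.validatedTableVersion = validatedTableVersion;
    }
}

final class ConnectorTable {
    private String currentVersion;
    ConnectorTable(String v) { this.currentVersion = v; }

    // Mirrors Table#currentVersion() from the sketch above.
    String currentVersion() { return currentVersion; }

    void commitWrite(String newVersion) { this.currentVersion = newVersion; }

    // Connector-side check: reject the ALTER if the table has moved past the
    // version Spark validated, since a concurrent write may have added rows
    // that violate the new constraint.
    void applyAddConstraint(AddConstraintV2 change) {
        if (change.validatedTableVersion != null
                && !change.validatedTableVersion.equals(currentVersion)) {
            throw new IllegalStateException("Table changed since validation: validated at "
                + change.validatedTableVersion + ", now at " + currentVersion);
        }
        // ... persist the constraint metadata here ...
    }
}
```

This is essentially optimistic concurrency control: the connector, not Spark, decides whether a version mismatch is fatal or whether it can re-validate only the rows written since the validated version.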
@@ -811,6 +811,20 @@
  },
  "sqlState" : "XX000"
},
"CONSTRAINT_ALREADY_EXISTS" : {
  "message" : [
    "Constraint '<constraintName>' already exists. Please delete the existing constraint first.",
Is the constraint name scoped to the table? Wondering if we should also include tableName like the other messages?
This error is from an ALTER TABLE statement, which is usually simple and straightforward. I prefer keeping it as it is now.
James Channel
What changes were proposed in this pull request?
This PR adds the following DSv2 TableChanges as per the SPIP doc: AddConstraint and DropConstraint, for ALTER TABLE ADD/DROP CONSTRAINT.
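A hedged sketch of how a connector might dispatch on the two new change types. The class shapes (constructor parameters such as `validate` and `ifExists`) are assumptions based on the PR title and the snippets in the review thread, not the exact Spark API, and `InMemoryTable` is a toy stand-in for a connector's table implementation.

```java
import java.util.*;

interface TableChangeSketch {}

final class AddConstraintSketch implements TableChangeSketch {
    final String name;
    final boolean validate; // whether Spark should scan existing data first
    AddConstraintSketch(String name, boolean validate) {
        this.name = name; this.validate = validate;
    }
}

final class DropConstraintSketch implements TableChangeSketch {
    final String name;
    final boolean ifExists; // suppress the error when the constraint is absent
    DropConstraintSketch(String name, boolean ifExists) {
        this.name = name; this.ifExists = ifExists;
    }
}

final class InMemoryTable {
    final Set<String> constraints = new HashSet<>();

    // Toy version of TableCatalog#alterTable applying constraint changes.
    void alterTable(TableChangeSketch change) {
        if (change instanceof AddConstraintSketch) {
            constraints.add(((AddConstraintSketch) change).name);
        } else if (change instanceof DropConstraintSketch) {
            DropConstraintSketch drop = (DropConstraintSketch) change;
            if (!constraints.remove(drop.name) && !drop.ifExists) {
                throw new NoSuchElementException("Constraint not found: " + drop.name);
            }
        }
    }
}
```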
Why are the changes needed?
For constraints support in Spark
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests
Was this patch authored or co-authored using generative AI tooling?
No