diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 6591a62151d4d..48b86589be426 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4070,6 +4070,12 @@
     ],
     "sqlState" : "HV091"
   },
+  "NON_DETERMINISTIC_CHECK_CONSTRAINT" : {
+    "message" : [
+      "The check constraint `<checkCondition>` is non-deterministic. Check constraints must only contain deterministic expressions."
+    ],
+    "sqlState" : "42621"
+  },
   "NON_FOLDABLE_ARGUMENT" : {
     "message" : [
       "The function <funcName> requires the parameter <paramName> to be a foldable expression of the type <paramType>, but the actual argument is a non-foldable."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 61d4820884614..6c0ba6c74d8d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1138,6 +1138,12 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
       case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)
 
+      case AddConstraint(_: ResolvedTable, check: CheckConstraint) if !check.deterministic =>
+        check.child.failAnalysis(
+          errorClass = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
+          messageParameters = Map("checkCondition" -> check.condition)
+        )
+
       case AlterColumns(table: ResolvedTable, specs) =>
         val groupedColumns = specs.groupBy(_.column.name)
         groupedColumns.collect {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index ab9487aa66644..b426ad6165739 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -22,9 +22,11 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SqlScriptingLocalVariableManager
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces}
+import org.apache.spark.sql.catalyst.util.SparkCharVarcharUtils.replaceCharVarcharWithString
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.util.ArrayImplicits._
@@ -77,14 +79,19 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       assertValidSessionVariableNameParts(nameParts, resolved)
       d.copy(name = resolved)
 
+    // For CREATE TABLE and REPLACE TABLE statements, resolve the table identifier and include
+    // the table columns as output. This allows expressions (e.g., constraints) referencing these
+    // columns to be resolved correctly.
+    case c @ CreateTable(UnresolvedIdentifier(nameParts, allowTemp), columns, _, _, _) =>
+      val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns)
+      c.copy(name = resolvedIdentifier)
+
+    case r @ ReplaceTable(UnresolvedIdentifier(nameParts, allowTemp), columns, _, _, _) =>
+      val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns)
+      r.copy(name = resolvedIdentifier)
+
     case UnresolvedIdentifier(nameParts, allowTemp) =>
-      if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
-        val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
-        ResolvedIdentifier(FakeSystemCatalog, ident)
-      } else {
-        val CatalogAndIdentifier(catalog, identifier) = nameParts
-        ResolvedIdentifier(catalog, identifier)
-      }
+      resolveIdentifier(nameParts, allowTemp, Nil)
 
     case CurrentNamespace =>
       ResolvedNamespace(currentCatalog, catalogManager.currentNamespace.toImmutableArraySeq)
@@ -94,6 +101,27 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       resolveNamespace(catalog, ns, fetchMetadata)
   }
 
+  private def resolveIdentifier(
+      nameParts: Seq[String],
+      allowTemp: Boolean,
+      columns: Seq[ColumnDefinition]): ResolvedIdentifier = {
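+    // Expose the table columns as output attributes so that expressions referencing them
+    // (e.g. check constraints) can be resolved against the table being created or replaced.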
+    val columnOutput = columns.map { col =>
+      val dataType = if (conf.preserveCharVarcharTypeInfo) {
+        col.dataType
+      } else {
+        replaceCharVarcharWithString(col.dataType)
+      }
+      AttributeReference(col.name, dataType, col.nullable, col.metadata)()
+    }
+    if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
+      val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
+      ResolvedIdentifier(FakeSystemCatalog, ident, columnOutput)
+    } else {
+      val CatalogAndIdentifier(catalog, identifier) = nameParts
+      ResolvedIdentifier(catalog, identifier, columnOutput)
+    }
+  }
+
   private def resolveNamespace(
       catalog: CatalogPlugin,
       ns: Seq[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
index 05158fbee3de6..2b66ef503822c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkThrowable
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -61,7 +61,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
       input: LogicalPlan,
       tableSpec: TableSpecBase,
       withNewSpec: TableSpecBase => LogicalPlan): LogicalPlan = tableSpec match {
-    case u: UnresolvedTableSpec if u.optionExpression.resolved =>
+    case u: UnresolvedTableSpec if u.childrenResolved =>
       val newOptions: Seq[(String, String)] = u.optionExpression.options.map {
         case (key: String, null) =>
           (key, null)
@@ -86,6 +86,18 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
           }
           (key, newValue)
       }
+
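+      // Check constraints must be deterministic; fail CREATE TABLE and REPLACE TABLE early
+      // with a dedicated error instead of letting the invalid constraint reach the catalog.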
+      u.constraints.foreach {
+        case check: CheckConstraint =>
+          if (!check.child.deterministic) {
+            check.child.failAnalysis(
+              errorClass = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
+              messageParameters = Map("checkCondition" -> check.condition)
+            )
+          }
+        case _ =>
+      }
+
       val newTableSpec = TableSpec(
         properties = u.properties,
         provider = u.provider,
@@ -94,7 +106,8 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
         comment = u.comment,
         collation = u.collation,
         serde = u.serde,
-        external = u.external)
+        external = u.external,
+        constraints = u.constraints.map(_.toV2Constraint))
       withNewSpec(newTableSpec)
     case _ =>
       input
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 6f657e931a49e..b52091afc1333 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -252,8 +252,13 @@ case class ResolvedNonPersistentFunc(
  */
 case class ResolvedIdentifier(
     catalog: CatalogPlugin,
-    identifier: Identifier) extends LeafNodeWithoutStats {
-  override def output: Seq[Attribute] = Nil
+    identifier: Identifier,
+    override val output: Seq[Attribute] = Nil) extends LeafNodeWithoutStats
+
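+// Keeps the original two-argument extractor so that existing pattern matches on
+// ResolvedIdentifier(catalog, identifier) continue to work after adding the output field.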
+object ResolvedIdentifier {
+  def unapply(ri: ResolvedIdentifier): Option[(CatalogPlugin, Identifier)] = {
+    Some((ri.catalog, ri.identifier))
+  }
 }
 
 // A fake v2 catalog to hold temp views.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
index 1a52723669358..15b8b7e2e7319 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
@@ -18,11 +18,17 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.UUID
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.types.DataType
 
-trait TableConstraint {
+trait TableConstraint extends Expression with Unevaluable {
+  /** Converts this constraint to a data source V2 constraint. */
+  def toV2Constraint: Constraint
 
   /** Returns the user-provided name of the constraint */
   def userProvidedName: String
@@ -92,6 +98,10 @@ trait TableConstraint {
       )
     }
   }
+
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+
+  override def dataType: DataType = throw new UnresolvedException("dataType")
 }
 
 case class ConstraintCharacteristic(enforced: Option[Boolean], rely: Option[Boolean])
@@ -108,10 +118,25 @@ case class CheckConstraint(
     override val tableName: String = null,
     override val userProvidedCharacteristic: ConstraintCharacteristic = ConstraintCharacteristic.empty)
   extends UnaryExpression
-  with Unevaluable
   with TableConstraint {
 // scalastyle:on line.size.limit
 
+  def toV2Constraint: Constraint = {
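+    // Try to translate the check expression into a V2 predicate; fall back to null when the
+    // expression cannot be translated, in which case only the predicate SQL text is exposed.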
+    val predicate = new V2ExpressionBuilder(child, true).buildPredicate().orNull
+    val enforced = userProvidedCharacteristic.enforced.getOrElse(true)
+    val rely = userProvidedCharacteristic.rely.getOrElse(false)
+    // TODO(SPARK-51903): Change the status to VALIDATED when we support validation on ALTER TABLE
+    val validateStatus = Constraint.ValidationStatus.UNVALIDATED
+    Constraint
+      .check(name)
+      .predicateSql(condition)
+      .predicate(predicate)
+      .rely(rely)
+      .enforced(enforced)
+      .validationStatus(validateStatus)
+      .build()
+  }
+
   override protected def withNewChildInternal(newChild: Expression): Expression =
     copy(child = newChild)
 
@@ -121,8 +146,6 @@ case class CheckConstraint(
 
   override def sql: String = s"CONSTRAINT $userProvidedName CHECK ($condition)"
 
-  override def dataType: DataType = StringType
-
   override def withUserProvidedName(name: String): TableConstraint = copy(userProvidedName = name)
 
   override def withTableName(tableName: String): TableConstraint = copy(tableName = tableName)
@@ -137,9 +160,20 @@ case class PrimaryKeyConstraint(
     override val userProvidedName: String = null,
     override val tableName: String = null,
     override val userProvidedCharacteristic: ConstraintCharacteristic = ConstraintCharacteristic.empty)
-  extends TableConstraint {
+  extends LeafExpression with TableConstraint {
 // scalastyle:on line.size.limit
 
+  override def toV2Constraint: Constraint = {
+    val enforced = userProvidedCharacteristic.enforced.getOrElse(false)
+    val rely = userProvidedCharacteristic.rely.getOrElse(false)
+    Constraint
+      .primaryKey(name, columns.map(FieldReference.column).toArray)
+      .rely(rely)
+      .enforced(enforced)
+      .validationStatus(Constraint.ValidationStatus.UNVALIDATED)
+      .build()
+  }
+
   override protected def generateName(tableName: String): String = s"${tableName}_pk"
 
   override def withUserProvidedName(name: String): TableConstraint = copy(userProvidedName = name)
@@ -158,9 +192,20 @@ case class UniqueConstraint(
     override val userProvidedName: String = null,
     override val tableName: String = null,
     override val userProvidedCharacteristic: ConstraintCharacteristic = ConstraintCharacteristic.empty)
-    extends TableConstraint {
+  extends LeafExpression with TableConstraint {
 // scalastyle:on line.size.limit
 
+  override def toV2Constraint: Constraint = {
+    val enforced = userProvidedCharacteristic.enforced.getOrElse(false)
+    val rely = userProvidedCharacteristic.rely.getOrElse(false)
+    Constraint
+      .unique(name, columns.map(FieldReference.column).toArray)
+      .rely(rely)
+      .enforced(enforced)
+      .validationStatus(Constraint.ValidationStatus.UNVALIDATED)
+      .build()
+  }
+
   override protected def generateName(tableName: String): String = {
     s"${tableName}_uniq_$randomSuffix"
   }
@@ -183,9 +228,25 @@ case class ForeignKeyConstraint(
     override val userProvidedName: String = null,
     override val tableName: String = null,
     override val userProvidedCharacteristic: ConstraintCharacteristic = ConstraintCharacteristic.empty)
-  extends TableConstraint {
+  extends LeafExpression with TableConstraint {
 // scalastyle:on line.size.limit
 
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
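+  // CatalogV2Implicits provides the asIdentifier conversion used to turn the parent table
+  // name parts into a catalog Identifier.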
+
+  override def toV2Constraint: Constraint = {
+    val enforced = userProvidedCharacteristic.enforced.getOrElse(false)
+    val rely = userProvidedCharacteristic.rely.getOrElse(false)
+    Constraint
+      .foreignKey(name,
+        childColumns.map(FieldReference.column).toArray,
+        parentTableId.asIdentifier,
+        parentColumns.map(FieldReference.column).toArray)
+      .rely(rely)
+      .enforced(enforced)
+      .validationStatus(Constraint.ValidationStatus.UNVALIDATED)
+      .build()
+  }
+
   override protected def generateName(tableName: String): String =
     s"${tableName}_${parentTableId.last}_fk_$randomSuffix"
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 2b2f9df6abf52..bc3e86351b11b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, ResolvedTable, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, TableConstraint, Unevaluable}
@@ -295,7 +295,16 @@ case class AlterTableCollation(
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
-  override def changes: Seq[TableChange] = Seq.empty
+  override def changes: Seq[TableChange] = {
+    val constraint = tableConstraint.toV2Constraint
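+    // Capture the current table version only for enforced constraints; otherwise pass null.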
+    val validatedTableVersion = table match {
+      case t: ResolvedTable if constraint.enforced() =>
+        t.table.currentVersion()
+      case _ =>
+        null
+    }
+    Seq(TableChange.addConstraint(constraint, validatedTableVersion))
+  }
 
   protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
 }
@@ -308,7 +317,8 @@ case class DropConstraint(
     name: String,
     ifExists: Boolean,
     cascade: Boolean) extends AlterTableCommand {
-  override def changes: Seq[TableChange] = Seq.empty
+  override def changes: Seq[TableChange] =
+    Seq(TableChange.dropConstraint(name, ifExists, cascade))
 
   protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e0d44e7d248ed..184ba22bd08fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1505,19 +1505,25 @@ case class UnresolvedTableSpec(
     serde: Option[SerdeInfo],
     external: Boolean,
     constraints: Seq[TableConstraint])
-  extends UnaryExpression with Unevaluable with TableSpecBase {
+  extends Expression with Unevaluable with TableSpecBase {
 
   override def dataType: DataType =
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3113")
 
-  override def child: Expression = optionExpression
-
-  override protected def withNewChildInternal(newChild: Expression): Expression =
-    this.copy(optionExpression = newChild.asInstanceOf[OptionList])
-
   override def simpleString(maxFields: Int): String = {
     this.copy(properties = Utils.redact(properties).toMap).toString
   }
+
+  override def nullable: Boolean = true
+
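+  // Treat the constraints as children so the analyzer resolves the expressions they contain
+  // (e.g. column references in check conditions) together with the option expressions.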
+  override def children: Seq[Expression] = optionExpression +: constraints
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): Expression = {
+    copy(
+      optionExpression = newChildren.head.asInstanceOf[OptionList],
+      constraints = newChildren.tail.asInstanceOf[Seq[TableConstraint]])
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
index f0812245bcce0..3298a56d95992 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
@@ -47,6 +47,7 @@ case class CreateTableExec(
           .withColumns(columns)
           .withPartitions(partitioning.toArray)
           .withProperties(tableProperties.asJava)
+          .withConstraints(tableSpec.constraints.toArray)
           .build()
         catalog.createTable(identifier, tableInfo)
       } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 51f5c848bd27b..a819f836a1dd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -52,6 +52,7 @@ case class ReplaceTableExec(
       .withColumns(columns)
       .withPartitions(partitioning.toArray)
       .withProperties(tableProperties.asJava)
+      .withConstraints(tableSpec.constraints.toArray)
       .build()
     catalog.createTable(ident, tableInfo)
     Seq.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
index aa82ac57089f0..a98a7faf4884f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
@@ -70,16 +70,18 @@ class TableOptionsConstantFoldingSuite extends QueryTest with SharedSparkSession
     checkError(
       exception = intercept[AnalysisException](
         sql(s"$prefix ('k' = 1 + 2 + unresolvedAttribute)")),
-      condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
       parameters = Map(
-        "objectName" -> "`unresolvedAttribute`"),
+        "objectName" -> "`unresolvedAttribute`",
+        "proposal" -> "`col`"),
       queryContext = Array(ExpectedContext("", "", 60, 78, "unresolvedAttribute")))
     checkError(
       exception = intercept[AnalysisException](
         sql(s"$prefix ('k' = true or false or unresolvedAttribute)")),
-      condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
       parameters = Map(
-        "objectName" -> "`unresolvedAttribute`"),
+        "objectName" -> "`unresolvedAttribute`",
+        "proposal" -> "`col`"),
       queryContext = Array(ExpectedContext("", "", 69, 87, "unresolvedAttribute")))
     checkError(
       exception = intercept[AnalysisException](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
index a34d126f6f643..2df42b1b429e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
@@ -39,6 +39,8 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
     userProvidedName = "c2",
     tableName = "t")
 
+  val unnamedConstraint = constraint1.withUserProvidedName(null)
+
   test("Create table with one check constraint - table level") {
     val sql = "CREATE TABLE t (a INT, b STRING, CONSTRAINT c1 CHECK (a > 0)) USING parquet"
     verifyConstraints(sql, Seq(constraint1))
@@ -56,8 +58,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
       case (enforcedStr, relyStr, characteristic) =>
         val sql = s"CREATE TABLE t (a INT, b STRING, CONSTRAINT c1 CHECK (a > 0) " +
           s"$enforcedStr $relyStr) USING parquet"
-        val constraint = constraint1.withUserProvidedName("c1")
-          .withUserProvidedCharacteristic(characteristic)
+        val constraint = constraint1.withUserProvidedCharacteristic(characteristic)
         verifyConstraints(sql, Seq(constraint))
     }
   }
@@ -102,8 +103,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
       case (enforcedStr, relyStr, characteristic) =>
         val sql = s"CREATE TABLE t (a INT CONSTRAINT c1 CHECK (a > 0)" +
           s" $enforcedStr $relyStr, b STRING) USING parquet"
-        val constraint = constraint1.withUserProvidedName("c1")
-          .withUserProvidedCharacteristic(characteristic)
+        val constraint = constraint1.withUserProvidedCharacteristic(characteristic)
         verifyConstraints(sql, Seq(constraint))
     }
   }
@@ -151,8 +151,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
       case (enforcedStr, relyStr, characteristic) =>
         val sql = s"REPLACE TABLE t (a INT, b STRING, CONSTRAINT c1 CHECK (a > 0) " +
           s"$enforcedStr $relyStr) USING parquet"
-        val constraint = constraint1.withUserProvidedName("c1")
-          .withUserProvidedCharacteristic(characteristic)
+        val constraint = constraint1.withUserProvidedCharacteristic(characteristic)
         verifyConstraints(sql, Seq(constraint), isCreateTable = false)
     }
   }
@@ -271,7 +270,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
         case c: CreateTable =>
           val tableSpec = c.tableSpec.asInstanceOf[UnresolvedTableSpec]
           assert(tableSpec.constraints.size == 1)
-          assert(tableSpec.constraints.head == constraint1.withUserProvidedName(null))
+          assert(tableSpec.constraints.head == unnamedConstraint)
           assert(tableSpec.constraints.head.name.matches("t_chk_[0-9a-f]{7}"))
 
         case other =>
@@ -290,7 +289,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
         case c: ReplaceTable =>
           val tableSpec = c.tableSpec.asInstanceOf[UnresolvedTableSpec]
           assert(tableSpec.constraints.size == 1)
-          assert(tableSpec.constraints.head == constraint1.withUserProvidedName(null))
+          assert(tableSpec.constraints.head == unnamedConstraint)
           assert(tableSpec.constraints.head.name.matches("t_chk_[0-9a-f]{7}"))
 
         case other =>
@@ -309,7 +308,7 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
       case a: AddConstraint =>
         val table = a.table.asInstanceOf[UnresolvedTable]
         assert(table.multipartIdentifier == Seq("a", "b", "t"))
-        assert(a.tableConstraint == constraint1.withUserProvidedName(null))
+        assert(a.tableConstraint == unnamedConstraint)
         assert(a.tableConstraint.name.matches("t_chk_[0-9a-f]{7}"))
 
       case other =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ConstraintParseSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ConstraintParseSuiteBase.scala
index ea369489eb181..8b5ddb506f786 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ConstraintParseSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ConstraintParseSuiteBase.scala
@@ -56,12 +56,12 @@ abstract class ConstraintParseSuiteBase extends AnalysisTest with SharedSparkSes
 
   protected def createExpectedPlan(
       columns: Seq[ColumnDefinition],
-      constraints: Seq[TableConstraint],
+      tableConstraints: Seq[TableConstraint],
       isCreateTable: Boolean = true): LogicalPlan = {
     val tableId = UnresolvedIdentifier(Seq("t"))
     val tableSpec = UnresolvedTableSpec(
       Map.empty[String, String], Some("parquet"), OptionList(Seq.empty),
-      None, None, None, None, false, constraints)
+      None, None, None, None, false, tableConstraints)
     if (isCreateTable) {
       CreateTable(tableId, columns, Seq.empty, tableSpec, false)
     } else {
@@ -79,7 +79,7 @@ abstract class ConstraintParseSuiteBase extends AnalysisTest with SharedSparkSes
       ColumnDefinition("b", StringType)
     )
     val expected = createExpectedPlan(
-      columns = columns, constraints = constraints, isCreateTable = isCreateTable)
+      columns = columns, tableConstraints = constraints, isCreateTable = isCreateTable)
     comparePlans(parsed, expected)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
new file mode 100644
index 0000000000000..3b48430604e86
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.command.DDLCommandTestUtils
+import org.apache.spark.sql.internal.SQLConf
+
+class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLCommandTestUtils {
+  override protected def command: String = "CHECK CONSTRAINT"
+
+  test("Nondeterministic expression -- alter table") {
+    withTable("t") {
+      sql("create table t(i double)")
+      val query =
+        """
+          |ALTER TABLE t ADD CONSTRAINT c1 CHECK (i > rand(0))
+          |""".stripMargin
+      val error = intercept[AnalysisException] {
+        sql(query)
+      }
+      checkError(
+        exception = error,
+        condition = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
+        sqlState = "42621",
+        parameters = Map("checkCondition" -> "i > rand(0)"),
+        context = ExpectedContext(
+          fragment = "i > rand(0)",
+          start = 40,
+          stop = 50
+        )
+      )
+    }
+  }
+
+  test("Nondeterministic expression -- create table") {
+    Seq(
+      "CREATE TABLE t(i DOUBLE CHECK (i > rand(0)))",
+      "CREATE TABLE t(i DOUBLE, CONSTRAINT c1 CHECK (i > rand(0)))",
+      "REPLACE TABLE t(i DOUBLE CHECK (i > rand(0)))",
+      "REPLACE TABLE t(i DOUBLE, CONSTRAINT c1 CHECK (i > rand(0)))"
+    ).foreach { query =>
+      withTable("t") {
+        val error = intercept[AnalysisException] {
+          sql(query)
+        }
+        checkError(
+          exception = error,
+          condition = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
+          sqlState = "42621",
+          parameters = Map("checkCondition" -> "i > rand(0)"),
+          context = ExpectedContext(
+            fragment = "i > rand(0)"
+          )
+        )
+      }
+    }
+  }
+
+  test("Expression referring a column of another table -- alter table") {
+    withTable("t", "t2") {
+      sql("CREATE TABLE t(i DOUBLE) USING parquet")
+      sql("CREATE TABLE t2(j STRING) USING parquet")
+      val query =
+        """
+          |ALTER TABLE t ADD CONSTRAINT c1 CHECK (len(t2.j) > 0)
+          |""".stripMargin
+      val error = intercept[AnalysisException] {
+        sql(query)
+      }
+      checkError(
+        exception = error,
+        condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        sqlState = "42703",
+        parameters = Map("objectName" -> "`t2`.`j`", "proposal" -> "`t`.`i`"),
+        context = ExpectedContext(
+          fragment = "t2.j",
+          start = 44,
+          stop = 47
+        )
+      )
+    }
+  }
+
+  test("Expression referring a column of another table -- create and replace table") {
+    withTable("t", "t2") {
+      sql("CREATE TABLE t(i double) USING parquet")
+      val query = "CREATE TABLE t2(j string check(t.i > 0)) USING parquet"
+      val error = intercept[AnalysisException] {
+        sql(query)
+      }
+      checkError(
+        exception = error,
+        condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        sqlState = "42703",
+        parameters = Map("objectName" -> "`t`.`i`", "proposal" -> "`j`"),
+        context = ExpectedContext(
+          fragment = "t.i"
+        )
+      )
+    }
+  }
+
+  private def getCheckConstraint(table: Table): Check = {
+    assert(table.constraints.length == 1)
+    assert(table.constraints.head.isInstanceOf[Check])
+    table.constraints.head.asInstanceOf[Check]
+  }
+
+  test("Predicate should be null if it can't be converted to V2 predicate") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, j string) $defaultUsing")
+      sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (from_json(j, 'a INT').a > 1)")
+      val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+      val constraint = getCheckConstraint(table)
+      assert(constraint.name() == "c1")
+      assert(constraint.toDDL ==
+        "CONSTRAINT c1 CHECK (from_json(j, 'a INT').a > 1) ENFORCED UNVALIDATED NORELY")
+      assert(constraint.predicateSql() == "from_json(j, 'a INT').a > 1")
+      assert(constraint.predicate() == null)
+    }
+  }
+
+  def getConstraintCharacteristics(): Seq[(String, String)] = {
+    val validStatus = "UNVALIDATED"
+    Seq(
+      ("", s"ENFORCED $validStatus NORELY"),
+      ("NOT ENFORCED", s"NOT ENFORCED $validStatus NORELY"),
+      ("NOT ENFORCED NORELY", s"NOT ENFORCED $validStatus NORELY"),
+      ("NORELY NOT ENFORCED", s"NOT ENFORCED $validStatus NORELY"),
+      ("NORELY", s"ENFORCED $validStatus NORELY"),
+      ("NOT ENFORCED RELY", s"NOT ENFORCED $validStatus RELY"),
+      ("RELY NOT ENFORCED", s"NOT ENFORCED $validStatus RELY"),
+      ("NOT ENFORCED RELY", s"NOT ENFORCED $validStatus RELY"),
+      ("RELY NOT ENFORCED", s"NOT ENFORCED $validStatus RELY"),
+      ("RELY", s"ENFORCED $validStatus RELY")
+    )
+  }
+
+  test("Create table with check constraint") {
+    getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        val constraint = getCheckConstraint(table)
+        assert(constraint.name() == "c1")
+        assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) $expectedDDL")
+      }
+    }
+  }
+
+  test("Create table with check constraint: char/varchar type") {
+    Seq(true, false).foreach { preserveCharVarcharTypeInfo =>
+      withSQLConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key ->
+        preserveCharVarcharTypeInfo.toString) {
+        Seq("CHAR(10)", "VARCHAR(10)").foreach { dt =>
+          withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+            val constraintStr = "CONSTRAINT c1 CHECK (LENGTH(name) > 0)"
+            sql(s"CREATE TABLE $t (id bigint, name $dt, $constraintStr) $defaultUsing")
+            val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+            val constraint = getCheckConstraint(table)
+            assert(constraint.name() == "c1")
+            assert(constraint.toDDL ==
+              s"CONSTRAINT c1 CHECK (LENGTH(name) > 0) ENFORCED UNVALIDATED NORELY")
+            assert(constraint.predicateSql() == "LENGTH(name) > 0")
+          }
+        }
+      }
+    }
+  }
+
+  test("Replace table with check constraint") {
+    getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        val constraint = getCheckConstraint(table)
+        assert(constraint.name() == "c1")
+        assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) $expectedDDL")
+      }
+    }
+  }
+
+  test("Alter table add check constraint") {
+    getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+        assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+        sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        val constraint = getCheckConstraint(table)
+        assert(constraint.name() == "c1")
+        assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) $expectedDDL")
+      }
+    }
+  }
+
+  test("Add duplicated check constraint") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT abc CHECK (id > 0)")
+      // Constraint names are case-insensitive
+      Seq("abc", "ABC").foreach { name =>
+        val error = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD CONSTRAINT $name CHECK (id>0)")
+        }
+        checkError(
+          exception = error,
+          condition = "CONSTRAINT_ALREADY_EXISTS",
+          sqlState = "42710",
+          parameters = Map("constraintName" -> "abc",
+            "oldConstraint" -> "CONSTRAINT abc CHECK (id > 0) ENFORCED UNVALIDATED NORELY")
+        )
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
index 6ba60e245f9b4..76928faec3189 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.v2
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier, InMemoryCatalog, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ArrayImplicits._
 
@@ -34,12 +34,13 @@ trait CommandSuiteBase extends SharedSparkSession {
   def catalogVersion: String = "V2" // The catalog version is added to test names
   def commandVersion: String = "V2" // The command version is added to test names
   def catalog: String = "test_catalog" // The default V2 catalog for testing
+  def nonPartitionCatalog: String = "non_part_test_catalog" // Catalog for non-partitioned tables
   def defaultUsing: String = "USING _" // The clause is used in creating v2 tables under testing
 
   // V2 catalogs created and used especially for testing
   override def sparkConf: SparkConf = super.sparkConf
     .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
-    .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
+    .set(s"spark.sql.catalog.$nonPartitionCatalog", classOf[InMemoryTableCatalog].getName)
     .set(s"spark.sql.catalog.fun_$catalog", classOf[InMemoryCatalog].getName)
 
   def checkLocation(
@@ -64,4 +65,11 @@ trait CommandSuiteBase extends SharedSparkSession {
     assert(partMetadata.containsKey("location"))
     assert(partMetadata.get("location") === expected)
   }
+
+  def loadTable(catalog: String, schema: String, table: String): InMemoryTable = {
+    import CatalogV2Implicits._
+    val catalogPlugin = spark.sessionState.catalogManager.catalog(catalog)
+    catalogPlugin.asTableCatalog.loadTable(Identifier.of(Array(schema), table))
+      .asInstanceOf[InMemoryTable]
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropConstraintSuite.scala
new file mode 100644
index 0000000000000..f492e18a6e529
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropConstraintSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.execution.command.DDLCommandTestUtils
+
+class DropConstraintSuite extends QueryTest with CommandSuiteBase with DDLCommandTestUtils {
+  override protected def command: String = "ALTER TABLE .. DROP CONSTRAINT"
+
+  test("Drop a non-exist constraint") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t DROP CONSTRAINT c1")
+      }
+      checkError(
+        exception = e,
+        condition = "CONSTRAINT_DOES_NOT_EXIST",
+        sqlState = "42704",
+        parameters = Map("constraintName" -> "c1", "tableName" -> "test_catalog.ns.tbl")
+      )
+    }
+  }
+
+  test("Drop a non-exist constraint if exists") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      sql(s"ALTER TABLE $t DROP CONSTRAINT IF EXISTS c1")
+    }
+  }
+
+  test("Drop a constraint on a non-exist table") {
+    Seq("", "IF EXISTS").foreach { ifExists =>
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE test_catalog.ns.tbl DROP CONSTRAINT $ifExists c1")
+      }
+      checkError(
+        exception = e,
+        condition = "TABLE_OR_VIEW_NOT_FOUND",
+        sqlState = "42P01",
+        parameters = Map("relationName" -> "`test_catalog`.`ns`.`tbl`"),
+        context = ExpectedContext(
+          fragment = "test_catalog.ns.tbl",
+          start = 12,
+          stop = 30
+        )
+      )
+    }
+  }
+
+  test("Drop existing constraints") {
+    Seq("", "IF EXISTS").foreach { ifExists =>
+      withNamespaceAndTable("ns", "tbl", catalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+        sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0)")
+        sql(s"ALTER TABLE $t ADD CONSTRAINT c2 CHECK (len(data) > 0)")
+        sql(s"ALTER TABLE $t DROP CONSTRAINT $ifExists c1")
+        val table = loadTable(catalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head.asInstanceOf[Check]
+        assert(constraint.name() == "c2")
+
+        sql(s"ALTER TABLE $t DROP CONSTRAINT $ifExists c2")
+        val table2 = loadTable(catalog, "ns", "tbl")
+        assert(table2.constraints.length == 0)
+      }
+    }
+  }
+
+  test("Drop constraint is case insensitive") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      sql(s"ALTER TABLE $t ADD CONSTRAINT abc CHECK (id > 0)")
+      sql(s"ALTER TABLE $t DROP CONSTRAINT aBC")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
new file mode 100644
index 0000000000000..02646a3cfcbb0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.execution.command.DDLCommandTestUtils
+
+class ForeignKeyConstraintSuite extends QueryTest with CommandSuiteBase with DDLCommandTestUtils {
+  override protected def command: String = "FOREIGN KEY CONSTRAINT"
+
+  private val validConstraintCharacteristics = Seq(
+    ("", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("RELY", "NOT ENFORCED UNVALIDATED RELY")
+  )
+
+  test("Add foreign key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", catalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+        assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+        sql(s"ALTER TABLE $t ADD CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES ${t}_ref(id) $characteristic")
+        val table = loadTable(catalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "fk1")
+        assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES test_catalog.ns.tbl_ref (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Create table with foreign key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+        val constraintStr = s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES ${t}_ref(id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint, fk bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "fk1")
+        assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES non_part_test_catalog.ns.tbl_ref (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("REPLACE table with foreign key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+        val constraintStr = s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES ${t}_ref(id) $characteristic"
+        sql(s"REPLACE TABLE $t (id bigint, fk bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "fk1")
+        assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES non_part_test_catalog.ns.tbl_ref (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Add duplicated foreign key constraint") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing")
+      sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+      assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT fk1 FOREIGN KEY (fk) REFERENCES ${t}_ref(id)")
+      // Constraint names are case-insensitive
+      Seq("fk1", "FK1").foreach { name =>
+        val error = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD CONSTRAINT $name FOREIGN KEY (fk) REFERENCES ${t}_ref(id)")
+        }
+        checkError(
+          exception = error,
+          condition = "CONSTRAINT_ALREADY_EXISTS",
+          sqlState = "42710",
+          parameters = Map("constraintName" -> "fk1",
+            "oldConstraint" ->
+              ("CONSTRAINT fk1 FOREIGN KEY (fk) " +
+                "REFERENCES test_catalog.ns.tbl_ref (id) NOT ENFORCED UNVALIDATED NORELY"))
+        )
+      }
+    }
+  }
+
+  test("Add foreign key constraint with multiple columns") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id1 bigint, id2 bigint, fk1 bigint, fk2 bigint, data string) " +
+        s"$defaultUsing")
+      sql(s"CREATE TABLE ${t}_ref (id1 bigint, id2 bigint, data string) $defaultUsing")
+      assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT fk1 FOREIGN KEY (fk1, fk2) REFERENCES ${t}_ref(id1, id2)")
+      val table = loadTable(catalog, "ns", "tbl")
+      assert(table.constraints.length == 1)
+      val constraint = table.constraints.head
+      assert(constraint.name() == "fk1")
+      assert(constraint.toDDL ==
+        s"CONSTRAINT fk1 FOREIGN KEY (fk1, fk2) " +
+          s"REFERENCES test_catalog.ns.tbl_ref (id1, id2) NOT ENFORCED UNVALIDATED NORELY")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
new file mode 100644
index 0000000000000..a4785e953a2d8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.execution.command.DDLCommandTestUtils
+
+class PrimaryKeyConstraintSuite extends QueryTest with CommandSuiteBase with DDLCommandTestUtils {
+  override protected def command: String = "PRIMARY KEY CONSTRAINT"
+
+  private val validConstraintCharacteristics = Seq(
+    ("", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("RELY", "NOT ENFORCED UNVALIDATED RELY")
+  )
+
+  test("Add primary key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+        assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+        sql(s"ALTER TABLE $t ADD CONSTRAINT pk1 PRIMARY KEY (id) $characteristic")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "pk1")
+        assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Create table with primary key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "pk1")
+        assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Replace table with primary key constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "pk1")
+        assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Add duplicated primary key constraint") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT pk1 PRIMARY KEY (id)")
+      // Constraint names are case-insensitive
+      Seq("pk1", "PK1").foreach { name =>
+        val error = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD CONSTRAINT $name PRIMARY KEY (id)")
+        }
+        checkError(
+          exception = error,
+          condition = "CONSTRAINT_ALREADY_EXISTS",
+          sqlState = "42710",
+          parameters = Map("constraintName" -> "pk1",
+            "oldConstraint" -> "CONSTRAINT pk1 PRIMARY KEY (id) NOT ENFORCED UNVALIDATED NORELY")
+        )
+      }
+    }
+  }
+
+  test("Add primary key constraint with multiple columns") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id1 bigint, id2 bigint, data string) $defaultUsing")
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT pk1 PRIMARY KEY (id1, id2)")
+      val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+      assert(table.constraints.length == 1)
+      val constraint = table.constraints.head
+      assert(constraint.name() == "pk1")
+      assert(constraint.toDDL ==
+        "CONSTRAINT pk1 PRIMARY KEY (id1, id2) NOT ENFORCED UNVALIDATED NORELY")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
new file mode 100644
index 0000000000000..9446cbc6ade22
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.execution.command.DDLCommandTestUtils
+
+class UniqueConstraintSuite extends QueryTest with CommandSuiteBase with DDLCommandTestUtils {
+  override protected def command: String = "UNIQUE CONSTRAINT"
+
+  private val validConstraintCharacteristics = Seq(
+    ("", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NOT ENFORCED NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY NOT ENFORCED", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("NORELY", "NOT ENFORCED UNVALIDATED NORELY"),
+    ("RELY", "NOT ENFORCED UNVALIDATED RELY")
+  )
+
+  test("Add unique constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", catalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+        assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+        sql(s"ALTER TABLE $t ADD CONSTRAINT uk1 UNIQUE (id) $characteristic")
+        val table = loadTable(catalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "uk1")
+        assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Create table with unique constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT uk1 UNIQUE (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "uk1")
+        assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Replace table with unique constraint") {
+    validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        val constraintStr = s"CONSTRAINT uk1 UNIQUE (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "uk1")
+        assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
+      }
+    }
+  }
+
+  test("Add duplicated unique constraint") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT uk1 UNIQUE (id)")
+      // Constraint names are case-insensitive
+      Seq("uk1", "UK1").foreach { name =>
+        val error = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t ADD CONSTRAINT $name UNIQUE (id)")
+        }
+        checkError(
+          exception = error,
+          condition = "CONSTRAINT_ALREADY_EXISTS",
+          sqlState = "42710",
+          parameters = Map("constraintName" -> "uk1",
+            "oldConstraint" -> "CONSTRAINT uk1 UNIQUE (id) NOT ENFORCED UNVALIDATED NORELY")
+        )
+      }
+    }
+  }
+
+  test("Add unique constraint with multiple columns") {
+    withNamespaceAndTable("ns", "tbl", catalog) { t =>
+      sql(s"CREATE TABLE $t (id1 bigint, id2 bigint, data string) $defaultUsing")
+      assert(loadTable(catalog, "ns", "tbl").constraints.isEmpty)
+
+      sql(s"ALTER TABLE $t ADD CONSTRAINT uk1 UNIQUE (id1, id2)")
+      val table = loadTable(catalog, "ns", "tbl")
+      assert(table.constraints.length == 1)
+      val constraint = table.constraints.head
+      assert(constraint.name() == "uk1")
+      assert(constraint.toDDL ==
+        "CONSTRAINT uk1 UNIQUE (id1, id2) NOT ENFORCED UNVALIDATED NORELY")
+    }
+  }
+}