From 1388fa1e7d35856b1964af3356fa4b90a220b7bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 22 Jan 2026 15:15:12 +0800 Subject: [PATCH 1/4] support set/remove table properties --- .../org/apache/fluss/spark/SparkCatalog.scala | 23 ++++++++++++++- .../apache/fluss/spark/SparkConversions.scala | 18 ++++++++++-- .../apache/fluss/spark/SparkCatalogTest.scala | 28 +++++++++++++++++++ 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 842ef9b395..1a80b234b5 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -80,7 +80,28 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } override def alterTable(ident: Identifier, changes: TableChange*): Table = { - throw new UnsupportedOperationException("Altering table is not supported") + if ( + !changes.forall( + e => e.isInstanceOf[TableChange.SetProperty] || e.isInstanceOf[TableChange.RemoveProperty]) + ) { + throw new IllegalArgumentException( + "Altering table only supports set or remove properties for now") + } + try { + admin + .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false) + .get() + loadTable(ident) + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[TableNotExistException]) { + throw new NoSuchTableException(ident) + } else { + throw e + } + case e: UnsupportedOperationException => + throw new IllegalArgumentException(e) + } } override def dropTable(ident: Identifier): Boolean = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala index f85e33f81e..eeb726678d 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -25,6 +25,7 @@ import org.apache.fluss.types.RowType import org.apache.spark.sql.FlussIdentityTransform import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -54,7 +55,7 @@ object SparkConversions { tableDescriptorBuilder.partitionedBy(partitionKey: _*) val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { - val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",").map(_.trim) schemaBuilder.primaryKey(pks: _*) pks } else { @@ -64,7 +65,7 @@ object SparkConversions { if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { - caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",").map(_.trim) } else { primaryKeys.filterNot(partitionKey.contains) } @@ -76,7 +77,7 @@ object SparkConversions { } val (tableProps, customProps) = - caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition { + 
caseInsensitiveProps.filterNot(e => SPARK_TABLE_OPTIONS.contains(e._1)).partition { case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX) } @@ -97,4 +98,15 @@ object SparkConversions { } partitionKeys.toArray } + + def toFlussTableChanges(changes: Seq[TableChange]): Seq[org.apache.fluss.metadata.TableChange] = { + changes.map { + case p: TableChange.SetProperty => + org.apache.fluss.metadata.TableChange.set(p.property(), p.value()) + case p: TableChange.RemoveProperty => + org.apache.fluss.metadata.TableChange.reset(p.property()) + // TODO Add full support for table changes + case _ => throw new UnsupportedOperationException("Unsupported table change") + } + } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala index 4c4bec355a..1805f3fc5c 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala @@ -17,6 +17,7 @@ package org.apache.fluss.spark +import org.apache.fluss.config.ConfigOptions import org.apache.fluss.metadata._ import org.apache.fluss.types.{DataTypes, RowType} @@ -192,6 +193,33 @@ class SparkCatalogTest extends FlussSparkTestBase { checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) } + test("Catalog: set/remove table properties") { + withTable("t") { + sql( + s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' = 3)") + var flussTable = admin.getTableInfo(createTablePath("t")).get() + assertResult( + Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"), + "check table properties")(flussTable.getProperties.toMap.asScala) + assert( + flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1") + + sql(s"ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2')") + flussTable = admin.getTableInfo(createTablePath("t")).get() + assertResult( + Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"), + "check table properties")(flussTable.getProperties.toMap.asScala) + assert( + flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2") + + sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + flussTable = admin.getTableInfo(createTablePath("t")).get() + assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1")) + + sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + } + } + test("Partition: show partitions") { withTable("t") { sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)") From 69015424becf9e9b148446f4ac883d2c43adfe5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Mon, 26 Jan 2026 10:02:55 +0800 Subject: [PATCH 2/4] fix comments --- .../org/apache/fluss/spark/SparkCatalog.scala | 9 ------- .../apache/fluss/spark/SparkCatalogTest.scala | 25 ++++++++++++++++--- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 1a80b234b5..cdc2206f27 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -80,13 +80,6 @@ class SparkCatalog extends 
TableCatalog with SupportsFlussNamespaces with WithFl } override def alterTable(ident: Identifier, changes: TableChange*): Table = { - if ( - !changes.forall( - e => e.isInstanceOf[TableChange.SetProperty] || e.isInstanceOf[TableChange.RemoveProperty]) - ) { - throw new IllegalArgumentException( - "Altering table only supports set or remove properties for now") - } try { admin .alterTable(toTablePath(ident), SparkConversions.toFlussTableChanges(changes).asJava, false) @@ -99,8 +92,6 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl } else { throw e } - case e: UnsupportedOperationException => - throw new IllegalArgumentException(e) } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala index 1805f3fc5c..8235effe0f 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala @@ -18,6 +18,7 @@ package org.apache.fluss.spark import org.apache.fluss.config.ConfigOptions +import org.apache.fluss.exception.InvalidAlterTableException import org.apache.fluss.metadata._ import org.apache.fluss.types.{DataTypes, RowType} @@ -25,6 +26,9 @@ import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException import org.apache.spark.sql.connector.catalog.Identifier import org.assertj.core.api.Assertions.{assertThat, assertThatList} +import org.scalatest.matchers.should.Matchers.{a, convertToAnyShouldWrapper} + +import java.util.concurrent.ExecutionException import scala.collection.JavaConverters._ @@ -204,19 +208,32 @@ class SparkCatalogTest extends FlussSparkTestBase { assert( flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1") - sql(s"ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2')") + sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 'value2')") flussTable = admin.getTableInfo(createTablePath("t")).get() assertResult( Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"), "check table properties")(flussTable.getProperties.toMap.asScala) assert( flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2") + assert( + flussTable.getCustomProperties.toMap.asScala.getOrElse("key2", "non-exists") == "value2") - sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + sql("ALTER TABLE t UNSET TBLPROPERTIES('key1', 'key2')") flussTable = admin.getTableInfo(createTablePath("t")).get() assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1")) - - sql(s"ALTER TABLE t UNSET TBLPROPERTIES('key1')") + assert(!flussTable.getCustomProperties.toMap.asScala.contains("key2")) + + sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')") + + // Most table properties with prefix of 'table.' are not allowed to be modified. 
+      intercept[ExecutionException] {
+        sql(
+          s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_REPLICATION_FACTOR.key()}' = '2')")
+      }.getCause.shouldBe(a[InvalidAlterTableException])
+      intercept[ExecutionException] {
+        sql(
+          s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_FORMAT.key()}' = 'paimon')")
+      }.getCause.shouldBe(a[InvalidAlterTableException])
     }
   }

From 20078df188bffddc4600c4c4b7afd972d52a8bf4 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Mon, 26 Jan 2026 14:04:09 +0800
Subject: [PATCH 3/4] add test to alter "table.datalake.enabled" config

---
 .../fluss/spark/FlussSparkTestBase.scala      |  5 ++---
 .../apache/fluss/spark/SparkCatalogTest.scala | 21 +++++++++++++------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index f9b98fe7c9..296157c4a1 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -22,17 +22,15 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.client.table.Table
 import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.{ConfigOptions, Configuration}
-import org.apache.fluss.metadata.{TableDescriptor, TablePath}
+import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.junit.jupiter.api.extension.RegisterExtension
 
 import java.time.Duration
-
 import scala.collection.JavaConverters._
 
 class FlussSparkTestBase extends QueryTest with SharedSparkSession {
@@ -107,6 +105,7 @@ object FlussSparkTestBase {
       .setClusterConf(
         new Configuration()
           .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+          .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)
       )
       .setNumOfTabletServers(3)
       .build
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 8235effe0f..d15c6db024 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -202,8 +202,10 @@ class SparkCatalogTest extends FlussSparkTestBase {
       sql(
         s"CREATE TABLE $DEFAULT_DATABASE.t (id int, name string) TBLPROPERTIES('key1' = 'value1', '${SparkConnectorOptions.BUCKET_NUMBER.key()}' = 3)")
       var flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assertResult(3, "check bucket num")(flussTable.getNumBuckets)
       assertResult(
-        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"),
+        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
         "check table properties")(flussTable.getProperties.toMap.asScala)
       assert(
         flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value1")
@@ -211,7 +213,8 @@ class SparkCatalogTest extends FlussSparkTestBase {
       sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 'value2')")
       flussTable = admin.getTableInfo(createTablePath("t")).get()
       assertResult(
-        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1"),
+        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
         "check table properties")(flussTable.getProperties.toMap.asScala)
       assert(
         flussTable.getCustomProperties.toMap.asScala.getOrElse("key1", "non-exists") == "value2")
@@ -223,17 +226,23 @@ class SparkCatalogTest extends FlussSparkTestBase {
       assert(!flussTable.getCustomProperties.toMap.asScala.contains("key1"))
       assert(!flussTable.getCustomProperties.toMap.asScala.contains("key2"))
 
+      // no error when unsetting a non-existent key
       sql("ALTER TABLE t UNSET TBLPROPERTIES('key1')")
 
+      sql(
+        s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
+      flussTable = admin.getTableInfo(createTablePath("t")).get()
+      assertResult(
+        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+          ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
+          ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"),
+        "check table properties")(flussTable.getProperties.toMap.asScala)
+
       // Most table properties with prefix of 'table.' are not allowed to be modified.
       intercept[ExecutionException] {
         sql(
           s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_REPLICATION_FACTOR.key()}' = '2')")
       }.getCause.shouldBe(a[InvalidAlterTableException])
-      intercept[ExecutionException] {
-        sql(
-          s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_FORMAT.key()}' = 'paimon')")
-      }.getCause.shouldBe(a[InvalidAlterTableException])
     }
   }

From 2415ecc8b738b966955a37223ab99c5ca8960362 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Mon, 26 Jan 2026 14:14:02 +0800
Subject: [PATCH 4/4] fix spotless

---
 .../apache/fluss/spark/FlussSparkTestBase.scala   |  2 ++
 .../org/apache/fluss/spark/SparkCatalogTest.scala | 15 ++++++++++-----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 296157c4a1..6dd2c4d869 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -25,12 +25,14 @@ import org.apache.fluss.config.{ConfigOptions, Configuration}
 import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.junit.jupiter.api.extension.RegisterExtension
 
 import java.time.Duration
+
 import scala.collection.JavaConverters._
 
 class FlussSparkTestBase extends QueryTest with SharedSparkSession {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index d15c6db024..242e7998ec 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -204,7 +204,8 @@ class SparkCatalogTest extends FlussSparkTestBase {
       var flussTable = admin.getTableInfo(createTablePath("t")).get()
       assertResult(3, "check bucket num")(flussTable.getNumBuckets)
       assertResult(
-        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
           ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
         "check table properties")(flussTable.getProperties.toMap.asScala)
       assert(
@@ -213,7 +214,8 @@ class SparkCatalogTest extends FlussSparkTestBase {
       sql("ALTER TABLE t SET TBLPROPERTIES('key1' = 'value2', 'key2' = 'value2')")
       flussTable = admin.getTableInfo(createTablePath("t")).get()
       assertResult(
-        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
           ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon"),
         "check table properties")(flussTable.getProperties.toMap.asScala)
       assert(
@@ -233,10 +235,13 @@ class SparkCatalogTest extends FlussSparkTestBase {
         s"ALTER TABLE t SET TBLPROPERTIES('${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = 'true')")
       flussTable = admin.getTableInfo(createTablePath("t")).get()
       assertResult(
-        Map(ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
+        Map(
+          ConfigOptions.TABLE_REPLICATION_FACTOR.key() -> "1",
           ConfigOptions.TABLE_DATALAKE_FORMAT.key() -> "paimon",
-          ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"),
-        "check table properties")(flussTable.getProperties.toMap.asScala)
+          ConfigOptions.TABLE_DATALAKE_ENABLED.key() -> "true"
+        ),
+        "check table properties"
+      )(flussTable.getProperties.toMap.asScala)
 
       // Most table properties with prefix of 'table.' are not allowed to be modified.
       intercept[ExecutionException] {
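Usage note: Spark parses ALTER TABLE ... SET/UNSET TBLPROPERTIES into TableChange.SetProperty and TableChange.RemoveProperty, which SparkCatalog.alterTable forwards to Admin.alterTable through SparkConversions.toFlussTableChanges (any other TableChange kind still raises UnsupportedOperationException there). The same path can therefore be driven directly against the DataSourceV2 catalog API. A minimal sketch, assuming `catalog` is an already-initialized SparkCatalog and the namespace/table names are illustrative:

    import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}

    // These are exactly the TableChange instances Spark's parser produces for
    // SET/UNSET TBLPROPERTIES; alterTable converts them to Fluss table changes.
    val ident = Identifier.of(Array("fluss"), "t")
    catalog.alterTable(ident, TableChange.setProperty("key1", "value2")) // custom key: accepted
    catalog.alterTable(ident, TableChange.removeProperty("key1"))        // no-op if absent

    // Per the tests above, reserved 'table.'-prefixed options (other than
    // table.datalake.enabled) are rejected server-side with
    // InvalidAlterTableException, surfaced as the cause of an ExecutionException.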