From 6be7d378bc64a6f4ee0367830faf833d1352a18e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 21 Jan 2026 16:04:23 +0800 Subject: [PATCH 1/4] [core][spark] introduce action to handle updates on global-indexed columns --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 24 ++++ .../MergeIntoPaimonDataEvolutionTable.scala | 105 ++++++++++++++--- .../spark/sql/RowTrackingTestBase.scala | 108 ++++++++++++++++++ 4 files changed, 226 insertions(+), 17 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 41620abd79ac..1e1ac14edf31 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -626,6 +626,12 @@ Integer The maximum number of concurrent scanner for global index.By default is the number of processors available to the Java virtual machine. + +
global-index.column-update-action
+ THROW_ERROR + Enum + Defines the action to take when an update modifies columns that are covered by a global index. +
ignore-delete
false diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 6dc1e0a91dd6..5033eb2fa535 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2072,6 +2072,14 @@ public InlineElement getDescription() { .withDescription( "Global index root directory, if not set, the global index files will be stored under the /index."); + public static final ConfigOption + GLOBAL_INDEX_COLUMN_UPDATE_ACTION = + key("global-index.column-update-action") + .enumType(GlobalIndexColumnUpdateAction.class) + .defaultValue(GlobalIndexColumnUpdateAction.THROW_ERROR) + .withDescription( + "Defines the action to take when an update modifies columns that are covered by a global index."); + public static final ConfigOption LOOKUP_MERGE_BUFFER_SIZE = key("lookup.merge-buffer-size") .memoryType() @@ -2780,6 +2788,10 @@ public Path globalIndexExternalPath() { return path; } + public GlobalIndexColumnUpdateAction globalIndexColumnUpdateAction() { + return options.get(GLOBAL_INDEX_COLUMN_UPDATE_ACTION); + } + public LookupStrategy lookupStrategy() { return LookupStrategy.from( mergeEngine().equals(MergeEngine.FIRST_ROW), @@ -4015,4 +4027,16 @@ public InlineElement getDescription() { return text(description); } } + + /** + * Action to take when an UPDATE (e.g. via MERGE INTO) modifies columns that are covered by a + * global index. + */ + public enum GlobalIndexColumnUpdateAction { + /** Updating indexed columns is forbidden. */ + THROW_ERROR, + + /** Drop all global index entries for the whole partitions affected by the update. */ + DROP_PARTITION_INDEX + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 82a3ae60e384..3cbee6f2dffd 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -18,7 +18,11 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction +import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.io.{CompactIncrement, DataIncrement} +import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.PaimonRelation import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable @@ -27,7 +31,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan import org.apache.paimon.table.FileStoreTable -import org.apache.paimon.table.sink.CommitMessage +import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} import org.apache.paimon.table.source.DataSplit import org.apache.spark.sql.{Dataset, Row, SparkSession} @@ -45,7 +49,7 @@ import org.apache.spark.sql.types.StructType import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ import scala.collection.Searching.{search, Found, InsertionPoint} -import scala.collection.mutable.ListBuffer 
+import scala.collection.mutable.{ArrayBuffer, ListBuffer} /** Command for Merge Into for Data Evolution paimon table. */ case class MergeIntoPaimonDataEvolutionTable( @@ -76,6 +80,23 @@ case class MergeIntoPaimonDataEvolutionTable( import MergeIntoPaimonDataEvolutionTable._ override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable] + + private val updateColumns: Set[AttributeReference] = { + val columns = mutable.Set[AttributeReference]() + for (action <- matchedActions) { + action match { + case updateAction: UpdateAction => + for (assignment <- updateAction.assignments) { + if (!assignment.key.equals(assignment.value)) { + val key = assignment.key.asInstanceOf[AttributeReference] + columns ++= Seq(key) + } + } + } + } + columns.toSet + } + private val firstRowIds: immutable.IndexedSeq[Long] = table .store() .newScan() @@ -169,9 +190,10 @@ case class MergeIntoPaimonDataEvolutionTable( // step 2: invoke update action val updateCommit = - if (matchedActions.nonEmpty) - updateActionInvoke(sparkSession, touchedFileTargetRelation) - else Nil + if (matchedActions.nonEmpty) { + val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation) + checkUpdateResult(updateResult) + } else Nil // step 3: invoke insert action val insertCommit = @@ -232,18 +254,6 @@ case class MergeIntoPaimonDataEvolutionTable( (o1, o2) => { o1.toString().compareTo(o2.toString()) }) ++ mergeFields - val updateColumns = mutable.Set[AttributeReference]() - for (action <- matchedActions) { - action match { - case updateAction: UpdateAction => - for (assignment <- updateAction.assignments) { - if (!assignment.key.equals(assignment.value)) { - val key = assignment.key.asInstanceOf[AttributeReference] - updateColumns ++= Seq(key) - } - } - } - } val updateColumnsSorted = updateColumns.toSeq.sortBy( s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) @@ -459,6 +469,67 @@ case class MergeIntoPaimonDataEvolutionTable( } } + private def checkUpdateResult(updateCommit: Seq[CommitMessage]): Seq[CommitMessage] = { + val affectedParts: Set[BinaryRow] = updateCommit.map(_.partition()).toSet + val rowType = table.rowType() + + // find all global index files of affected partitions and updated columns + val latestSnapshot = table.latestSnapshot() + if (latestSnapshot.isEmpty) { + return updateCommit + } + + val filter: org.apache.paimon.utils.Filter[IndexManifestEntry] = + (entry: IndexManifestEntry) => { + val globalIndexMeta = entry.indexFile().globalIndexMeta() + if (globalIndexMeta == null) { + false + } else { + val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + affectedParts.contains(entry.partition()) && updateColumns.exists( + _.name.equals(fieldName)) + } + } + + val affectedIndexEntries = table + .store() + .newIndexFileHandler() + .scan(latestSnapshot.get(), filter) + .asScala + + if (affectedIndexEntries.isEmpty) { + updateCommit + } else { + table.coreOptions().globalIndexColumnUpdateAction() match { + case GlobalIndexColumnUpdateAction.THROW_ERROR => + val updatedColNames = updateColumns.map(_.name) + val conflicted = affectedIndexEntries + .map(_.indexFile().globalIndexMeta().indexFieldId()) + .map(id => rowType.getField(id).name()) + .toSet + throw new RuntimeException( + s"""MergeInto: update columns contain globally indexed columns, not supported now. 
+ |Updated columns: ${updatedColNames.toSeq.sorted.mkString("[", ", ", "]")} + |Conflicted columns: ${conflicted.toSeq.sorted.mkString("[", ", ", "]")} + |""".stripMargin) + case GlobalIndexColumnUpdateAction.DROP_PARTITION_INDEX => + val grouped: Map[BinaryRow, Seq[IndexManifestEntry]] = + affectedIndexEntries.groupBy(_.partition()) + val deleteCommitMessages = ArrayBuffer.empty[CommitMessage] + grouped.foreach { + case (part, entries) => + deleteCommitMessages += new CommitMessageImpl( + part, + 0, + null, + DataIncrement.deleteIndexIncrement(entries.map(_.indexFile()).asJava), + CompactIncrement.emptyIncrement()) + } + updateCommit ++ deleteCommitMessages + } + } + } + private def findRelatedFirstRowIds( dataset: Dataset[Row], sparkSession: SparkSession, diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 2a0e32c18f86..083505fa7d9a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.util.QueryExecutionListener import java.util.concurrent.{CountDownLatch, TimeUnit} +import scala.jdk.CollectionConverters.asScalaBufferConverter + abstract class RowTrackingTestBase extends PaimonSparkTestBase { import testImplicits._ @@ -650,4 +652,110 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { ) } } + + test("Data Evolution: test global indexed column update action -- throw error") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + // write two partitions: p0 & p1 + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + // create global index for p0 + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " partitions => 'pt=\"p0\"', options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + + // call merge into to update global-indexed partition + assert(intercept[RuntimeException] { + sql(s""" + |MERGE INTO T + |USING T AS source + |ON T._ROW_ID = source._ROW_ID AND T.pt = 'p0' + |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName' + |""".stripMargin) + }.getMessage + .contains("MergeInto: update columns contain globally indexed columns, not supported now.")) + + // call merge into to update non-indexed partition + sql(s""" + |MERGE INTO T + |USING T AS source + |ON T._ROW_ID = source._ROW_ID AND T.pt = 'p1' + |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName' + |""".stripMargin) + } + } + + test("Data Evolution: test global indexed column update action -- drop partition index") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 
'true', + | 'global-index.column-update-action' = 'DROP_PARTITION_INDEX') + | PARTITIONED BY (pt) + |""".stripMargin) + + // write two partitions: p0 & p1 + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + // create global index for all parts + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + + // call merge into to update some data of p1 + sql(s""" + |MERGE INTO T + |USING T AS source + |ON T._ROW_ID = source._ROW_ID AND T.pt = 'p1' + |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName' + |""".stripMargin) + + val table = loadTable("T") + val indexEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + + // all modified partitions' index entries should have been removed + assert(indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p0"))) + assert(!indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p1"))) + } + } } From a45aa843140278f8ca24e816b7c989267063b15f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 21 Jan 2026 16:33:08 +0800 Subject: [PATCH 2/4] fix test --- .../spark/commands/MergeIntoPaimonDataEvolutionTable.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 3cbee6f2dffd..545bf36fc1be 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -475,7 +475,7 @@ case class MergeIntoPaimonDataEvolutionTable( // find all global index files of affected partitions and updated columns val latestSnapshot = table.latestSnapshot() - if (latestSnapshot.isEmpty) { + if (!latestSnapshot.isPresent) { return updateCommit } @@ -496,6 +496,7 @@ case class MergeIntoPaimonDataEvolutionTable( .newIndexFileHandler() .scan(latestSnapshot.get(), filter) .asScala + .toSeq if (affectedIndexEntries.isEmpty) { updateCommit @@ -513,8 +514,7 @@ case class MergeIntoPaimonDataEvolutionTable( |Conflicted columns: ${conflicted.toSeq.sorted.mkString("[", ", ", "]")} |""".stripMargin) case GlobalIndexColumnUpdateAction.DROP_PARTITION_INDEX => - val grouped: Map[BinaryRow, Seq[IndexManifestEntry]] = - affectedIndexEntries.groupBy(_.partition()) + val grouped = affectedIndexEntries.groupBy(_.partition()) val deleteCommitMessages = ArrayBuffer.empty[CommitMessage] grouped.foreach { case (part, entries) => From a4704372bbda369044bf4da326cb0e121b92688a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 21 Jan 2026 16:51:27 +0800 Subject: [PATCH 3/4] fix test --- .../scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 083505fa7d9a..36feb56f824a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.util.QueryExecutionListener import java.util.concurrent.{CountDownLatch, TimeUnit} -import scala.jdk.CollectionConverters.asScalaBufferConverter +import scala.collection.JavaConverters._ abstract class RowTrackingTestBase extends PaimonSparkTestBase { From 684cb6f5e855be235d5a8f43c8187363ac966775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 21 Jan 2026 17:40:13 +0800 Subject: [PATCH 4/4] fix docs --- .../shortcodes/generated/core_configuration.html | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 1e1ac14edf31..12d45a6a003d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -602,6 +602,12 @@ Integer For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0. + +
global-index.column-update-action
+ THROW_ERROR +

Enum

+ Defines the action to take when an update modifies columns that are covered by a global index.

Possible values:
  • "THROW_ERROR"
  • "DROP_PARTITION_INDEX"
+
global-index.enabled
true @@ -626,12 +632,6 @@ Integer The maximum number of concurrent scanner for global index.By default is the number of processors available to the Java virtual machine. - -
global-index.column-update-action
- THROW_ERROR - Enum - Defines the action to take when an update modifies columns that are covered by a global index. -
ignore-delete
false
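
A minimal usage sketch of the new 'global-index.column-update-action' option, distilled from the RowTrackingTestBase cases added in this series. It assumes the Spark session, the 'test' database and the sys.create_global_index procedure that the tests themselves rely on; the table name, column names and partition values are illustrative only, not a prescribed API.

    // Sketch only -- mirrors the "drop partition index" test above.
    spark.sql(
      """
        |CREATE TABLE T (id INT, name STRING, pt STRING)
        |TBLPROPERTIES (
        |  'bucket' = '-1',
        |  'global-index.row-count-per-shard' = '10000',
        |  'row-tracking.enabled' = 'true',
        |  'data-evolution.enabled' = 'true',
        |  'global-index.column-update-action' = 'DROP_PARTITION_INDEX')
        |PARTITIONED BY (pt)
        |""".stripMargin)

    // ... insert data into partitions p0 and p1 as the test does, then build a global
    // index on the column that will later be updated:
    spark.sql(
      "CALL sys.create_global_index(table => 'test.T', index_column => 'name', " +
        "index_type => 'btree', options => 'btree-index.records-per-range=1000')")

    // With DROP_PARTITION_INDEX, updating the indexed column through MERGE INTO succeeds
    // and the index entries of every partition touched by the update (here p1) are
    // dropped in the same commit via extra delete commit messages; under the default
    // THROW_ERROR the same statement fails in checkUpdateResult.
    spark.sql(
      """
        |MERGE INTO T
        |USING T AS source
        |ON T._ROW_ID = source._ROW_ID AND T.pt = 'p1'
        |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName'
        |""".stripMargin)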