6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -602,6 +602,12 @@
<td>Integer</td>
<td>For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.</td>
</tr>
<tr>
<td><h5>global-index.column-update-action</h5></td>
<td style="word-wrap: break-word;">THROW_ERROR</td>
<td><p>Enum</p></td>
<td>Defines the action to take when an update modifies columns that are covered by a global index.<br /><br />Possible values:<ul><li>"THROW_ERROR"</li><li>"DROP_PARTITION_INDEX"</li></ul></td>
</tr>
<tr>
<td><h5>global-index.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
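The new option is configured like any other table property. A minimal Spark SQL sketch, mirroring the tests added in this PR (table, column, and partition names are illustrative):

CREATE TABLE T (id INT, name STRING, pt STRING)
TBLPROPERTIES (
  'row-tracking.enabled' = 'true',
  'data-evolution.enabled' = 'true',
  -- default is THROW_ERROR; DROP_PARTITION_INDEX drops the affected partitions' index instead of failing
  'global-index.column-update-action' = 'DROP_PARTITION_INDEX')
PARTITIONED BY (pt);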
24 changes: 24 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2072,6 +2072,14 @@ public InlineElement getDescription() {
.withDescription(
"Global index root directory, if not set, the global index files will be stored under the <table-root-directory>/index.");

public static final ConfigOption<GlobalIndexColumnUpdateAction>
GLOBAL_INDEX_COLUMN_UPDATE_ACTION =
key("global-index.column-update-action")
.enumType(GlobalIndexColumnUpdateAction.class)
.defaultValue(GlobalIndexColumnUpdateAction.THROW_ERROR)
.withDescription(
"Defines the action to take when an update modifies columns that are covered by a global index.");

public static final ConfigOption<MemorySize> LOOKUP_MERGE_BUFFER_SIZE =
key("lookup.merge-buffer-size")
.memoryType()
@@ -2780,6 +2788,10 @@ public Path globalIndexExternalPath() {
return path;
}

public GlobalIndexColumnUpdateAction globalIndexColumnUpdateAction() {
return options.get(GLOBAL_INDEX_COLUMN_UPDATE_ACTION);
}

public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
@@ -4015,4 +4027,16 @@ public InlineElement getDescription() {
return text(description);
}
}

/**
* Action to take when an UPDATE (e.g. via MERGE INTO) modifies columns that are covered by a
* global index.
*/
public enum GlobalIndexColumnUpdateAction {
/** Updating indexed columns is forbidden. */
THROW_ERROR,

/** Drop all global index entries of the partitions affected by the update. */
DROP_PARTITION_INDEX
}
}
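To make the two enum values concrete, here is a hedged sketch of the statement the tests below run, assuming the 'name' column of partition p0 is covered by a global index (table, column, and partition names are illustrative). With the default THROW_ERROR the statement fails; with DROP_PARTITION_INDEX it commits and the global index entries of the touched partitions are deleted in the same commit.

-- assumes 'name' is globally indexed in partition pt='p0'
MERGE INTO T
USING T AS source
ON T._ROW_ID = source._ROW_ID AND T.pt = 'p0'
WHEN MATCHED THEN UPDATE SET name = 'updatedName';
-- THROW_ERROR (default): the MERGE fails with a RuntimeException
-- DROP_PARTITION_INDEX: the MERGE commits and p0's global index entries are dropped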
@@ -18,7 +18,11 @@

package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
import org.apache.paimon.io.{CompactIncrement, DataIncrement}
import org.apache.paimon.manifest.IndexManifestEntry
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
@@ -27,7 +31,7 @@ import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit

import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -45,7 +49,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.{immutable, mutable}
import scala.collection.JavaConverters._
import scala.collection.Searching.{search, Found, InsertionPoint}
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/** Command for Merge Into for Data Evolution paimon table. */
case class MergeIntoPaimonDataEvolutionTable(
@@ -76,6 +80,23 @@ case class MergeIntoPaimonDataEvolutionTable(
import MergeIntoPaimonDataEvolutionTable._

override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]

  private val updateColumns: Set[AttributeReference] = {
    val columns = mutable.Set[AttributeReference]()
    for (action <- matchedActions) {
      action match {
        case updateAction: UpdateAction =>
          for (assignment <- updateAction.assignments) {
            // only columns whose value actually changes matter for the global index check
            if (!assignment.key.equals(assignment.value)) {
              columns += assignment.key.asInstanceOf[AttributeReference]
            }
          }
        // other matched actions (e.g. DELETE) do not rewrite column values
        case _ =>
      }
    }
    columns.toSet
  }

private val firstRowIds: immutable.IndexedSeq[Long] = table
.store()
.newScan()
@@ -169,9 +190,10 @@

// step 2: invoke update action
val updateCommit =
if (matchedActions.nonEmpty)
updateActionInvoke(sparkSession, touchedFileTargetRelation)
else Nil
if (matchedActions.nonEmpty) {
val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation)
checkUpdateResult(updateResult)
} else Nil

// step 3: invoke insert action
val insertCommit =
@@ -232,18 +254,6 @@
(o1, o2) => {
o1.toString().compareTo(o2.toString())
}) ++ mergeFields
val updateColumns = mutable.Set[AttributeReference]()
for (action <- matchedActions) {
action match {
case updateAction: UpdateAction =>
for (assignment <- updateAction.assignments) {
if (!assignment.key.equals(assignment.value)) {
val key = assignment.key.asInstanceOf[AttributeReference]
updateColumns ++= Seq(key)
}
}
}
}

val updateColumnsSorted = updateColumns.toSeq.sortBy(
s => targetTable.output.map(x => x.toString()).indexOf(s.toString()))
@@ -459,6 +469,67 @@
}
}

private def checkUpdateResult(updateCommit: Seq[CommitMessage]): Seq[CommitMessage] = {
val affectedParts: Set[BinaryRow] = updateCommit.map(_.partition()).toSet
val rowType = table.rowType()

// find all global index files of affected partitions and updated columns
val latestSnapshot = table.latestSnapshot()
if (!latestSnapshot.isPresent) {
return updateCommit
}

val filter: org.apache.paimon.utils.Filter[IndexManifestEntry] =
(entry: IndexManifestEntry) => {
val globalIndexMeta = entry.indexFile().globalIndexMeta()
if (globalIndexMeta == null) {
false
} else {
val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name()
affectedParts.contains(entry.partition()) && updateColumns.exists(
_.name.equals(fieldName))
}
}

val affectedIndexEntries = table
.store()
.newIndexFileHandler()
.scan(latestSnapshot.get(), filter)
.asScala
.toSeq

if (affectedIndexEntries.isEmpty) {
updateCommit
} else {
table.coreOptions().globalIndexColumnUpdateAction() match {
case GlobalIndexColumnUpdateAction.THROW_ERROR =>
val updatedColNames = updateColumns.map(_.name)
val conflicted = affectedIndexEntries
.map(_.indexFile().globalIndexMeta().indexFieldId())
.map(id => rowType.getField(id).name())
.toSet
          throw new RuntimeException(
            s"""MERGE INTO: the update modifies globally indexed columns, which is not supported
               |when 'global-index.column-update-action' is THROW_ERROR.
               |Updated columns: ${updatedColNames.toSeq.sorted.mkString("[", ", ", "]")}
               |Globally indexed columns among them: ${conflicted.toSeq.sorted.mkString("[", ", ", "]")}
               |""".stripMargin)
case GlobalIndexColumnUpdateAction.DROP_PARTITION_INDEX =>
val grouped = affectedIndexEntries.groupBy(_.partition())
val deleteCommitMessages = ArrayBuffer.empty[CommitMessage]
grouped.foreach {
case (part, entries) =>
deleteCommitMessages += new CommitMessageImpl(
part,
0,
null,
DataIncrement.deleteIndexIncrement(entries.map(_.indexFile()).asJava),
CompactIncrement.emptyIncrement())
}
updateCommit ++ deleteCommitMessages
}
}
}

private def findRelatedFirstRowIds(
dataset: Dataset[Row],
sparkSession: SparkSession,
@@ -30,6 +30,8 @@ import org.apache.spark.sql.util.QueryExecutionListener

import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.JavaConverters._

abstract class RowTrackingTestBase extends PaimonSparkTestBase {

import testImplicits._
@@ -650,4 +652,110 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
)
}
}

test("Data Evolution: test global indexed column update action -- throw error") {
withTable("T") {
spark.sql("""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'bucket' = '-1',
| 'global-index.row-count-per-shard' = '10000',
| 'row-tracking.enabled' = 'true',
| 'data-evolution.enabled' = 'true')
| PARTITIONED BY (pt)
|""".stripMargin)

// write two partitions: p0 & p1
var values =
(0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")

values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")

// create global index for p0
val output =
spark
.sql(
"CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," +
" partitions => 'pt=\"p0\"', options => 'btree-index.records-per-range=1000')")
.collect()
.head

assert(output.getBoolean(0))

      // merge into the globally indexed partition (p0): the update must fail
assert(intercept[RuntimeException] {
sql(s"""
|MERGE INTO T
|USING T AS source
|ON T._ROW_ID = source._ROW_ID AND T.pt = 'p0'
|WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName'
|""".stripMargin)
}.getMessage
.contains("MergeInto: update columns contain globally indexed columns, not supported now."))

      // merging into the non-indexed partition (p1) still succeeds
sql(s"""
|MERGE INTO T
|USING T AS source
|ON T._ROW_ID = source._ROW_ID AND T.pt = 'p1'
|WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName'
|""".stripMargin)
}
}

test("Data Evolution: test global indexed column update action -- drop partition index") {
withTable("T") {
spark.sql("""
|CREATE TABLE T (id INT, name STRING, pt STRING)
|TBLPROPERTIES (
| 'bucket' = '-1',
| 'global-index.row-count-per-shard' = '10000',
| 'row-tracking.enabled' = 'true',
| 'data-evolution.enabled' = 'true',
| 'global-index.column-update-action' = 'DROP_PARTITION_INDEX')
| PARTITIONED BY (pt)
|""".stripMargin)

// write two partitions: p0 & p1
var values =
(0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")

values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")

      // create global index for all partitions
val output =
spark
.sql(
"CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," +
" options => 'btree-index.records-per-range=1000')")
.collect()
.head

assert(output.getBoolean(0))

      // merge into p1; with DROP_PARTITION_INDEX its global index entries should be dropped
sql(s"""
|MERGE INTO T
|USING T AS source
|ON T._ROW_ID = source._ROW_ID AND T.pt = 'p1'
|WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = 'updatedName'
|""".stripMargin)

val table = loadTable("T")
val indexEntries = table
.store()
.newIndexFileHandler()
.scanEntries()
.asScala
.filter(_.indexFile().indexType() == "btree")

      // the modified partition's (p1) index entries should have been removed, while p0's remain
assert(indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p0")))
assert(!indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p1")))
}
}
}
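Once DROP_PARTITION_INDEX has removed a partition's index, it can presumably be rebuilt with the same procedure the tests use to create it; a sketch with illustrative table, partition, and option values:

CALL sys.create_global_index(
  table => 'test.T',
  index_column => 'name',
  index_type => 'btree',
  partitions => 'pt="p1"',
  options => 'btree-index.records-per-range=1000');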