
Commit

add test
ulysses-you committed Sep 25, 2024
1 parent 49445a5 commit 7dd6d54
Showing 5 changed files with 108 additions and 31 deletions.
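The new DeletionVectorTest helper below (runAndCheckSplitScan) drives all of the rewritten UPDATE/DELETE/MERGE calls: it registers a QueryExecutionListener, executes the statement, and asserts that the executed plan contains a PaimonSplitScan that exposes the deletion-vector metadata columns yet has a narrower output than the analyzed DataSourceV2Relation (i.e. column pruning still applies). The following is a minimal, standalone sketch of that listener pattern, not the commit's code; the helper name collectBatchScans and the ambient `spark` session are illustrative assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: run `query` and collect every DataSourceV2 batch scan
// that appears in the executed physical plan.
def collectBatchScans(spark: SparkSession, query: String): Seq[BatchScanExec] = {
  val scans = new ArrayBuffer[BatchScanExec]()
  val listener = new QueryExecutionListener {
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      // Walk the physical plan and remember each v2 batch scan node.
      // NOTE: with adaptive query execution enabled the plan has to be walked via
      // AdaptiveSparkPlanHelper.foreach (as the commit's helper does); the plain
      // TreeNode.foreach used here only covers the non-adaptive case.
      qe.executedPlan.foreach {
        case scan: BatchScanExec => scans.append(scan)
        case _ =>
      }
    }
  }
  spark.listenerManager.register(listener)
  try {
    spark.sql(query).collect()
    // Listener callbacks arrive asynchronously; the commit adds
    // org.apache.spark.paimon.Utils.waitUntilEventEmpty for exactly this wait.
    org.apache.spark.paimon.Utils.waitUntilEventEmpty(spark)
    scans.toSeq
  } finally {
    spark.listenerManager.unregister(listener)
  }
}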
@@ -110,7 +110,8 @@ case class MergeIntoPaimonTable(
createNewScanPlan(
candidateDataSplits,
targetOnlyCondition.getOrElse(TrueLiteral),
relation))
relation,
metadataCols))
val ds = constructChangedRows(
sparkSession,
filteredRelation,
@@ -35,9 +35,10 @@ import org.apache.paimon.utils.SerializationUtils

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, EqualTo, Filter}

@@ -47,7 +48,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLConfHelper {

/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
@@ -131,6 +132,21 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
.map(relativePath)
}

protected def createNewScanPlan(
candidateDataSplits: Seq[DataSplit],
condition: Expression,
relation: DataSourceV2Relation,
metadataColumns: Seq[PaimonMetadataColumn]): LogicalPlan = {
val newRelation = createNewScanPlan(candidateDataSplits, condition, relation)
val resolvedMetadataColumns = metadataColumns.map {
col =>
val attr = newRelation.resolve(col.name :: Nil, conf.resolver)
assert(attr.isDefined)
attr.get
}
Project(relation.output ++ resolvedMetadataColumns, newRelation)
}

protected def createNewScanPlan(
candidateDataSplits: Seq[DataSplit],
condition: Expression,
@@ -18,8 +18,6 @@

package org.apache.paimon.spark.commands

import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
@@ -21,20 +21,78 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorsMaintainer}
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.paimon.Utils
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.util.QueryExecutionListener
import org.junit.jupiter.api.Assertions

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

class DeletionVectorTest extends PaimonSparkTestBase {
class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper {

import testImplicits._

private def runAndCheckSplitScan(query: String): Unit = {
val batchScans = new ArrayBuffer[(DataSourceV2Relation, BatchScanExec)]()
val listener = new QueryExecutionListener {
override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}

private def isValidSplitScan(scan: BatchScanExec): Boolean = {
if (!scan.scan.isInstanceOf[PaimonSplitScan]) {
return false
}
val splitScan = scan.scan.asInstanceOf[PaimonSplitScan]
assert(splitScan.table.primaryKeys().isEmpty)
splitScan.coreOptions.deletionVectorsEnabled() &&
scan.output.exists(
attr => PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.contains(attr.name))
}

private def appendScan(qe: QueryExecution, plan: SparkPlan): Unit = {
plan match {
case memory: InMemoryTableScanExec =>
foreach(memory.relation.cachedPlan)(p => appendScan(qe, p))
case scan: BatchScanExec if isValidSplitScan(scan) =>
val logicalScan = qe.analyzed.find(_.isInstanceOf[DataSourceV2Relation])
assert(logicalScan.isDefined)
batchScans.append((logicalScan.get.asInstanceOf[DataSourceV2Relation], scan))
case _ =>
}
}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
foreach(qe.executedPlan)(p => appendScan(qe, p))
}
}
spark.listenerManager.register(listener)

try {
val df = spark.sql(query)
df.collect()
Utils.waitUntilEventEmpty(df.sparkSession)
assert(batchScans.nonEmpty, query)
assert(
batchScans.forall {
case (logicalScan, scan) =>
logicalScan.output.size > scan.output.size
},
batchScans)
} finally {
spark.listenerManager.unregister(listener)
}
}

bucketModes.foreach {
bucket =>
test(s"Paimon DeletionVector: merge into with bucket = $bucket") {
@@ -59,21 +117,21 @@ class DeletionVectorTest extends PaimonSparkTestBase {
val table = loadTable("target")
val dvMaintainerFactory =
new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
spark.sql(s"""
|MERGE INTO target
|USING source
|ON target.a = source.a
|WHEN MATCHED AND target.a = 5 THEN
|UPDATE SET b = source.b + target.b
|WHEN MATCHED AND source.c > 'c2' THEN
|UPDATE SET *
|WHEN MATCHED THEN
|DELETE
|WHEN NOT MATCHED AND c > 'c9' THEN
|INSERT (a, b, c) VALUES (a, b * 1.1, c)
|WHEN NOT MATCHED THEN
|INSERT *
|""".stripMargin)
runAndCheckSplitScan(s"""
|MERGE INTO target
|USING source
|ON target.a = source.a
|WHEN MATCHED AND target.a = 5 THEN
|UPDATE SET b = source.b + target.b
|WHEN MATCHED AND source.c > 'c2' THEN
|UPDATE SET *
|WHEN MATCHED THEN
|DELETE
|WHEN NOT MATCHED AND c > 'c9' THEN
|INSERT (a, b, c) VALUES (a, b * 1.1, c)
|WHEN NOT MATCHED THEN
|INSERT *
|""".stripMargin)

checkAnswer(
spark.sql("SELECT * FROM target ORDER BY a, b"),
@@ -116,7 +174,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond1 = "id = 2"
val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
spark.sql(s"UPDATE T SET name = 'b_2' WHERE $cond1")
runAndCheckSplitScan(s"UPDATE T SET name = 'b_2' WHERE $cond1")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Nil)
@@ -136,12 +194,12 @@ class DeletionVectorTest extends PaimonSparkTestBase {
Assertions.assertTrue(deletionVectors2 == deletionVectors3)

val cond2 = "id % 2 = 1"
spark.sql(s"UPDATE T SET name = concat(name, '_2') WHERE $cond2")
runAndCheckSplitScan(s"UPDATE T SET name = concat(name, '_2') WHERE $cond2")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a_2") :: Row(2, "b_2") :: Row(3, "c_2") :: Row(4, "d") :: Row(5, "e_2") :: Nil)

spark.sql(s"UPDATE T SET name = '_all'")
runAndCheckSplitScan("UPDATE T SET name = '_all'")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4, "_all") :: Row(
@@ -188,7 +246,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond1 = "id = 2"
val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
spark.sql(s"UPDATE T SET name = 'b_2' WHERE $cond1")
runAndCheckSplitScan(s"UPDATE T SET name = 'b_2' WHERE $cond1")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(2, "b_2", "2024") :: Row(3, "c", "2025") :: Row(
@@ -209,7 +267,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond2 = "pt = '2025'"
val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
spark.sql(s"UPDATE T SET name = concat(name, '_2') WHERE $cond2")
runAndCheckSplitScan(s"UPDATE T SET name = concat(name, '_2') WHERE $cond2")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(2, "b_2", "2024") :: Row(3, "c_2", "2025") :: Row(
@@ -273,7 +331,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond1 = "id = 2"
val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
spark.sql(s"DELETE FROM T WHERE $cond1")
runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1")
checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") :: Nil)
val deletionVectors2 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
Assertions.assertEquals(1, deletionVectors2.size)
@@ -291,7 +349,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
Assertions.assertTrue(deletionVectors2 == deletionVectors3)

val cond2 = "id % 2 = 1"
spark.sql(s"DELETE FROM T WHERE $cond2")
runAndCheckSplitScan(s"DELETE FROM T WHERE $cond2")
checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") :: Row(4, "d") :: Nil)

spark.sql("CALL sys.compact('T')")
@@ -337,7 +395,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond1 = "id = 2"
val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
spark.sql(s"DELETE FROM T WHERE $cond1")
runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") :: Nil)
@@ -351,7 +409,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {

val cond2 = "id = 3"
val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
spark.sql(s"DELETE FROM T WHERE $cond2")
runAndCheckSplitScan(s"DELETE FROM T WHERE $cond2")
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
@@ -18,6 +18,7 @@

package org.apache.spark.paimon

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{Utils => SparkUtils}

import java.io.File
@@ -29,4 +30,7 @@ object Utils {

def createTempDir: File = SparkUtils.createTempDir()

def waitUntilEventEmpty(spark: SparkSession): Unit = {
spark.sparkContext.listenerBus.waitUntilEmpty()
}
}
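One note on the Utils change above: QueryExecutionListener callbacks are delivered asynchronously through Spark's listener bus, and SparkContext.listenerBus is private[spark], which is presumably why this wait helper lives in the org.apache.spark.paimon package. A hypothetical use from a test (the table name and the ambient `spark` session are assumptions):

import org.apache.spark.paimon.Utils

// Drain the listener bus so QueryExecutionListener callbacks have fired
// before the test makes assertions about the captured plans.
spark.sql("UPDATE T SET name = 'x' WHERE id = 1")
Utils.waitUntilEventEmpty(spark)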
