[1105] Change Data Feed - MERGE command #1155

Closed · wants to merge 15 commits
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, StructType}

case class MergeDataSizes(
@JsonDeserialize(contentAs = classOf[java.lang.Long])
@@ -216,6 +216,7 @@ case class MergeIntoCommand(
import MergeIntoCommand._

import SQLMetrics._
import org.apache.spark.sql.delta.commands.cdc.CDCReader._

override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)
override val canOverwriteSchema: Boolean = false
@@ -254,6 +255,10 @@ case class MergeIntoCommand(
"numTargetFilesAfterSkipping" -> createMetric(sc, "number of target files after skipping"),
"numTargetFilesRemoved" -> createMetric(sc, "number of files removed to target"),
"numTargetFilesAdded" -> createMetric(sc, "number of files added to target"),
"numTargetChangeFilesAdded" ->
createMetric(sc, "number of change data capture files generated"),
"numTargetChangeFileBytes" ->
createMetric(sc, "total size of change data capture files generated"),
"numTargetBytesBeforeSkipping" -> createMetric(sc, "number of target bytes before skipping"),
"numTargetBytesAfterSkipping" -> createMetric(sc, "number of target bytes after skipping"),
"numTargetBytesRemoved" -> createMetric(sc, "number of target bytes removed"),
@@ -456,7 +461,7 @@ case class MergeIntoCommand(

val outputColNames = getTargetOutputCols(deltaTxn).map(_.name)
// we use head here since we know there is only a single notMatchedClause
val outputExprs = notMatchedClauses.head.resolvedActions.map(_.expr) :+ incrInsertedCountExpr
val outputExprs = notMatchedClauses.head.resolvedActions.map(_.expr)
val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) =>
new Column(Alias(expr, name)())
}
@@ -478,6 +483,7 @@

val insertDf = sourceDF.join(targetDF, new Column(condition), "leftanti")
.select(outputCols: _*)
.filter(new Column(incrInsertedCountExpr))

val newFiles = deltaTxn
.writeFiles(repartitionIfNeeded(spark, insertDf, deltaTxn.metadata.partitionColumns))
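
A side note on the .filter(new Column(incrInsertedCountExpr)) change above: as I read the surrounding Delta code (an assumption, the definition is not shown in this diff), incrInsertedCountExpr wraps a UDF that bumps a SQLMetric and always returns true, so moving it from the projection into a filter counts inserted rows without dropping any of them. A minimal stand-alone sketch of the pattern, with invented names, in plain Spark/Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import java.util.concurrent.atomic.LongAdder

object CountingFilterSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("counting-filter").getOrCreate()
  import spark.implicits._

  // Driver-side counter, fine in local mode; Delta uses SQLMetrics, which
  // aggregate across executors. This LongAdder merely stands in for that.
  val insertedRows = new LongAdder
  val incrInsertedCount = udf { () => insertedRows.increment(); true } // always-true predicate

  val insertDf = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
    .filter(incrInsertedCount()) // every row passes; counting is a side effect

  insertDf.count() // force execution
  println(s"rows counted by the filter: ${insertedRows.sum()}") // 3 in local mode
  spark.stop()
}
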
@@ -494,7 +500,7 @@
metrics("numTargetBytesRemoved") += 0
metrics("numTargetPartitionsRemovedFrom") += 0
val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
metrics("numTargetFilesAdded") += newFiles.size
metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
metrics("numTargetBytesAdded") += addedBytes
metrics("numTargetPartitionsAddedTo") += addedPartitions
newFiles
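
Similarly, on newFiles.count(_.isInstanceOf[AddFile]): once CDC is in play, writeFiles can return a mix of FileAction subtypes, so the old newFiles.size would have lumped change files in with data files. A hedged illustration (AddFile and AddCDCFile are real Delta actions; the paths, sizes, and field values below are invented, and the constructor shapes are my assumption for the Delta version this PR targets):

import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}

// Invented sample output: one data file plus one CDC change file.
val newFiles: Seq[FileAction] = Seq(
  AddFile("part-00000.parquet", Map.empty, size = 1024L, modificationTime = 0L, dataChange = true),
  AddCDCFile("_change_data/cdc-00000.parquet", Map.empty, size = 512L)
)

val dataFilesAdded   = newFiles.count(_.isInstanceOf[AddFile])    // 1, feeds numTargetFilesAdded
val changeFilesAdded = newFiles.count(_.isInstanceOf[AddCDCFile]) // 1, feeds numTargetChangeFilesAdded
val changeFileBytes  = newFiles.collect { case c: AddCDCFile => c.size }.sum // 512, feeds numTargetChangeFileBytes
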
@@ -503,6 +509,9 @@
/**
* Write new files by reading the touched files and updating/inserting data using the source
* query/table. This is implemented using a full|right-outer-join using the merge condition.
*
* Note that unlike the insert-only code paths with just one control column INCR_ROW_COUNT_COL,
* this method has a second control column CDC_TYPE_COLUMN_NAME used for handling CDC when enabled.
*/
private def writeAllChanges(
spark: SparkSession,
@@ -548,26 +557,78 @@
}

val joinedPlan = joinedDF.queryExecution.analyzed
val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

// ==== Generate the expressions to process full-outer join output and generate target rows ====
// If there are N columns in the target table, there will be N + 2 columns after processing
// - N columns for target table
// - INCR_ROW_COUNT_COL containing a UDF to update the output row counter
// - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row

// To generate these N + 2 columns, we will generate N + 2 expressions and apply them to the
// rows in the joinedDF. The CDC column will be either used for CDC generation or dropped before
// performing the final write, and the increment column will always be dropped after executing
// the metrics UDF.

// We produce both rows for the CDC_TYPE_NOT_CDC partition to be written to the main table,
// and rows for the CDC partitions to be written as CDC files.
// See [[CDCReader]] for general details on how partitioning on the CDC type column works.

Review comment (Contributor): it's hard to understand what "partition" means in the immediate context of this code. Rewrite differently.

Reply (Collaborator, author): Updated. LMK if it's more clear.
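
To make the N + 2 column scheme concrete, an illustrative expansion of a single matched UPDATE row (the control column names come from this diff; the table, values, and rendering of the CDC sentinels are my assumptions):

Target table columns: (id, value); the source updates id = 1 from "old" to "new".

  id | value | INCR_ROW_COUNT_COL      | CDC_TYPE_COLUMN_NAME | written to
  ---+-------+-------------------------+----------------------+----------------------
   1 | "new" | metric UDF (increments) | CDC_TYPE_NOT_CDC     | main table data file
   1 | "old" | true (no-op)            | update_preimage      | CDC change file
   1 | "new" | true (no-op)            | update_postimage     | CDC change file

Rows in the CDC_TYPE_NOT_CDC group land in ordinary data files, the pre/postimage rows are routed into change files, and INCR_ROW_COUNT_COL is dropped before the final write.
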

def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
}

def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Expression] = {
def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Seq[Expression]] = {
val exprs = clause match {
case u: DeltaMergeIntoUpdateClause =>
// Generate update expressions and set ROW_DELETED_COL = false
u.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrUpdatedCountExpr
// Generate update expressions and set CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val mainDataOutput = u.resolvedActions.map(_.expr) :+ incrUpdatedCountExpr :+
Literal(CDC_TYPE_NOT_CDC)
if (cdcEnabled) {
// For update preimage, we have to do a no-op copy with
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op
// (because the metric will be incremented in the main partition)
val preImageOutput = targetOutputCols :+ Literal.TrueLiteral :+
Literal(CDC_TYPE_UPDATE_PREIMAGE)
// For update postimage, we have the same expressions as for mainDataOutput but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in the main
// partition), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE
val postImageOutput = mainDataOutput.dropRight(2) :+ Literal.TrueLiteral :+
Literal(CDC_TYPE_UPDATE_POSTIMAGE)
Seq(mainDataOutput, preImageOutput, postImageOutput)
} else {
Seq(mainDataOutput)
}
case _: DeltaMergeIntoDeleteClause =>
// Generate expressions to set the ROW_DELETED_COL = true
targetOutputCols :+ Literal.TrueLiteral :+ incrDeletedCountExpr
// Since the row will be deleted we don't need an output expression for the main partition
if (cdcEnabled) {
// For delete we do a no-op copy with CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE
// Since we don't write to the main partition, we need to increment the metric column
// INCR_ROW_COUNT_COL here
val deleteCdcOutput = targetOutputCols :+ incrDeletedCountExpr :+
Literal(CDC_TYPE_DELETE)
Seq(deleteCdcOutput)
} else {
Seq()
}
}
resolveOnJoinedPlan(exprs)
exprs.map(resolveOnJoinedPlan)
}

def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Expression] = {
resolveOnJoinedPlan(
clause.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrInsertedCountExpr)
def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]] = {
// Generate insert expressions and set CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val mainDataOutput = resolveOnJoinedPlan(
clause.resolvedActions.map(_.expr) :+ incrInsertedCountExpr :+ Literal(CDC_TYPE_NOT_CDC))
if (cdcEnabled) {
// For insert we have the same expressions as for mainDataOutput, but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in the main
// partition), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT
val insertCdcOutput = mainDataOutput.dropRight(2) :+ Literal.TrueLiteral :+
Literal(CDC_TYPE_INSERT)
Seq(mainDataOutput, insertCdcOutput)
} else {
Seq(mainDataOutput)
}
}

def clauseCondition(clause: DeltaMergeIntoClause): Expression = {
@@ -576,8 +637,16 @@
resolveOnJoinedPlan(Seq(condExpr)).head
}

val outputRowSchema = if (!cdcEnabled) {
deltaTxn.metadata.schema
} else {
deltaTxn.metadata.schema
.add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
.add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
}

val joinedRowEncoder = RowEncoder(joinedPlan.schema)
val outputRowEncoder = RowEncoder(deltaTxn.metadata.schema).resolveAndBind()
val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()

val processor = new JoinedRowProcessor(
targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,
Expand All @@ -587,15 +656,15 @@ case class MergeIntoCommand(
notMatchedConditions = notMatchedClauses.map(clauseCondition),
notMatchedOutputs = notMatchedClauses.map(notMatchedClauseOutput),
noopCopyOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.FalseLiteral :+ incrNoopCountExpr),
deleteRowOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.TrueLiteral :+ Literal.TrueLiteral),
resolveOnJoinedPlan(targetOutputCols :+ incrNoopCountExpr :+ Literal(CDC_TYPE_NOT_CDC)),
joinedAttributes = joinedPlan.output,
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)

val outputDF =
Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
.drop(INCR_ROW_COUNT_COL)

logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

// Write to Delta
Expand All @@ -604,7 +673,8 @@ case class MergeIntoCommand(

// Update metrics
val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
metrics("numTargetFilesAdded") += newFiles.size
metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile])
metrics("numTargetBytesAdded") += addedBytes
metrics("numTargetPartitionsAddedTo") += addedPartitions

@@ -748,16 +818,16 @@ object MergeIntoCommand {
val FILE_NAME_COL = "_file_name_"
val SOURCE_ROW_PRESENT_COL = "_source_row_present_"
val TARGET_ROW_PRESENT_COL = "_target_row_present_"
val INCR_ROW_COUNT_COL = "_incr_row_count_"

class JoinedRowProcessor(
Review comment (Contributor): missed this last time. Definitely add param docs. the triple sequence is hella confusing. Honestly I should have added param docs when I originally implemented this.

Reply (Collaborator, author): added param docs. An overview of what JoinedRowProcessor is doing may also be helpful, what do you think? I can add tomorrow.

targetRowHasNoMatch: Expression,
sourceRowHasNoMatch: Expression,
matchedConditions: Seq[Expression],
matchedOutputs: Seq[Seq[Expression]],
matchedOutputs: Seq[Seq[Seq[Expression]]],
notMatchedConditions: Seq[Expression],
notMatchedOutputs: Seq[Seq[Expression]],
notMatchedOutputs: Seq[Seq[Seq[Expression]]],
noopCopyOutput: Seq[Expression],
deleteRowOutput: Seq[Expression],
joinedAttributes: Seq[Attribute],
joinedRowEncoder: ExpressionEncoder[Row],
outputRowEncoder: ExpressionEncoder[Row]) extends Serializable {
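
Picking up the param-docs thread above, one hedged reading of the nested sequence shapes (my summary, not the PR's doc text):

// matchedOutputs: Seq[Seq[Seq[Expression]]]
//   outer Seq  -- one entry per WHEN MATCHED clause, aligned with matchedConditions
//   middle Seq -- one entry per output row that clause emits: the main data row,
//                 plus update_preimage/update_postimage (or delete) CDC rows when
//                 change data feed is enabled
//   inner Seq  -- one Expression per output column (N target columns plus the two
//                 control columns)
// notMatchedOutputs has the same shape for WHEN NOT MATCHED (insert) clauses,
// with at most two rows per clause: the main insert row and its CDC mirror.
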
@@ -775,28 +845,24 @@ object MergeIntoCommand {
val targetRowHasNoMatchPred = generatePredicate(targetRowHasNoMatch)
val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch)
val matchedPreds = matchedConditions.map(generatePredicate)
val matchedProjs = matchedOutputs.map(generateProjection)
val matchedProjs = matchedOutputs.map(_.map(generateProjection))
val notMatchedPreds = notMatchedConditions.map(generatePredicate)
val notMatchedProjs = notMatchedOutputs.map(generateProjection)
val notMatchedProjs = notMatchedOutputs.map(_.map(generateProjection))
val noopCopyProj = generateProjection(noopCopyOutput)
val deleteRowProj = generateProjection(deleteRowOutput)
val outputProj = UnsafeProjection.create(outputRowEncoder.schema)

def shouldDeleteRow(row: InternalRow): Boolean =
row.getBoolean(outputRowEncoder.schema.fields.size)

def processRow(inputRow: InternalRow): InternalRow = {
def processRow(inputRow: InternalRow): Iterator[InternalRow] = {
if (targetRowHasNoMatchPred.eval(inputRow)) {
// Target row did not match any source row, so just copy it to the output
noopCopyProj.apply(inputRow)
Iterator(noopCopyProj.apply(inputRow))
} else {
// identify which set of clauses to execute: matched or not-matched ones
val (predicates, projections, noopAction) = if (sourceRowHasNoMatchPred.eval(inputRow)) {
// Source row did not match with any target row, so insert the new source row
(notMatchedPreds, notMatchedProjs, deleteRowProj)
(notMatchedPreds, notMatchedProjs, None)
} else {
// Source row matched with target row, so update the target row
(matchedPreds, matchedProjs, noopCopyProj)
(matchedPreds, matchedProjs, Some(noopCopyProj))
}

// find (predicate, projection) pair whose predicate satisfies inputRow
Expand All @@ -805,8 +871,9 @@ object MergeIntoCommand {
}

pair match {
case Some((_, projections)) => projections.apply(inputRow)
case None => noopAction.apply(inputRow)
case Some((_, projections)) =>
projections.map(_.apply(inputRow)).iterator
case None => noopAction.map(_.apply(inputRow)).iterator
}
}
}
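
The essence of the processRow change: one joined input row can now yield zero, one, or several output rows, which is why processPartition below switches from map plus filter to flatMap. A self-contained plain-Scala analogy of the fan-out (invented strings, no Spark):

object FanOutSketch extends App {
  // Stand-ins for joined rows, classified by which clause they hit.
  val inputs = Seq("matched-update", "matched-delete", "unmatched-insert")

  // With CDC enabled, each case expands to its main-table rows plus CDC rows.
  def expand(kind: String): Iterator[String] = kind match {
    case "matched-update"   => Iterator("main(updated)", "cdc(update_preimage)", "cdc(update_postimage)")
    case "matched-delete"   => Iterator("cdc(delete)") // no main-table row: the row is gone
    case "unmatched-insert" => Iterator("main(inserted)", "cdc(insert)")
    case _                  => Iterator.empty          // no clause matched
  }

  // flatMap yields 0..n outputs per input, replacing the old map-then-filter scheme.
  inputs.iterator.flatMap(expand).foreach(println)
}
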
@@ -815,8 +882,7 @@ object MergeIntoCommand {
val fromRow = outputRowEncoder.createDeserializer()
rowIterator
.map(toRow)
.map(processRow)
.filter(!shouldDeleteRow(_))
.flatMap(processRow)
.map { notDeletedInternalRow =>
fromRow(outputProj(notDeletedInternalRow))
}
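
Finally, for reviewers who want to see the feature end to end, a sketch of how this MERGE plus CDF path would be exercised (table names and data are invented; the table property and the readChangeFeed reader options are the documented Delta CDF surface; version numbers are illustrative):

// Scala sketch, assuming a build with this PR applied and an active SparkSession.
spark.sql("""
  CREATE TABLE target (id INT, value STRING) USING delta
  TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
spark.sql("CREATE TABLE source (id INT, value STRING) USING delta")
spark.sql("INSERT INTO target VALUES (1, 'old')")
spark.sql("INSERT INTO source VALUES (1, 'new'), (2, 'brand-new')")

spark.sql("""
  MERGE INTO target t USING source s ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")

// Read back the changes the MERGE commit produced:
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 2)
  .table("target")
  .show()
// Expect _change_type values: update_preimage, update_postimage, insert.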