[1105] Change Data Feed - MERGE command #1155

Closed
wants to merge 15 commits
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, StructType}

case class MergeDataSizes(
@JsonDeserialize(contentAs = classOf[java.lang.Long])
@@ -216,6 +216,7 @@ case class MergeIntoCommand(
import MergeIntoCommand._

import SQLMetrics._
import org.apache.spark.sql.delta.commands.cdc.CDCReader._

override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE)
override val canOverwriteSchema: Boolean = false
@@ -254,6 +255,10 @@ case class MergeIntoCommand(
"numTargetFilesAfterSkipping" -> createMetric(sc, "number of target files after skipping"),
"numTargetFilesRemoved" -> createMetric(sc, "number of files removed to target"),
"numTargetFilesAdded" -> createMetric(sc, "number of files added to target"),
"numTargetChangeFilesAdded" ->
createMetric(sc, "number of change data capture files generated"),
"numTargetChangeFileBytes" ->
createMetric(sc, "total size of change data capture files generated"),
"numTargetBytesBeforeSkipping" -> createMetric(sc, "number of target bytes before skipping"),
"numTargetBytesAfterSkipping" -> createMetric(sc, "number of target bytes after skipping"),
"numTargetBytesRemoved" -> createMetric(sc, "number of target bytes removed"),
@@ -456,7 +461,7 @@ case class MergeIntoCommand(

val outputColNames = getTargetOutputCols(deltaTxn).map(_.name)
// we use head here since we know there is only a single notMatchedClause
val outputExprs = notMatchedClauses.head.resolvedActions.map(_.expr) :+ incrInsertedCountExpr
val outputExprs = notMatchedClauses.head.resolvedActions.map(_.expr)
val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) =>
new Column(Alias(expr, name)())
}
@@ -478,6 +483,7 @@

val insertDf = sourceDF.join(targetDF, new Column(condition), "leftanti")
.select(outputCols: _*)
.filter(new Column(incrInsertedCountExpr))

val newFiles = deltaTxn
.writeFiles(repartitionIfNeeded(spark, insertDf, deltaTxn.metadata.partitionColumns))
@@ -494,7 +500,7 @@
metrics("numTargetBytesRemoved") += 0
metrics("numTargetPartitionsRemovedFrom") += 0
val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
metrics("numTargetFilesAdded") += newFiles.size
metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
metrics("numTargetBytesAdded") += addedBytes
metrics("numTargetPartitionsAddedTo") += addedPartitions
newFiles
@@ -503,12 +509,18 @@
/**
* Write new files by reading the touched files and updating/inserting data using the source
* query/table. This is implemented using a full|right-outer-join using the merge condition.
*
* Note that unlike the insert-only code paths with just one control column INCR_ROW_COUNT_COL,
* this method has two additional control columns ROW_DROPPED_COL for dropping deleted rows and
* CDC_TYPE_COLUMN_NAME used for handling CDC when enabled.
*/
private def writeAllChanges(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile]
): Seq[FileAction] = recordMergeOperation(sqlMetricName = "rewriteTimeMs") {
import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

val targetOutputCols = getTargetOutputCols(deltaTxn)

// Generate a new logical plan that has same output attributes exprIds as the target plan.
@@ -548,36 +560,108 @@
}

val joinedPlan = joinedDF.queryExecution.analyzed
val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = {
tryResolveReferencesForExpressions(spark, exprs, joinedPlan)
}

def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Expression] = {
// ==== Generate the expressions to process full-outer join output and generate target rows ====
// If there are N columns in the target table, there will be N + 3 columns after processing
// - N columns for target table
// - ROW_DROPPED_COL to define whether the generated row should be dropped or written
// - INCR_ROW_COUNT_COL containing a UDF to update the output row counter
// - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row

// To generate these N + 3 columns, we will generate N + 3 expressions and apply them to the
// rows in the joinedDF. The CDC column will be either used for CDC generation or dropped before
// performing the final write, and the other two will always be dropped after executing the
// metrics UDF and filtering on ROW_DROPPED_COL.

// We produce rows for both the main table data (with CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC),
// and rows for the CDC data which will be output to CDCReader.CDC_LOCATION.
// See [[CDCReader]] for general details on how partitioning on the CDC type column works.

// In the following two functions `matchedClauseOutput` and `notMatchedClauseOutput`, we
// produce a Seq[Expression] for each intended output row.
// Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a
// Seq[Seq[Expression]]

def matchedClauseOutput(clause: DeltaMergeIntoMatchedClause): Seq[Seq[Expression]] = {
val exprs = clause match {
case u: DeltaMergeIntoUpdateClause =>
// Generate update expressions and set ROW_DELETED_COL = false
u.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrUpdatedCountExpr
// Generate update expressions and set ROW_DELETED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val mainDataOutput = u.resolvedActions.map(_.expr) :+ FalseLiteral :+
incrUpdatedCountExpr :+ Literal(CDC_TYPE_NOT_CDC)
if (cdcEnabled) {
// For update preimage, we have to do a no-op copy with ROW_DELETED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op
// (because the metric will be incremented in `mainDataOutput`)
val preImageOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+
Literal(CDC_TYPE_UPDATE_PREIMAGE)
// For update postimage, we have the same expressions as for mainDataOutput but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
// `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE
val postImageOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+
Literal(CDC_TYPE_UPDATE_POSTIMAGE)
Seq(mainDataOutput, preImageOutput, postImageOutput)
} else {
Seq(mainDataOutput)
}
case _: DeltaMergeIntoDeleteClause =>
// Generate expressions to set the ROW_DELETED_COL = true
targetOutputCols :+ Literal.TrueLiteral :+ incrDeletedCountExpr
// Generate expressions to set the ROW_DELETED_COL = true and CDC_TYPE_COLUMN_NAME =
// CDC_TYPE_NOT_CDC
val mainDataOutput = targetOutputCols :+ TrueLiteral :+ incrDeletedCountExpr :+
Literal(CDC_TYPE_NOT_CDC)
if (cdcEnabled) {
// For delete we do a no-op copy with ROW_DELETED_COL = false, INCR_ROW_COUNT_COL as a
// no-op (because the metric will be incremented in `mainDataOutput`) and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE
val deleteCdcOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+
Literal(CDC_TYPE_DELETE)
Seq(mainDataOutput, deleteCdcOutput)
} else {
Seq(mainDataOutput)
}
}
resolveOnJoinedPlan(exprs)
exprs.map(resolveOnJoinedPlan)
}

def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Expression] = {
resolveOnJoinedPlan(
clause.resolvedActions.map(_.expr) :+ Literal.FalseLiteral :+ incrInsertedCountExpr)
def notMatchedClauseOutput(clause: DeltaMergeIntoInsertClause): Seq[Seq[Expression]] = {
// Generate insert expressions and set ROW_DELETED_COL = false and
// CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC
val mainDataOutput = resolveOnJoinedPlan(
clause.resolvedActions.map(_.expr) :+ FalseLiteral :+ incrInsertedCountExpr :+
Literal(CDC_TYPE_NOT_CDC))
if (cdcEnabled) {
// For insert we have the same expressions as for mainDataOutput, but with
// INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in
// `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT
val insertCdcOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ Literal(CDC_TYPE_INSERT)
Seq(mainDataOutput, insertCdcOutput)
} else {
Seq(mainDataOutput)
}
}

def clauseCondition(clause: DeltaMergeIntoClause): Expression = {
// if condition is None, then expression always evaluates to true
val condExpr = clause.condition.getOrElse(Literal.TrueLiteral)
val condExpr = clause.condition.getOrElse(TrueLiteral)
resolveOnJoinedPlan(Seq(condExpr)).head
}

val outputRowSchema = if (!cdcEnabled) {
deltaTxn.metadata.schema
} else {
deltaTxn.metadata.schema
.add(ROW_DROPPED_COL, DataTypes.BooleanType)
.add(INCR_ROW_COUNT_COL, DataTypes.BooleanType)
.add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType)
}

val joinedRowEncoder = RowEncoder(joinedPlan.schema)
val outputRowEncoder = RowEncoder(deltaTxn.metadata.schema).resolveAndBind()
val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind()

val processor = new JoinedRowProcessor(
targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,
@@ -587,15 +671,19 @@
notMatchedConditions = notMatchedClauses.map(clauseCondition),
notMatchedOutputs = notMatchedClauses.map(notMatchedClauseOutput),
noopCopyOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.FalseLiteral :+ incrNoopCountExpr),
resolveOnJoinedPlan(targetOutputCols :+ FalseLiteral :+ incrNoopCountExpr :+
Literal(CDC_TYPE_NOT_CDC)),
deleteRowOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.TrueLiteral :+ Literal.TrueLiteral),
resolveOnJoinedPlan(targetOutputCols :+ TrueLiteral :+ TrueLiteral :+
Literal(CDC_TYPE_NOT_CDC)),
joinedAttributes = joinedPlan.output,
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)

val outputDF =
Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL)

logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

// Write to Delta
@@ -604,7 +692,9 @@

// Update metrics
val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
metrics("numTargetFilesAdded") += newFiles.size
metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])
metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile])
metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum
metrics("numTargetBytesAdded") += addedBytes
metrics("numTargetPartitionsAddedTo") += addedPartitions

@@ -748,14 +838,36 @@ object MergeIntoCommand {
val FILE_NAME_COL = "_file_name_"
val SOURCE_ROW_PRESENT_COL = "_source_row_present_"
val TARGET_ROW_PRESENT_COL = "_target_row_present_"
val ROW_DROPPED_COL = "_row_dropped_"
val INCR_ROW_COUNT_COL = "_incr_row_count_"

/**
* @param targetRowHasNoMatch whether a joined row is a target row with no match in the source
* table
* @param sourceRowHasNoMatch whether a joined row is a source row with no match in the target
* table
* @param matchedConditions condition for each match clause
* @param matchedOutputs corresponding output for each match clause. for each clause, we
* have 1-3 output rows, each of which is a sequence of expressions
* to apply to the joined row
* @param notMatchedConditions condition for each not-matched clause
* @param notMatchedOutputs corresponding output for each not-matched clause. for each clause,
* we have 1-2 output rows, each of which is a sequence of
* expressions to apply to the joined row
* @param noopCopyOutput no-op expression to copy a target row to the output
* @param deleteRowOutput expression to drop a row from the final output. this is used for
* source rows that don't match any not-matched clauses
* @param joinedAttributes schema of our outer-joined dataframe
* @param joinedRowEncoder joinedDF row encoder
* @param outputRowEncoder final output row encoder
*/
class JoinedRowProcessor(
Contributor: Missed this last time. Definitely add param docs; the triple sequence is hella confusing. Honestly, I should have added param docs when I originally implemented this.

Collaborator Author: Added param docs. An overview of what JoinedRowProcessor is doing may also be helpful, what do you think? I can add it tomorrow.

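To make the "triple sequence" discussed above concrete: the outer Seq has one entry per merge clause, the middle Seq has one entry per output row that clause produces (one main-data row, plus CDC pre-/post-image rows when CDC is enabled), and the inner Seq holds the column expressions for that row. A minimal editorial sketch, not part of the PR; the literals and CDC type strings below are placeholders rather than the actual constants and metric UDFs:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Stand-ins for the N target table columns.
val targetCols: Seq[Expression] = Seq(Literal(1), Literal("a"))

// Each output row = N target columns + ROW_DROPPED_COL + INCR_ROW_COUNT_COL + CDC type.
val mainDataRow  = targetCols :+ Literal(false) :+ Literal(true) :+ Literal("__not_cdc")
val preImageRow  = targetCols :+ Literal(false) :+ Literal(true) :+ Literal("update_preimage")
val postImageRow = targetCols :+ Literal(false) :+ Literal(true) :+ Literal("update_postimage")

// One WHEN MATCHED UPDATE clause with CDC enabled contributes three output rows,
// so matchedOutputs for a single-clause merge has this nested shape:
val matchedOutputs: Seq[Seq[Seq[Expression]]] = Seq(Seq(mainDataRow, preImageRow, postImageRow))
```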
targetRowHasNoMatch: Expression,
sourceRowHasNoMatch: Expression,
matchedConditions: Seq[Expression],
matchedOutputs: Seq[Seq[Expression]],
matchedOutputs: Seq[Seq[Seq[Expression]]],
notMatchedConditions: Seq[Expression],
notMatchedOutputs: Seq[Seq[Expression]],
notMatchedOutputs: Seq[Seq[Seq[Expression]]],
noopCopyOutput: Seq[Expression],
deleteRowOutput: Seq[Expression],
Collaborator Author: We don't actually need this. It can be done the way it is in https://github.com/allisonport-db/delta/blob/02a238e6666e31cc74ea1dbda12842ce929de4d6/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L860, such that we simply do not create an output row. Not sure which is clearer to readers.

Contributor: I don't get what you are referring to here. Thread got lost?

Collaborator Author: Sorry, it might be hard to explain in writing. But basically, since processRow now returns an Iterator[InternalRow] instead of just an InternalRow, instead of using an expression to create our deleteRowOutput that we later delete, we could simply omit that inputRow from the returned iterator.

Collaborator Author: It's implemented that way in the above linked commit, before I added back ROW_DROPPED_COL.

Collaborator Author: This is more a question of readability, I think... not sure if either way is preferred over the other.

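The two alternatives weighed in the thread above can be illustrated with toy types standing in for InternalRow and UnsafeProjection (an editorial sketch, not the PR's code): either always emit a flagged row and filter it out afterwards, which is what deleteRowOutput plus ROW_DROPPED_COL does, or return an empty iterator so the dropped row never appears.

```scala
// Toy stand-in for a projected output row.
case class OutRow(values: Seq[Any], dropped: Boolean)

// (a) Current approach: always emit a row, mark it dropped, filter the flag out later.
def processRowWithFlag(in: Int): Iterator[OutRow] =
  if (in % 2 == 0) Iterator(OutRow(Seq(in), dropped = false))
  else Iterator(OutRow(Seq(in), dropped = true)) // analogue of deleteRowOutput

val keptA = (1 to 4).iterator.flatMap(processRowWithFlag).filterNot(_.dropped).toList

// (b) Alternative from the linked commit: omit the row from the iterator entirely.
def processRowOmitting(in: Int): Iterator[OutRow] =
  if (in % 2 == 0) Iterator(OutRow(Seq(in), dropped = false)) else Iterator.empty

val keptB = (1 to 4).iterator.flatMap(processRowOmitting).toList

assert(keptA.map(_.values) == keptB.map(_.values)) // same surviving rows either way
```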
joinedAttributes: Seq[Attribute],
@@ -775,20 +887,26 @@ object MergeIntoCommand {
val targetRowHasNoMatchPred = generatePredicate(targetRowHasNoMatch)
val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch)
val matchedPreds = matchedConditions.map(generatePredicate)
val matchedProjs = matchedOutputs.map(generateProjection)
val matchedProjs = matchedOutputs.map(_.map(generateProjection))
val notMatchedPreds = notMatchedConditions.map(generatePredicate)
val notMatchedProjs = notMatchedOutputs.map(generateProjection)
val notMatchedProjs = notMatchedOutputs.map(_.map(generateProjection))
val noopCopyProj = generateProjection(noopCopyOutput)
val deleteRowProj = generateProjection(deleteRowOutput)
val outputProj = UnsafeProjection.create(outputRowEncoder.schema)

def shouldDeleteRow(row: InternalRow): Boolean =
row.getBoolean(outputRowEncoder.schema.fields.size)
// this is accessing ROW_DROPPED_COL. If ROW_DROPPED_COL is not in outputRowEncoder.schema
// then CDC must be disabled and it's the column after our output cols
def shouldDeleteRow(row: InternalRow): Boolean = {
row.getBoolean(
outputRowEncoder.schema.getFieldIndex(ROW_DROPPED_COL)
.getOrElse(outputRowEncoder.schema.fields.size)
)
}

Comment on lines +900 to +903

Collaborator Author: Note, this is simply mimicking the prior implementation when CDC is disabled. Another solution is to have outputRowEncoder include ROW_DROPPED_COL when CDC is disabled; it will be dropped on line 684 regardless. Not sure about the tradeoff with respect to decoding a column we don't need.

Contributor (@tdas, Jun 3, 2022): This is a moot discussion now, right? You have to use ROW_DROPPED_COL to get the metrics right... right?

Collaborator Author: This is just about how we get the index of ROW_DROPPED_COL. This function could be simplified to row.getBoolean(outputRowEncoder.schema.fieldIndex(ROW_DROPPED_COL)) if we always include ROW_DROPPED_COL in outputRowEncoder.

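As a concrete illustration of the index lookup discussed above: when CDC is disabled, ROW_DROPPED_COL is not part of the output schema and the flag sits immediately after the output columns, so the lookup falls back to fields.size; when CDC is enabled the column is in the schema and is found directly. A sketch with an assumed toy schema (not the PR's code); fieldIndexOpt is a hypothetical public-API stand-in for the schema index lookup used in the diff:

```scala
import org.apache.spark.sql.types.{DataTypes, StructType}

val ROW_DROPPED_COL = "_row_dropped_"

val dataSchema = new StructType()
  .add("id", DataTypes.LongType)
  .add("value", DataTypes.StringType)

// Optional field-index lookup built from public StructType methods.
def fieldIndexOpt(schema: StructType, name: String): Option[Int] =
  Some(schema.fieldNames.indexOf(name)).filter(_ >= 0)

// CDC disabled: flag not in the schema, fall back to fields.size (the column after the output cols).
val idxWithoutCdc = fieldIndexOpt(dataSchema, ROW_DROPPED_COL).getOrElse(dataSchema.fields.size) // 2

// CDC enabled: the flag is part of the output schema and is resolved directly.
val cdcSchema = dataSchema.add(ROW_DROPPED_COL, DataTypes.BooleanType)
val idxWithCdc = fieldIndexOpt(cdcSchema, ROW_DROPPED_COL).getOrElse(cdcSchema.fields.size) // 2
```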
def processRow(inputRow: InternalRow): InternalRow = {
def processRow(inputRow: InternalRow): Iterator[InternalRow] = {
if (targetRowHasNoMatchPred.eval(inputRow)) {
// Target row did not match any source row, so just copy it to the output
noopCopyProj.apply(inputRow)
Iterator(noopCopyProj.apply(inputRow))
} else {
// identify which set of clauses to execute: matched or not-matched ones
val (predicates, projections, noopAction) = if (sourceRowHasNoMatchPred.eval(inputRow)) {
Expand All @@ -805,8 +923,9 @@ object MergeIntoCommand {
}

pair match {
case Some((_, projections)) => projections.apply(inputRow)
case None => noopAction.apply(inputRow)
case Some((_, projections)) =>
projections.map(_.apply(inputRow)).iterator
case None => Iterator(noopAction.apply(inputRow))
}
}
}
Expand All @@ -815,7 +934,7 @@ object MergeIntoCommand {
val fromRow = outputRowEncoder.createDeserializer()
rowIterator
.map(toRow)
.map(processRow)
.flatMap(processRow)
.filter(!shouldDeleteRow(_))
.map { notDeletedInternalRow =>
fromRow(outputProj(notDeletedInternalRow))