Skip to content

Commit

Permalink
Fix issues around empty file action transactions
Browse files Browse the repository at this point in the history
This PR fixes the empty file action transaction issue.
Currently Delta uses SnapshotIsolation when a txn wants to commit stuff with no file actions. This is wrong - A txn without no FileActions can still have Metadata updates. A metadata update shouldn't use SnapshotIsolation.

GitOrigin-RevId: 10d4fbe5f6ff29b9a6aad995f0e4dbc4b30da135
  • Loading branch information
prakharjain09 authored and allisonport-db committed Jan 21, 2022
1 parent f5cef19 commit 0d07d09
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ private[delta] class CurrentTransactionInfo(

/** Final actions to commit - including the [[CommitInfo]] */
lazy val finalActionsToCommit: Seq[Action] = actions ++ commitInfo

/** Whether this transaction wants to commit actions other than [[FileAction]] */
val hasOnlyFileActions = actions.forall(_.isInstanceOf[FileAction])
}

/**
Expand Down Expand Up @@ -162,10 +165,10 @@ private[delta] class ConflictChecker(
recordTime("checked-appends") {
// Fail if new files have been added that the txn should have read.
val addedFilesToCheckForConflicts = isolationLevel match {
case Serializable =>
winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
case WriteSerializable =>
case WriteSerializable if currentTransactionInfo.hasOnlyFileActions =>
winningCommitSummary.changedDataAddedFiles // don't conflict with blind appends
case Serializable | WriteSerializable =>
winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
case SnapshotIsolation =>
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
val preparedActions = prepareCommit(actions, op)

// Find the isolation level to use for this commit
val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
val isolationLevelToUse = if (noDataChanged) {
// If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
// provides Serializable guarantee. Hence, allow reduced conflict detection by using
// SnapshotIsolation of what the table isolation level is.
SnapshotIsolation
} else {
Serializable
}
val isolationLevelToUse = getIsolationLevelToUse(preparedActions)

if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
val isBlindAppend = {
Expand Down Expand Up @@ -638,6 +630,27 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
finalActions
}

// Returns the isolation level to use for committing the transaction
protected def getIsolationLevelToUse(preparedActions: Seq[Action]): IsolationLevel = {
val noDataChanged = preparedActions
.collectFirst { case f: FileAction if f.dataChange => f }
.isEmpty
val hasOnlyFileActions = preparedActions.forall(_.isInstanceOf[FileAction])
val isolationLevelToUse = if (noDataChanged && hasOnlyFileActions) {
// If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
// provides Serializable guarantee. Hence, allow reduced conflict detection by using
// SnapshotIsolation of what the table isolation level is.
SnapshotIsolation
} else {
getDefaultIsolationLevel()
}
isolationLevelToUse
}

protected def getDefaultIsolationLevel(): IsolationLevel = {
Serializable
}

/**
* Returns true if we should checkpoint the version that has just been committed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class OptimisticTransactionSuite
val isolationLevels = log.history.getHistory(Some(10)).map(_.isolationLevel)
assert(isolationLevels.size == 2)
assert(isolationLevels(0).exists(_.contains("Serializable")))
assert(isolationLevels(1) == Some(SnapshotIsolation.toString))
assert(isolationLevels(0).exists(_.contains("Serializable")))
}
}

Expand Down

0 comments on commit 0d07d09

Please sign in to comment.