[Spark] Fix O(n^2) issue in find last complete checkpoint before (#3060)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR fixes an O(n^2) issue in the `findLastCompleteCheckpointBefore` method.

Today, `findLastCompleteCheckpointBefore` tries to find the last
checkpoint before a given version. To do this, `findLastCompleteCheckpointBefore(10000)` does the following:
1. List from 9000
2. List from 8000
3. List from 7000
...

Each of these listings today runs to the end of the `_delta_log` directory: delta files are
filtered out entirely, and the iteration is bounded only by a `takeWhile` on the checkpoint
version, so when no checkpoint exists in the range the predicate never gets a chance to stop
the listing early. For a log at version n, that is up to n/1000 listings of up to n files
each, i.e. O(n^2) work overall:

```scala
    listFrom(..)
          .filter { file => isCheckpointFile(file) && file.getLen != 0 }
          .map{ file => CheckpointInstance(file.getPath) }
          .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv)
```

This PR fixes the issue by terminating each listing early: delta files are now kept in the
listing, so the iterator can stop as soon as it crosses the delta file for the end version
of the current window.

In addition to this, we also bound how much each iteration lists (a sketch of the combined
algorithm follows the list below).
E.g. after this PR, `findLastCompleteCheckpointBefore(10000)` will need:
1. Iteration-1 lists from 9000 to 10000.
2. Iteration-2 lists from 8000 to 9000.
3. Iteration-3 lists from 7000 to 8000.
4. and so on...
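
To make the windowing and early termination concrete, here is a minimal, self-contained
Scala sketch of the idea under simplified assumptions: `LogFile`, `listFrom`, and the
version-only return type are hypothetical stand-ins for the real
`LogStore`/`CheckpointInstance` machinery, not Delta's API.

```scala
object BackwardCheckpointSearchSketch {
  sealed trait Kind
  case object Delta extends Kind
  case object Checkpoint extends Kind
  final case class LogFile(version: Long, kind: Kind) // stand-in for a _delta_log entry

  // Stand-in for LogStore.listFrom: all files with version >= start, in sorted order.
  def listFrom(log: Seq[LogFile], start: Long): Iterator[LogFile] =
    log.iterator.filter(_.version >= start)

  def findLastCompleteCheckpointBefore(log: Seq[LogFile], version: Long): Option[Long] = {
    var end = version
    while (end >= 0) {
      val start = math.max(0, end - 1000)
      val checkpointsInWindow = listFrom(log, start)
        // Delta files stay in the stream, so takeWhile stops the listing as soon
        // as we walk past the current window instead of scanning to the end.
        .takeWhile(_.version <= end)
        .collect { case LogFile(v, Checkpoint) if v < version => v }
        .toSeq
      if (checkpointsInWindow.nonEmpty) return Some(checkpointsInWindow.max)
      end -= 1000 // move the window one chunk further back
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val log = ((0L to 10000L).map(LogFile(_, Delta)) :+ LogFile(7500L, Checkpoint))
      .sortBy(_.version)
    // Finds 7500 after scanning only the [9000, 10000], [8000, 9000], [7000, 8000] windows.
    println(findLastCompleteCheckpointBefore(log, 10000L)) // Some(7500)
  }
}
```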


## How was this patch tested?

UT
prakharjain09 authored May 7, 2024
1 parent 3baeb99 commit d0bd28e
Showing 4 changed files with 518 additions and 24 deletions.
85 changes: 61 additions & 24 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -28,8 +28,7 @@ import org.apache.spark.sql.delta.actions.{Action, CheckpointMetadata, Metadata,
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaLogGroupingIterator, FileNames}
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
@@ -417,7 +416,7 @@ trait Checkpoints extends DeltaLogging {
* Note that the returned checkpoint will always be < `version`.
* @param version The checkpoint version to compare against
*/
protected def findLastCompleteCheckpointBefore(version: Long): Option[CheckpointInstance] = {
private[delta] def findLastCompleteCheckpointBefore(version: Long): Option[CheckpointInstance] = {
val upperBound = CheckpointInstance(version, CheckpointInstance.Format.SINGLE, numParts = None)
findLastCompleteCheckpointBefore(Some(upperBound))
}
@@ -428,38 +427,76 @@
* deltalog directory.
* @param checkpointInstance The checkpoint instance to compare against
*/
protected def findLastCompleteCheckpointBefore(
private[delta] def findLastCompleteCheckpointBefore(
checkpointInstance: Option[CheckpointInstance] = None): Option[CheckpointInstance] = {
val (upperBoundCv, startVersion) = checkpointInstance
.collect { case cv if cv.version >= 0 => (cv, cv.version) }
.getOrElse((CheckpointInstance.sentinelValue(versionOpt = None), 0L))
var cur = startVersion
val hadoopConf = newDeltaHadoopConf()

logInfo(s"Try to find Delta last complete checkpoint before version $startVersion")
while (cur >= 0) {
val checkpoints = store.listFrom(
listingPrefix(logPath, math.max(0, cur - 1000)),
hadoopConf)
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
// such files, hence we drop them so that we never pick up such checkpoints.
.filter { file => isCheckpointFile(file) && file.getLen != 0 }
.map{ file => CheckpointInstance(file.getPath) }
.takeWhile(tv => (cur == 0 || tv.version <= cur) && tv < upperBoundCv)
.toArray
val upperBoundCv = checkpointInstance.filterNot(_.version < 0).getOrElse {
logInfo(s"Try to find Delta last complete checkpoint")
return findLastCompleteCheckpoint()
}
logInfo(s"Try to find Delta last complete checkpoint before version ${upperBoundCv.version}")
var listingEndVersion = upperBoundCv.version

// Do a backward listing from the upperBoundCv version. We list in chunks of 1000 versions.
// ...........................................................................................
//                                                                            |
//                                                              upper bound cv's version
//                                             [ iter-1 looks in this window ]
//                          [ iter-2 window ]
//       [ iter-3 window ]
// |
// latest checkpoint
while (listingEndVersion >= 0) {
val listingStartVersion = math.max(0, listingEndVersion - 1000)
val checkpoints = store
.listFrom(listingPrefix(logPath, listingStartVersion), newDeltaHadoopConf())
.collect {
// Also collect delta files from the listing result so that the next takeWhile helps us
// terminate the iterator early if no checkpoint exists up to the `listingEndVersion`
// version.
case DeltaFile(file, version) => (file, FileType.DELTA, version)
case CheckpointFile(file, version) => (file, FileType.CHECKPOINT, version)
}
.takeWhile { case (_, _, currentFileVersion) => currentFileVersion <= listingEndVersion }
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when
// reading such files, hence we drop them so that we never pick up such checkpoints.
.collect { case (file, FileType.CHECKPOINT, _) if file.getLen > 0 =>
CheckpointInstance(file.getPath)
}
// We still need to filter on `upperBoundCv` to eliminate checkpoint files which are the
// same version as `upperBoundCv` but have a higher [[CheckpointInstance.Format]]. E.g. if
// upperBoundCv is a V2 checkpoint and we have a single-part checkpoint and a v2
// checkpoint at the same version, then we should not consider the v2 checkpoint as it is
// not lower than the upperBoundCv.
.filter(_ < upperBoundCv)
.toArray
val lastCheckpoint =
getLatestCompleteCheckpointFromList(checkpoints, Some(upperBoundCv.version))
if (lastCheckpoint.isDefined) {
logInfo(s"Delta checkpoint is found at version ${lastCheckpoint.get.version}")
return lastCheckpoint
} else {
cur -= 1000
}
listingEndVersion = listingEndVersion - 1000
}
logInfo(s"No checkpoint found for Delta table before version $startVersion")
logInfo(s"No checkpoint found for Delta table before version ${upperBoundCv.version}")
None
}

/** Returns the last complete checkpoint in the delta log directory (if any) */
private def findLastCompleteCheckpoint(): Option[CheckpointInstance] = {
val hadoopConf = newDeltaHadoopConf()
val listingResult = store
.listFrom(listingPrefix(logPath, 0L), hadoopConf)
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when
// reading such files, hence we drop them so that we never pick up such checkpoints.
.collect { case CheckpointFile(file, _) if file.getLen != 0 => file }
new DeltaLogGroupingIterator(listingResult)
.flatMap { case (_, files) =>
getLatestCompleteCheckpointFromList(files.map(f => CheckpointInstance(f.getPath)).toArray)
}.foldLeft(Option.empty[CheckpointInstance])((_, right) => Some(right))
// ^The foldLeft here emulates the non-existing Iterator.tailOption method.

}

/**
* Given a list of checkpoint files, pick the latest complete checkpoint instance which is not
* later than `notLaterThan`.
77 changes: 77 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala
@@ -0,0 +1,77 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.util.FileNames.{CheckpointFile, DeltaFile}
import org.apache.hadoop.fs.FileStatus

/**
 * An iterator that groups checkpoint and delta files by version.
 * Note that this class can handle only checkpoint and delta files.
 * For example, for an input iterator:
* - 11.checkpoint.0.1.parquet
* - 11.checkpoint.1.1.parquet
* - 11.json
* - 12.checkpoint.parquet
* - 12.json
* - 13.json
* - 14.json
* - 15.checkpoint.0.1.parquet
* - 15.checkpoint.1.1.parquet
* - 15.checkpoint.<uuid>.parquet
* - 15.json
* This will return:
* - (11, Seq(11.checkpoint.0.1.parquet, 11.checkpoint.1.1.parquet, 11.json))
* - (12, Seq(12.checkpoint.parquet, 12.json))
* - (13, Seq(13.json))
* - (14, Seq(14.json))
* - (15, Seq(15.checkpoint.0.1.parquet, 15.checkpoint.1.1.parquet, 15.checkpoint.<uuid>.parquet,
* 15.json))
*/
class DeltaLogGroupingIterator(
checkpointAndDeltas: Iterator[FileStatus]) extends Iterator[(Long, ArrayBuffer[FileStatus])] {

private val bufferedIterator = checkpointAndDeltas.buffered

/**
* Validates that the underlying file is a checkpoint/delta file and returns the corresponding
* version.
*/
private def getFileVersion(file: FileStatus): Long = {
file match {
case DeltaFile(_, version) => version
case CheckpointFile(_, version) => version
case _ =>
throw new IllegalStateException(
s"${file.getPath} is not a valid commit file / checkpoint file")
}
}

override def hasNext: Boolean = bufferedIterator.hasNext

override def next(): (Long, ArrayBuffer[FileStatus]) = {
val first = bufferedIterator.next()
val buffer = scala.collection.mutable.ArrayBuffer(first)
val firstFileVersion = getFileVersion(first)
while (bufferedIterator.headOption.exists(getFileVersion(_) == firstFileVersion)) {
buffer += bufferedIterator.next()
}
firstFileVersion -> buffer
}
}
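
For illustration only (not part of this commit), a small usage sketch of the new iterator.
The `status` helper is hypothetical, and the example assumes the delta-spark artifact is on
the classpath; the constructor signature matches the class above.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.delta.util.DeltaLogGroupingIterator

object GroupingIteratorExample {
  // Hypothetical helper: builds a bare FileStatus for a _delta_log file name.
  private def status(name: String): FileStatus =
    new FileStatus(1L, false, 1, 1L, 0L, new Path(s"/table/_delta_log/$name"))

  def main(args: Array[String]): Unit = {
    val files = Iterator(
      status("00000000000000000011.checkpoint.parquet"),
      status("00000000000000000011.json"),
      status("00000000000000000012.json"))
    new DeltaLogGroupingIterator(files).foreach { case (version, group) =>
      // Prints one line per version, e.g.
      //   version 11 -> 00000000000000000011.checkpoint.parquet, 00000000000000000011.json
      //   version 12 -> 00000000000000000012.json
      println(s"version $version -> ${group.map(_.getPath.getName).mkString(", ")}")
    }
  }
}
```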
