diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
index 551d5b58176..45741217c47 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala
@@ -16,6 +16,7 @@ package org.apache.spark.sql.delta
+import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.FileNames._
@@ -59,6 +60,19 @@ trait CheckpointProvider extends UninitializedCheckpointProvider {
   def allActionsFileIndexes(): Seq[DeltaLogFileIndex]
 }
+object CheckpointProvider extends DeltaLogging {
+
+  private[delta] def isV2CheckpointEnabled(protocol: Protocol): Boolean =
+    protocol.isFeatureSupported(V2CheckpointTableFeature)
+
+  /**
+   * Returns whether V2 Checkpoints are enabled or not.
+   * This means an underlying checkpoint in this table could be a V2Checkpoint with sidecar files.
+   */
+  def isV2CheckpointEnabled(snapshotDescriptor: SnapshotDescriptor): Boolean =
+    isV2CheckpointEnabled(snapshotDescriptor.protocol)
+}
+
 /**
  * An implementation of [[CheckpointProvider]] where the information about checkpoint files
  * (i.e. Seq[FileStatus]) is already known in advance.
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 10dec100852..924cec3419f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -24,7 +24,7 @@ import scala.math.Ordering.Implicits._
 import scala.util.control.NonFatal
 // scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.actions.{Metadata, SingleAction}
+import org.apache.spark.sql.delta.actions.{Action, CheckpointMetadata, Metadata, SidecarFile, SingleAction}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
@@ -36,11 +36,14 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{Job, TaskType}
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.TaskContext
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{coalesce, col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
@@ -389,7 +392,9 @@ trait Checkpoints extends DeltaLogging {
   }
 }
-object Checkpoints extends DeltaLogging {
+object Checkpoints
+  extends DeltaLogging
+  {
   /** The name of the last checkpoint file */
   val LAST_CHECKPOINT_FILE_NAME = "_last_checkpoint"
@@ -424,6 +429,26 @@ object Checkpoints extends DeltaLogging {
     // log store and decide whether to use rename.
     val useRename = deltaLog.store.isPartialWriteVisible(deltaLog.logPath, hadoopConf)
+    val v2CheckpointFormatOpt = {
+      val policy = DeltaConfigs.CHECKPOINT_POLICY.fromMetaData(snapshot.metadata)
+      if (policy.needsV2CheckpointSupport) {
+        assert(CheckpointProvider.isV2CheckpointEnabled(snapshot))
+        val v2Format = spark.conf.get(DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT)
+        // The format of the top level file in V2 checkpoints can be configured through
+        // the optional config [[DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT]].
+        // If nothing is specified, we use the json format. In the future, we may
+        // write json/parquet dynamically based on heuristics.
+        v2Format match {
+          case Some(V2Checkpoint.Format.JSON.name) | None => Some(V2Checkpoint.Format.JSON)
+          case Some(V2Checkpoint.Format.PARQUET.name) => Some(V2Checkpoint.Format.PARQUET)
+          case _ => throw new IllegalStateException("unknown checkpoint format")
+        }
+      } else {
+        None
+      }
+    }
+    val v2CheckpointEnabled = v2CheckpointFormatOpt.nonEmpty
+
     val checkpointRowCount = spark.sparkContext.longAccumulator("checkpointRowCount")
     val numOfFiles = spark.sparkContext.longAccumulator("numOfFiles")
@@ -433,34 +458,35 @@ object Checkpoints extends DeltaLogging {
     val numParts = checkpointPartSize.map { partSize =>
       math.ceil((snapshot.numOfFiles + snapshot.numOfRemoves).toDouble / partSize).toLong
-    }.getOrElse(1L)
-
-    val checkpointPaths = if (numParts > 1) {
-      checkpointFileWithParts(snapshot.path, snapshot.version, numParts.toInt)
-    } else {
-      checkpointFileSingular(snapshot.path, snapshot.version) :: Nil
-    }
-
-    val numPartsOption = if (numParts > 1) {
-      Some(checkpointPaths.length)
-    } else {
-      None
-    }
-
-    // Use the string in the closure as Path is not Serializable.
-    val paths = checkpointPaths.map(_.toString)
-    val base = snapshot.stateDS
-      .repartition(paths.length, coalesce(col("add.path"), col("remove.path")))
-      .map { action =>
-        if (action.add != null) {
-          numOfFiles.add(1)
+    }.getOrElse(1L).toInt
+    val legacyMultiPartCheckpoint = !v2CheckpointEnabled && numParts > 1
+
+    val base = {
+      val repartitioned = snapshot.stateDS
+        .repartition(numParts, coalesce(col("add.path"), col("remove.path")))
+        .map { action =>
+          if (action.add != null) {
+            numOfFiles.add(1)
+          }
+          action
         }
-        action
+      // commitInfo, cdc, remove.tags and remove.stats are not included in either
+      // classic or V2 checkpoints.
+      if (v2CheckpointEnabled) {
+        // When V2 Checkpoint is enabled, the baseCheckpoint refers to the sidecar files which will
+        // only have AddFile and RemoveFile actions. The other non-file actions will be written
+        // separately after sidecar files are written.
+        repartitioned
+          .select("add", "remove")
+          .withColumn("remove", col("remove").dropFields("tags", "stats"))
+          .where("add is not null or remove is not null")
+      } else {
+        // When V2 Checkpoint is disabled, the baseCheckpoint refers to the main classic checkpoint
+        // which has all actions except "commitInfo", "cdc", "checkpointMetadata", "sidecar".
+        repartitioned
+          .drop("commitInfo", "cdc", "checkpointMetadata", "sidecar")
+          .withColumn("remove", col("remove").dropFields("tags", "stats"))
       }
-      // commitInfo, cdc, remove.tags and remove.stats are not included in the checkpoint
-      // TODO: Add support for V2 Checkpoints here.
- .drop("commitInfo", "cdc", "checkpointMetadata", "sidecar") - .withColumn("remove", col("remove").dropFields("tags", "stats")) + } val chk = buildCheckpoint(base, snapshot) val schema = chk.schema.asNullable @@ -472,25 +498,27 @@ object Checkpoints extends DeltaLogging { new SerializableConfiguration(job.getConfiguration)) } + // Use the SparkPath in the closure as Path is not Serializable. + val logSparkPath = SparkPath.fromPath(snapshot.path) + val version = snapshot.version + // This is a hack to get spark to write directly to a file. val qe = chk.queryExecution def executeFinalCheckpointFiles(): Array[SerializableFileStatus] = qe .executedPlan .execute() - .mapPartitionsWithIndex { case (index, iter) => - val finalPath = new Path(paths(index)) - val writtenPath = - if (useRename) { - // Two instances of the same task may run at the same time in some cases (e.g., - // speculation, stage retry), so generate the temp path here to avoid two tasks - // using the same path. - val tempPath = - new Path(finalPath.getParent, s".${finalPath.getName}.${UUID.randomUUID}.tmp") - DeltaFileOperations.registerTempFileDeletionTaskFailureListener(serConf.value, tempPath) - tempPath - } else { - finalPath - } + .mapPartitions { case iter => + val actualNumParts = Option(TaskContext.get).map(_.numPartitions()) + .getOrElse(numParts) + val partition = TaskContext.getPartitionId() + val (writtenPath, finalPath) = Checkpoints.getCheckpointWritePath( + serConf.value, + logSparkPath.toPath, + version, + actualNumParts, + partition, + useRename, + v2CheckpointEnabled) val fs = writtenPath.getFileSystem(serConf.value) val writeAction = () => { try { @@ -547,25 +575,282 @@ object Checkpoints extends DeltaLogging { executeFinalCheckpointFiles() } - val checkpointSizeInBytes = finalCheckpointFiles.map(_.length).sum if (numOfFiles.value != snapshot.numOfFiles) { throw DeltaErrors.checkpointMismatchWithSnapshot } - // Attempting to write empty checkpoint - if (checkpointRowCount.value == 0) { + val parquetFilesSizeInBytes = finalCheckpointFiles.map(_.length).sum + var overallCheckpointSizeInBytes = parquetFilesSizeInBytes + var overallNumCheckpointActions: Long = checkpointRowCount.value + var checkpointSchemaToWriteInLastCheckpoint: Option[StructType] = + Checkpoints.checkpointSchemaToWriteInLastCheckpointFile(spark, schema) + + val v2Checkpoint = if (v2CheckpointEnabled) { + val (v2CheckpointFileStatus, nonFileActionsWriten, v2Checkpoint, checkpointSchema) = + Checkpoints.writeTopLevelV2Checkpoint( + v2CheckpointFormatOpt.get, + finalCheckpointFiles, + spark, + schema, + snapshot, + deltaLog, + overallNumCheckpointActions, + parquetFilesSizeInBytes, + hadoopConf, + useRename + ) + overallCheckpointSizeInBytes += v2CheckpointFileStatus.getLen + overallNumCheckpointActions += nonFileActionsWriten.size + checkpointSchemaToWriteInLastCheckpoint = checkpointSchema + + Some(v2Checkpoint) + } else { + None + } + + if (!v2CheckpointEnabled && checkpointRowCount.value == 0) { + // In case of V2 Checkpoints, zero row count is possible. 
       logWarning(DeltaErrors.EmptyCheckpointErrorMessage)
     }
+
+    // If we don't parallelize, we use None for backwards compatibility
+    val checkpointParts = if (legacyMultiPartCheckpoint) Some(numParts) else None
+
     LastCheckpointInfo(
       version = snapshot.version,
-      size = checkpointRowCount.value,
-      parts = numPartsOption,
-      sizeInBytes = Some(checkpointSizeInBytes),
+      size = overallNumCheckpointActions,
+      parts = checkpointParts,
+      sizeInBytes = Some(overallCheckpointSizeInBytes),
       numOfAddFiles = Some(snapshot.numOfFiles),
-      checkpointSchema = checkpointSchemaToWriteInLastCheckpointFile(spark, schema)
+      v2Checkpoint = v2Checkpoint,
+      checkpointSchema = checkpointSchemaToWriteInLastCheckpoint
     )
   }
+
+  /**
+   * Generate a tuple of the file to write the checkpoint and where it may later need
+   * to be copied. Should be used within a task, so that task or stage retries don't
+   * create the same files.
+   */
+  def getCheckpointWritePath(
+      conf: Configuration,
+      logPath: Path,
+      version: Long,
+      numParts: Int,
+      part: Int,
+      useRename: Boolean,
+      v2CheckpointEnabled: Boolean): (Path, Path) = {
+    def getCheckpointWritePath(path: Path): Path = {
+      if (useRename) {
+        val tempPath =
+          new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}.tmp")
+        DeltaFileOperations.registerTempFileDeletionTaskFailureListener(conf, tempPath)
+        tempPath
+      } else {
+        path
+      }
+    }
+    val destinationName: Path = if (v2CheckpointEnabled) {
+      newV2CheckpointSidecarFile(logPath, version, numParts, part + 1)
+    } else {
+      if (numParts > 1) {
+        assert(part < numParts, s"Asked to create part: $part of max $numParts in checkpoint.")
+        checkpointFileWithParts(logPath, version, numParts)(part)
+      } else {
+        checkpointFileSingular(logPath, version)
+      }
+    }
+
+    getCheckpointWritePath(destinationName) -> destinationName
+  }
+
+  /**
+   * Writes a top-level V2 Checkpoint file which may point to multiple
+   * sidecar files.
+   *
+   * @param v2CheckpointFormat The format in which the top-level file should be
+   *                           written. Currently, json and parquet are supported.
+   * @param sidecarCheckpointFiles The list of sidecar files that have already been
+   *                               written. The top-level file will store this list.
+   * @param spark The current spark session.
+   * @param sidecarSchema The schema of the sidecar parquet files.
+   * @param snapshot The snapshot for which the checkpoint is being written.
+   * @param deltaLog The deltaLog instance pointing to our table's deltaLog.
+   * @param rowsWrittenInCheckpointJob The number of rows that were written in total
+   *                                   to the sidecar files.
+   * @param parquetFilesSizeInBytes The combined size of all sidecar files in bytes.
+   * @param hadoopConf The hadoopConf to use for the filesystem operation.
+   * @param useRename Whether we should first write to a temporary file and then
+   *                  rename it to the target file name during the write.
+   * @return A tuple containing
+   *         1. [[FileStatus]] of the newly created top-level V2Checkpoint.
+   *         2. The sequence of actions that were written to the top-level file.
+   *         3. An instance of the LastCheckpointV2 containing V2-checkpoint related
+   *            metadata which can later be written to LAST_CHECKPOINT.
+   *         4. Schema of the newly written top-level file (only for parquet files).
+   */
+  protected[delta] def writeTopLevelV2Checkpoint(
+      v2CheckpointFormat: V2Checkpoint.Format,
+      sidecarCheckpointFiles: Array[SerializableFileStatus],
+      spark: SparkSession,
+      sidecarSchema: StructType,
+      snapshot: Snapshot,
+      deltaLog: DeltaLog,
+      rowsWrittenInCheckpointJob: Long,
+      parquetFilesSizeInBytes: Long,
+      hadoopConf: Configuration,
+      useRename: Boolean) : (FileStatus, Seq[Action], LastCheckpointV2, Option[StructType]) = {
+    // Write the main v2 checkpoint file.
+    val sidecarFilesWritten = sidecarCheckpointFiles.map(SidecarFile(_)).toSeq
+    // Filter out the sidecar schema if it is too large.
+    val sidecarFileSchemaOpt =
+      Checkpoints.checkpointSchemaToWriteInLastCheckpointFile(spark, sidecarSchema)
+    val checkpointMetadata = CheckpointMetadata(snapshot.version)
+
+    val nonFileActionsToWrite =
+      (checkpointMetadata +: sidecarFilesWritten) ++ snapshot.nonFileActions
+    val (v2CheckpointPath, checkpointSchemaToWriteInLastCheckpoint) =
+      if (v2CheckpointFormat == V2Checkpoint.Format.JSON) {
+        val v2CheckpointPath = newV2CheckpointJsonFile(deltaLog.logPath, snapshot.version)
+        deltaLog.store.write(
+          v2CheckpointPath,
+          nonFileActionsToWrite.map(_.json).toIterator,
+          overwrite = true,
+          hadoopConf = hadoopConf
+        )
+        (v2CheckpointPath, None)
+      } else if (v2CheckpointFormat == V2Checkpoint.Format.PARQUET) {
+        val sparkSession = spark
+        // scalastyle:off sparkimplicits
+        import sparkSession.implicits._
+        // scalastyle:on sparkimplicits
+        val dfToWrite = nonFileActionsToWrite.map(_.wrap).toDF()
+        val v2CheckpointPath = newV2CheckpointParquetFile(deltaLog.logPath, snapshot.version)
+        val schemaOfDfWritten = createCheckpointV2ParquetFile(
+          spark, dfToWrite, v2CheckpointPath, hadoopConf, useRename)
+        (v2CheckpointPath, Some(schemaOfDfWritten))
+      } else {
+        throw DeltaErrors.assertionFailedError(
+          s"Unrecognized checkpoint V2 format: $v2CheckpointFormat")
+      }
+    // Main Checkpoint V2 File written successfully. Now create the last checkpoint v2 blob so
+    // that we can persist it in the _last_checkpoint file.
+    val v2CheckpointFileStatus =
+      v2CheckpointPath.getFileSystem(hadoopConf).getFileStatus(v2CheckpointPath)
+    val unfilteredV2Checkpoint = LastCheckpointV2(
+      fileStatus = v2CheckpointFileStatus,
+      nonFileActions = Some((snapshot.nonFileActions :+ checkpointMetadata).map(_.wrap)),
+      sidecarFiles = Some(sidecarFilesWritten)
+    )
+    (
+      v2CheckpointFileStatus,
+      nonFileActionsToWrite,
+      trimLastCheckpointV2(unfilteredV2Checkpoint, spark),
+      checkpointSchemaToWriteInLastCheckpoint
+    )
+  }
+
+  /**
+   * Helper method to create a V2 Checkpoint parquet file or the V2 Checkpoint Compat file.
+   * V2 Checkpoint Compat files follow the same naming convention as classic checkpoints
+   * and they are needed so that V2Checkpoint-unaware readers can read them to understand
+   * that they don't have the capability to read the table for which they were created.
+   * This is needed in cases where commit 0 has been cleaned up and the reader needs to
+   * read a checkpoint to read the [[Protocol]].
+   */
+  def createCheckpointV2ParquetFile(
+      spark: SparkSession,
+      ds: Dataset[Row],
+      finalPath: Path,
+      hadoopConf: Configuration,
+      useRename: Boolean): StructType = {
+    val df = ds.select(
+      "txn", "add", "remove", "metaData", "protocol", "domainMetadata",
+      "checkpointMetadata", "sidecar")
+    val schema = df.schema.asNullable
+    val format = new ParquetFileFormat()
+    val job = Job.getInstance(hadoopConf)
+    val factory = format.prepareWrite(spark, job, Map.empty, schema)
+    val serConf = new SerializableConfiguration(job.getConfiguration)
+    val finalSparkPath = SparkPath.fromPath(finalPath)
+
+    df.repartition(1)
+      .queryExecution
+      .executedPlan
+      .execute()
+      .mapPartitions { iter =>
+        val actualNumParts = Option(TaskContext.get).map(_.numPartitions()).getOrElse(1)
+        require(actualNumParts == 1, "The parquet V2 checkpoint must be written in 1 file")
+        val partition = TaskContext.getPartitionId()
+        val finalPath = finalSparkPath.toPath
+        val writePath = if (useRename) {
+          val tempPath =
+            new Path(finalPath.getParent, s".${finalPath.getName}.${UUID.randomUUID}.tmp")
+          DeltaFileOperations.registerTempFileDeletionTaskFailureListener(serConf.value, tempPath)
+          tempPath
+        } else {
+          finalPath
+        }
+
+        val fs = writePath.getFileSystem(serConf.value)
+
+        val attemptId = 0
+        val taskAttemptContext = new TaskAttemptContextImpl(
+          new JobConf(serConf.value),
+          new TaskAttemptID("", 0, TaskType.REDUCE, partition, attemptId))
+
+        var writerOpt: Option[OutputWriter] = None
+
+        try {
+          writerOpt = Some(factory.newInstance(
+            writePath.toString,
+            schema,
+            taskAttemptContext))
+
+          val writer = writerOpt.get
+          iter.foreach { row =>
+            writer.write(row)
+          }
+          // Note: `writer.close()` is not put in a `finally` clause because we don't want to
+          // close it when an exception happens. Closing the file would flush the content to the
+          // storage and create an incomplete file. A concurrent reader might see it and fail.
+          // This would leak resources but we don't have a way to abort the storage request here.
+          writer.close()
+        } catch {
+          case _: org.apache.hadoop.fs.FileAlreadyExistsException
+              if !useRename && fs.exists(writePath) =>
+            // The file has been written by a zombie task. We can just use this checkpoint file
+            // rather than failing a Delta commit.
+          case t: Throwable =>
+            throw t
+        }
+        if (useRename) {
+          renameAndCleanupTempPartFile(writePath, finalPath, fs)
+        }
+        val finalPathFileStatus = try {
+          fs.getFileStatus(finalPath)
+        } catch {
+          case _: FileNotFoundException if useRename =>
+            throw DeltaErrors.failOnCheckpointRename(writePath, finalPath)
+        }
+        Iterator(SerializableFileStatus.fromStatus(finalPathFileStatus))
+      }.collect()
+    schema
+  }
+
+  /** Bounds the size of a [[LastCheckpointV2]] by removing any oversized optional fields */
+  def trimLastCheckpointV2(
+      lastCheckpointV2: LastCheckpointV2,
+      spark: SparkSession): LastCheckpointV2 = {
+    val nonFileActionThreshold =
+      spark.sessionState.conf.getConf(DeltaSQLConf.LAST_CHECKPOINT_NON_FILE_ACTIONS_THRESHOLD)
+    val sidecarThreshold =
+      spark.sessionState.conf.getConf(DeltaSQLConf.LAST_CHECKPOINT_SIDECARS_THRESHOLD)
+    lastCheckpointV2.copy(
+      sidecarFiles = lastCheckpointV2.sidecarFiles.filter(_.size <= sidecarThreshold),
+      nonFileActions = lastCheckpointV2.nonFileActions.filter(_.size <= nonFileActionThreshold))
+  }
+
   /**
    * Helper method to rename a `tempPath` checkpoint part file to `finalPath` checkpoint part file.
    * This also tries to handle any race conditions with Zombie tasks.
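Editor's aside (not part of the patch): the top-level V2 checkpoint format selection introduced above can be read as a small decision function. The sketch below is a standalone, hedged illustration of that logic; V2Checkpoint.Format and the DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT config are real names from this diff, but the stand-in types here are invented so the snippet compiles on its own.

// Standalone sketch, assuming only what this diff shows: an optional "json"/"parquet" setting
// that defaults to JSON, and an unknown value being an error. Not the Delta implementation.
object V2CheckpointFormatSelectionSketch {
  sealed trait Format { def name: String }
  case object JSON extends Format { val name = "json" }
  case object PARQUET extends Format { val name = "parquet" }

  // policyNeedsV2 stands in for CheckpointPolicy.needsV2CheckpointSupport; configuredFormat
  // stands in for the optional CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT session conf value.
  def selectTopLevelFormat(
      policyNeedsV2: Boolean,
      configuredFormat: Option[String]): Option[Format] = {
    if (!policyNeedsV2) {
      None // classic checkpoint: no uuid-named top-level file is written
    } else {
      configuredFormat match {
        case Some(JSON.name) | None => Some(JSON) // unset defaults to the json format
        case Some(PARQUET.name) => Some(PARQUET)
        case Some(other) => throw new IllegalStateException(s"unknown checkpoint format: $other")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    assert(selectTopLevelFormat(policyNeedsV2 = false, None).isEmpty)
    assert(selectTopLevelFormat(policyNeedsV2 = true, None).contains(JSON))
    assert(selectTopLevelFormat(policyNeedsV2 = true, Some("parquet")).contains(PARQUET))
  }
}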
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/LastCheckpointInfo.scala b/spark/src/main/scala/org/apache/spark/sql/delta/LastCheckpointInfo.scala
index 44075db4577..580ba238d77 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/LastCheckpointInfo.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/LastCheckpointInfo.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.delta.actions.{CheckpointMetadata, SidecarFile, SingleAction}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, numCheckpointParts}
 import org.apache.spark.sql.delta.util.JsonUtils
@@ -31,6 +32,46 @@ import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
+/**
+ * Information about the V2 Checkpoint in the LAST_CHECKPOINT file.
+ * @param path file name corresponding to the uuid-named v2 checkpoint
+ * @param sizeInBytes size in bytes for the uuid-named v2 checkpoint
+ * @param modificationTime modification time for the uuid-named v2 checkpoint
+ * @param nonFileActions all non file actions for the v2 checkpoint. This info may or may not be
+ *                       available. A None value means that info is missing.
+ *                       If it is not None, then it should have all the non-FileActions
+ *                       corresponding to the checkpoint.
+ * @param sidecarFiles sidecar files corresponding to the v2 checkpoint. This info may or may
+ *                     not be available. A None value means that this info is missing.
+ *                     An empty list denotes that the v2 checkpoint has no sidecars.
+ */
+case class LastCheckpointV2(
+    path: String,
+    sizeInBytes: Long,
+    modificationTime: Long,
+    nonFileActions: Option[Seq[SingleAction]],
+    sidecarFiles: Option[Seq[SidecarFile]]) {
+
+  @JsonIgnore
+  lazy val checkpointMetadataOpt: Option[CheckpointMetadata] =
+    nonFileActions.flatMap(_.map(_.unwrap).collectFirst { case cm: CheckpointMetadata => cm })
+
+}
+
+object LastCheckpointV2 {
+  def apply(
+      fileStatus: FileStatus,
+      nonFileActions: Option[Seq[SingleAction]] = None,
+      sidecarFiles: Option[Seq[SidecarFile]] = None): LastCheckpointV2 = {
+    LastCheckpointV2(
+      path = fileStatus.getPath.getName,
+      sizeInBytes = fileStatus.getLen,
+      modificationTime = fileStatus.getModificationTime,
+      nonFileActions = nonFileActions,
+      sidecarFiles = sidecarFiles)
+  }
+}
+
 /**
  * Records information about a checkpoint.
  *
@@ -67,6 +108,7 @@ case class LastCheckpointInfo(
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
     numOfAddFiles: Option[Long],
     checkpointSchema: Option[StructType],
+    v2Checkpoint: Option[LastCheckpointV2] = None,
     checksum: Option[String] = None) {
   @JsonIgnore
@@ -107,6 +149,7 @@ object LastCheckpointInfo {
         lastCheckpointInfo.parts,
         sizeInBytes = None,
         numOfAddFiles = None,
+        v2Checkpoint = None,
         checkpointSchema = None))
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index aa3648b1fab..463303512e4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -94,6 +94,12 @@ class Snapshot(
   override def columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
+  private[delta] lazy val nonFileActions: Seq[Action] = {
+    Seq(protocol, metadata) ++
+      setTransactions ++
+      domainMetadata
+  }
+
   @volatile private[delta] var stateReconstructionTriggered = false
   /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 7b3e268e902..1f62ca45348 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -603,6 +603,26 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
+  val LAST_CHECKPOINT_NON_FILE_ACTIONS_THRESHOLD =
+    buildConf("lastCheckpoint.nonFileActions.threshold")
+      .internal()
+      .doc("""
+        |Threshold for total number of non file-actions to store in the last_checkpoint
+        | corresponding to the checkpoint v2.
+        |""".stripMargin)
+      .intConf
+      .createWithDefault(30)
+
+  val LAST_CHECKPOINT_SIDECARS_THRESHOLD =
+    buildConf("lastCheckpoint.sidecars.threshold")
+      .internal()
+      .doc("""
+        |Threshold for total number of sidecar files to store in the last_checkpoint
+        | corresponding to the checkpoint v2.
+ |""".stripMargin) + .intConf + .createWithDefault(30) + val DELTA_WRITE_CHECKSUM_ENABLED = buildConf("writeChecksumFile.enabled") .doc("Whether the checksum file can be written.") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala index f2ec30f7d46..d68bf518138 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/FileNames.scala @@ -25,7 +25,7 @@ object FileNames { val deltaFileRegex = raw"(\d+)\.json".r val checksumFileRegex = raw"(\d+)\.crc".r - val checkpointFileRegex = raw"(\d+)\.checkpoint(\.(\d+)\.(\d+))?\.parquet".r + val checkpointFileRegex = raw"(\d+)\.checkpoint((\.\d+\.\d+)?\.parquet|\.[^.]+\.(json|parquet))".r val deltaFilePattern = deltaFileRegex.pattern val checksumFilePattern = checksumFileRegex.pattern diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 23016b66cb4..38e9863c33d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -20,7 +20,7 @@ import java.io.File import java.net.URI // scalastyle:off import.ordering.noEmptyLine -import org.apache.spark.sql.delta.actions.AddCDCFile +import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.storage.LocalLogStore @@ -29,7 +29,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.FileNames import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem} +import org.apache.hadoop.fs.{FileStatus, FSDataOutputStream, Path, RawLocalFileSystem} import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.util.Progressable @@ -72,6 +72,91 @@ class CheckpointsSuite extends QueryTest } } + def testDifferentV2Checkpoints(testName: String)(f: => Unit): Unit = { + for (checkpointFormat <- Seq(V2Checkpoint.Format.JSON.name, V2Checkpoint.Format.PARQUET.name)) { + test(s"$testName [v2CheckpointFormat: $checkpointFormat]") { + withSQLConf( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat + ) { + f + } + } + } + } + + testDifferentV2Checkpoints("checkpoint metadata - checkpoint schema not persisted in" + + " json v2 checkpoints but persisted in parquet v2 checkpoints") { + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath) + deltaLog.checkpoint() + val lastCheckpointOpt = deltaLog.readLastCheckpointFile() + assert(lastCheckpointOpt.nonEmpty) + val expectedFormat = + spark.conf.getOption(DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key) + assert(lastCheckpointOpt.get.checkpointSchema.isEmpty === + (expectedFormat.contains(V2Checkpoint.Format.JSON.name))) + } + } + + testDifferentV2Checkpoints(s"V2 Checkpoint write test" + + s" - metadata, protocol, sidecar, checkpoint metadata actions") { + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = 
+        DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+      deltaLog.checkpoint()
+      val checkpointFiles = deltaLog.listFrom(0).filter(FileNames.isCheckpointFile).toList
+      assert(checkpointFiles.length == 1)
+      val checkpoint = checkpointFiles.head
+      val fileNameParts = checkpoint.getPath.getName.split("\\.")
+      // The file name should be <version>.checkpoint.<uuid>.<json/parquet>.
+      assert(fileNameParts.length == 4)
+      fileNameParts match {
+        case Array(version, checkpointLiteral, _, format) =>
+          val expectedFormat =
+            spark.conf.getOption(DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key).get
+          assert(format == expectedFormat)
+          assert(version.toLong == 0)
+          assert(checkpointLiteral == "checkpoint")
+      }
+
+      def getCheckpointFileActions(checkpoint: FileStatus) : Seq[Action] = {
+        if (checkpoint.getPath.toString.endsWith("json")) {
+          deltaLog.store.read(checkpoint.getPath).map(Action.fromJson)
+        } else {
+          val fileIndex =
+            DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, Seq(checkpoint)).get
+          deltaLog.loadIndex(fileIndex, Action.logSchema)
+            .as[SingleAction].collect().map(_.unwrap).toSeq
+        }
+      }
+      val actions = getCheckpointFileActions(checkpoint)
+      // V2 Checkpoints should contain exactly one action each of types
+      // Metadata, CheckpointMetadata, and Protocol.
+      // In this particular case, we should only have one sidecar file.
+      val sidecarActions = actions.collect { case s: SidecarFile => s }
+      assert(sidecarActions.length == 1)
+      val sidecarPath = sidecarActions.head.path
+      assert(sidecarPath.endsWith("parquet"))
+
+      val metadataActions = actions.collect { case m: Metadata => m }
+      assert(metadataActions.length == 1)
+
+      val checkpointMetadataActions = actions.collect { case cm: CheckpointMetadata => cm }
+      assert(checkpointMetadataActions.length == 1)
+
+      assert(
+        DeltaConfigs.CHECKPOINT_POLICY.fromMetaData(metadataActions.head)
+          .needsV2CheckpointSupport
+      )
+
+      val protocolActions = actions.collect { case p: Protocol => p }
+      assert(protocolActions.length == 1)
+      assert(CheckpointProvider.isV2CheckpointEnabled(protocolActions.head))
+    }
+  }
+
   test("SC-86940: isGCSPath") {
     val conf = new Configuration()
     assert(Checkpoints.isGCSPath(conf, new Path("gs://foo/bar")))
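Editor's aside (not part of the patch): the FileNames.scala hunk above broadens checkpointFileRegex so that uuid-named V2 checkpoint files match alongside classic names. The standalone sketch below exercises that exact regex; the file names and UUID used here are invented purely for illustration.

// Standalone sketch: checks the updated checkpointFileRegex against classic single-file,
// classic multi-part, and uuid-named V2 checkpoint names. Not part of the Delta test suite.
object CheckpointFileNameRegexSketch {
  val checkpointFileRegex =
    raw"(\d+)\.checkpoint((\.\d+\.\d+)?\.parquet|\.[^.]+\.(json|parquet))".r

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "00000000000000000010.checkpoint.parquet",                       // classic single-file
      "00000000000000000010.checkpoint.0000000001.0000000002.parquet", // classic multi-part
      "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json",   // V2, json
      "00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.parquet" // V2, parquet
    )
    names.foreach { name =>
      assert(checkpointFileRegex.pattern.matcher(name).matches(), s"expected $name to match")
    }
    // A plain delta commit file is still rejected.
    assert(!checkpointFileRegex.pattern.matcher("00000000000000000010.json").matches())
  }
}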