From 25c3250cdc1bd3ada3b4ac71ef917bc098a3be33 Mon Sep 17 00:00:00 2001
From: Ryan Murray <rymurr@gmail.com>
Date: Sun, 27 Sep 2020 11:55:20 +0100
Subject: [PATCH] Changes to support customization of delta file names (#4)

* Parse file metadata as a separate task

* change version to distinguish this branch

* log store chooses where checkpoints go (#6)

* handle snapshot names (#9)

Signed-off-by: Ryan Murray <rymurr@gmail.com>
---
 .../apache/spark/sql/delta/Checkpoints.scala  |  23 ++--
 .../spark/sql/delta/DeltaHistoryManager.scala |   9 +-
 .../org/apache/spark/sql/delta/DeltaLog.scala |  42 +++----
 .../spark/sql/delta/MetadataCleanup.scala     |   4 +-
 .../spark/sql/delta/SnapshotManagement.scala  |   7 +-
 .../sql/delta/storage/LogFileMetaParser.scala | 115 ++++++++++++++++++
 .../sql/delta/util/DeltaFileOperations.scala  |  32 +++--
 7 files changed, 168 insertions(+), 64 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/storage/LogFileMetaParser.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 2b98ce6c691..169378fc3e6 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,7 +27,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.DeltaFileOperations
-import org.apache.spark.sql.delta.LogFileMeta.isCheckpointFile
+import org.apache.spark.sql.delta.storage.LogFileMeta.isCheckpointFile
 import org.apache.spark.sql.delta.util.FileNames.{checkpointFileSingular, checkpointFileWithParts, checkpointPrefix, checkpointVersion, numCheckpointParts}
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.Path
@@ -38,6 +38,7 @@
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{col, struct, when}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -121,12 +122,16 @@ trait Checkpoints extends DeltaLogging {
   /** The path to the file that holds metadata about the most recent checkpoint. */
   val LAST_CHECKPOINT = new Path(logPath, "_last_checkpoint")
 
+  /**
+   * Creates a checkpoint using the default snapshot.
+   */
+  def checkpoint(): Unit = checkpoint(snapshot)
+
   /**
    * Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
   */
-  def checkpoint(_snapshotToCheckpoint: Option[Snapshot] = None): Unit =
+  def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
-      val snapshotToCheckpoint = _snapshotToCheckpoint.getOrElse(snapshot)
       if (snapshotToCheckpoint.version < 0) {
         throw DeltaErrors.checkpointNonExistTable(dataPath)
       }
@@ -343,9 +348,7 @@ object Checkpoints extends DeltaLogging {
     val sessionConf = state.sparkSession.sessionState.conf
     // We provide fine grained control using the session conf for now, until users explicitly
     // opt in our out of the struct conf.
-    val includeStructColumns = DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT
-      .fromMetaData(snapshot.metadata)
-      .getOrElse(sessionConf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_V2_ENABLED))
+    val includeStructColumns = getWriteStatsAsStructConf(sessionConf, snapshot)
     if (includeStructColumns) {
       additionalCols ++= CheckpointV2.extractPartitionValues(snapshot.metadata.partitionSchema)
     }
@@ -361,6 +364,12 @@
       ))
     )
   }
+
+  def getWriteStatsAsStructConf(conf: SQLConf, snapshot: Snapshot): Boolean = {
+    DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT
+      .fromMetaData(snapshot.metadata)
+      .getOrElse(conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_V2_ENABLED))
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
index e6cd770fc53..54f7c572cfc 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,19 +19,16 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import java.io.FileNotFoundException
 import java.sql.Timestamp
-
 import scala.collection.mutable
-
 import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker}
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.storage.LogStore
+import org.apache.spark.sql.delta.storage.{LogFileMetaParser, LogStore}
 import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
 import org.apache.spark.sql.delta.util.FileNames
 import org.apache.hadoop.fs.{FileStatus, Path}
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
 import org.apache.spark.sql.delta.util.FileNames.deltaFile
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index f9795acab5d..f213dd8694d 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,39 +17,35 @@
 package org.apache.spark.sql.delta
 
 // scalastyle:off import.ordering.noEmptyLine
-import java.io.{File, FileNotFoundException, IOException}
-import java.util.concurrent.{Callable, TimeUnit}
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
-
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.Future
 import scala.util.Try
 import scala.util.control.NonFatal
-
 import com.databricks.spark.util.TagDefinitions._
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.WriteIntoDelta
 import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
-import org.apache.spark.sql.delta.LogFileMeta.isDeltaFile
+import org.apache.spark.sql.delta.storage.LogFileMeta.isDeltaFile
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
-import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf}
-import org.apache.spark.sql.delta.storage.LogStoreProvider
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.storage.{LogFileMetaProvider, LogStoreProvider}
 import org.apache.spark.sql.delta.util.FileNames.deltaFile
 import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LocalRelation}
+import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * Used to query the current state of the log as well as modify it by adding
@@ -225,8 +221,8 @@
    * return an empty Iterator.
    */
   def getChanges(
-      startVersion: Long,
-      failOnDataLoss: Boolean = false): Iterator[(Long, Seq[Action])] = {
+    startVersion: Long,
+    failOnDataLoss: Boolean = false): Iterator[(Long, Seq[Action])] = {
     val deltas = logFileHandler.listFilesFrom(deltaFile(logPath, startVersion)).filter(isDeltaFile)
     // Subtract 1 to ensure that we have the same check for the inclusive startVersion
     var lastSeenVersion = startVersion - 1
@@ -245,13 +241,6 @@
    | Protocol validation |
    * --------------------- */
 
-  /**
-   * If the given `protocol` is older than that of the client.
-   */
-  private def isProtocolOld(protocol: Protocol): Boolean = protocol != null &&
-    (Action.readerVersion > protocol.minReaderVersion ||
-      Action.writerVersion > protocol.minWriterVersion)
-
   /**
    * Asserts that the client is up to date with the protocol and
    * allowed to read the table that is using the given `protocol`.
@@ -384,14 +373,12 @@ object DeltaLog extends DeltaLogging {
   private val deltaLogCache = {
     val builder = CacheBuilder.newBuilder()
       .expireAfterAccess(60, TimeUnit.MINUTES)
-      .removalListener(new RemovalListener[Path, DeltaLog] {
-        override def onRemoval(removalNotification: RemovalNotification[Path, DeltaLog]) = {
+      .removalListener((removalNotification: RemovalNotification[Path, DeltaLog]) => {
         val log = removalNotification.getValue
         try log.snapshot.uncache() catch {
           case _: java.lang.NullPointerException =>
-            // Various layers will throw null pointer if the RDD is already gone.
+          // Various layers will throw null pointer if the RDD is already gone.
         }
-        }
       })
     sys.props.get("delta.log.cacheSize")
       .flatMap(v => Try(v.toLong).toOption)
@@ -474,8 +461,7 @@ object DeltaLog extends DeltaLogging {
       // - Different `authority` (e.g., different user tokens in the path)
       // - Different mount point.
       try {
-        deltaLogCache.get(path, new Callable[DeltaLog] {
-          override def call(): DeltaLog = recordDeltaOperation(
+        deltaLogCache.get(path, () => { recordDeltaOperation(
             null, "delta.log.create", Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
           AnalysisHelper.allowInvokingTransformsInAnalyzer {
             new DeltaLog(path, path.getParent, clock)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/core/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index ac85a68a4f4..d9f0309d235 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta
 import java.util.{Calendar, TimeZone}
 
 import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
-import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.FileNames.checkpointPrefix
 import org.apache.commons.lang3.time.DateUtils
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index e06efa39514..5671079d561 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@
 
 package org.apache.spark.sql.delta
 
-import java.io.FileNotFoundException
+import org.apache.spark.sql.delta.storage.LogFileMeta
 
+import java.io.FileNotFoundException
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success, Try}
 
@@ -30,7 +31,7 @@ import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
 import org.apache.spark.sql.delta.util.FileNames.{checkpointPrefix, deltaFile}
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.execution.SQLExecution
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/storage/LogFileMetaParser.scala b/core/src/main/scala/org/apache/spark/sql/delta/storage/LogFileMetaParser.scala
new file mode 100644
index 00000000000..d5339da4099
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/storage/LogFileMetaParser.scala
@@ -0,0 +1,115 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.storage
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.CheckpointInstance
+import org.apache.spark.sql.delta.util.FileNames._
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.Try
+
+sealed case class DeltaFileType(value: String)
+
+object DeltaFileType {
+  object DELTA extends DeltaFileType("DELTA")
+  object CHECKPOINT extends DeltaFileType("CHECKPOINT")
+  object CHECKSUM extends DeltaFileType("CHECKSUM")
+  object UNKNOWN extends DeltaFileType("UNKNOWN")
+
+  val values = Seq(DELTA, CHECKPOINT, CHECKSUM, UNKNOWN)
+
+  def getFileType(path: Path): DeltaFileType = {
+    path match {
+      case f if isCheckpointFile(f) => DeltaFileType.CHECKPOINT
+      case f if isDeltaFile(f) => DeltaFileType.DELTA
+      case f if isChecksumFile(f) => DeltaFileType.CHECKSUM
+      case _ => DeltaFileType.UNKNOWN
+    }
+  }
+}
+
+case class LogFileMeta(fileStatus: FileStatus,
+                       version: Long,
+                       fileType: DeltaFileType,
+                       numParts: Option[Int]) {
+
+  def asCheckpointInstance(): CheckpointInstance = {
+    CheckpointInstance(version, numParts)
+  }
+}
+
+object LogFileMeta {
+  def isCheckpointFile(logFileMeta: LogFileMeta): Boolean = {
+    logFileMeta.fileType == DeltaFileType.CHECKPOINT
+  }
+
+  def isDeltaFile(logFileMeta: LogFileMeta): Boolean = {
+    logFileMeta.fileType == DeltaFileType.DELTA
+  }
+}
+
+
+class LogFileMetaParser(logStore: LogStore) {
+
+  def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
+
+    logStore.listFrom(logPath).map(fs => {
+      LogFileMeta(fs,
+        Try(deltaVersion(fs.getPath)).getOrElse(Try(checkpointVersion(fs.getPath)).getOrElse(-1L)),
+        DeltaFileType.getFileType(fs.getPath),
+        numCheckpointParts(fs.getPath))
+    })
+  }
+
+}
+
+object LogFileMetaParser extends LogFileMetaProvider
+  with Logging {
+
+  def apply(sc: SparkContext, logStore: LogStore): LogFileMetaParser = {
+    apply(sc.getConf, sc.hadoopConfiguration, logStore)
+  }
+
+  def apply(sparkConf: SparkConf,
+            hadoopConf: Configuration,
+            logStore: LogStore): LogFileMetaParser = {
+    createLogFileMetaParser(sparkConf, hadoopConf, logStore)
+  }
+}
+
+trait LogFileMetaProvider {
+
+  def createLogFileMetaParser(spark: SparkSession, logStore: LogStore): LogFileMetaParser = {
+    val sc = spark.sparkContext
+    createLogFileMetaParser(sc.getConf, sc.hadoopConfiguration, logStore)
+  }
+
+  def createLogFileMetaParser(sparkConf: SparkConf,
+                              hadoopConf: Configuration,
+                              logStore: LogStore): LogFileMetaParser = {
+    val logStoreClassName = sparkConf.get("spark.delta.logFileHandler.class",
+      classOf[LogFileMetaParser].getName)
+    val logStoreClass = Utils.classForName(logStoreClassName)
+    logStoreClass.getConstructor(classOf[LogStore])
+      .newInstance(logStore).asInstanceOf[LogFileMetaParser]
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
index 8d983f30a6f..8b1020107d8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (2020) The Delta Lake Project Authors.
+ * Copyright (2021) The Delta Lake Project Authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,20 +19,16 @@ package org.apache.spark.sql.delta.util
 import java.io.{FileNotFoundException, IOException}
 import java.net.URI
 import java.util.Locale
-
 import scala.util.Random
 import scala.util.control.NonFatal
-
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.storage.LogStore
+import org.apache.spark.sql.delta.storage.{LogFileMetaParser, LogStore}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
-
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.delta.LogFileMetaParser
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -137,7 +133,6 @@
     def list(dir: String, tries: Int): Iterator[SerializableFileStatus] = {
       logInfo(s"Listing $dir")
       try {
-
         val path = if (listAsDirectories) new Path(dir, "\u0000") else new Path(dir + "\u0000")
         logStore.listFilesFrom(path)
           .filterNot(f => hiddenFileNameFilter(f.fileStatus.getPath.getName))
@@ -213,8 +208,7 @@
     val dirsAndFiles = spark.sparkContext.parallelize(subDirs).mapPartitions { dirs =>
       val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value.value)
       val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, hadoopConf.value.value, logStore)
-      listUsingLogStore(logFileHandler, dirs, recurse = false, hiddenFileNameFilter,
-        listAsDirectories)
+      listUsingLogStore(logFileHandler, dirs, recurse = false, hiddenFileNameFilter)
     }.repartition(listParallelism) // Initial list of subDirs may be small
     val allDirsAndFiles = dirsAndFiles.mapPartitions { firstLevelDirsAndFiles =>
@@ -262,10 +256,11 @@
    * from LogStore.
    */
   def localListDirs(
-      spark: SparkSession,
-      dirs: Seq[String],
-      recursive: Boolean = true,
-      fileFilter: String => Boolean = defaultHiddenFileFilter): Iterator[SerializableFileStatus] = {
+    spark: SparkSession,
+    dirs: Seq[String],
+    recursive: Boolean = true,
+    fileFilter: String => Boolean = defaultHiddenFileFilter):
+  Iterator[SerializableFileStatus] = {
     val hadoopConfig = spark.sessionState.newHadoopConf
     val sparkConf = SparkEnv.get.conf
     val logStore = LogStore(sparkConf, hadoopConfig)
@@ -279,11 +274,12 @@
    * Listed locally using LogStore without launching a spark job. Returns an iterator from LogStore.
    */
   def localListFrom(
-      spark: SparkSession,
-      listFilename: String,
-      topDir: String,
-      recursive: Boolean = true,
-      fileFilter: String => Boolean = defaultHiddenFileFilter): Iterator[SerializableFileStatus] = {
+    spark: SparkSession,
+    listFilename: String,
+    topDir: String,
+    recursive: Boolean = true,
+    fileFilter: String => Boolean = defaultHiddenFileFilter):
+  Iterator[SerializableFileStatus] = {
     val hadoopConfig = spark.sessionState.newHadoopConf
     val sparkConf = SparkEnv.get.conf
     val logStore = LogStore(sparkConf, hadoopConfig)
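
Example (not part of the patch): the new LogFileMetaProvider above selects the handler class through the spark.delta.logFileHandler.class setting and instantiates it reflectively via a single-LogStore constructor. The sketch below shows how a replacement handler might look under those assumptions; the class name com.example.delta.CustomLogFileMetaParser and its checksum-filtering behaviour are hypothetical.

// Hypothetical custom handler, assuming the classes land in
// org.apache.spark.sql.delta.storage exactly as in the patch above.
package com.example.delta

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.delta.storage.{DeltaFileType, LogFileMeta, LogFileMetaParser, LogStore}

// LogFileMetaProvider creates the configured class with
// getConstructor(classOf[LogStore]).newInstance(logStore), so the subclass
// must keep a single LogStore constructor parameter.
class CustomLogFileMetaParser(logStore: LogStore) extends LogFileMetaParser(logStore) {

  // Delegate listing and version/type parsing to the default implementation,
  // then hide checksum files from callers as an illustrative customization.
  override def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
    super.listFilesFrom(logPath).filterNot(_.fileType == DeltaFileType.CHECKSUM)
  }
}

With such a class on the classpath it would be picked up with, for example, --conf spark.delta.logFileHandler.class=com.example.delta.CustomLogFileMetaParser; when the key is unset, LogFileMetaProvider falls back to the default LogFileMetaParser.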