Skip to content

Commit

Permalink
[DSW] [23] Add logging (delta-io#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db authored Oct 20, 2021
1 parent 56f40e3 commit 935f303
Show file tree
Hide file tree
Showing 14 changed files with 389 additions and 101 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ lazy val standalone = (project in file("standalone"))
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module")
),
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.slf4j" % "slf4j-api" % "1.7.25",
"org.slf4j" % "slf4j-log4j12" % "1.7.25"
),
sourceGenerators in Compile += Def.task {
val file = (sourceManaged in Compile).value / "meta" / "package.scala"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import io.delta.standalone.data.CloseableIterator
import io.delta.standalone.internal.actions.SingleAction
import io.delta.standalone.internal.util.JsonUtils
import io.delta.standalone.internal.util.FileNames._
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.logging.Logging

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s.ParquetWriter
import io.delta.standalone.internal.exception.DeltaErrors

/**
* Records information about a checkpoint.
Expand Down Expand Up @@ -139,17 +141,12 @@ private[internal] trait Checkpoints {
case _: FileNotFoundException =>
None
case NonFatal(e) if tries < 3 =>
// scalastyle:off println
println(s"Failed to parse $LAST_CHECKPOINT. This may happen if there was an error " +
logWarning(s"Failed to parse $LAST_CHECKPOINT. This may happen if there was an error " +
"during read operation, or a file appears to be partial. Sleeping and trying again.", e)
// scalastyle:on println

Thread.sleep(1000)
loadMetadataFromFile(tries + 1)
case NonFatal(e) =>
// scalastyle:off println
println(s"$LAST_CHECKPOINT is corrupted. Will search the checkpoint files directly", e)
// scalastyle:on println
logWarning(s"$LAST_CHECKPOINT is corrupted. Will search the checkpoint files directly", e)
// Hit a partial file. This could happen on Azure as overwriting _last_checkpoint file is
// not atomic. We will try to list all files to find the latest checkpoint and restore
// CheckpointMetaData from it.
Expand Down Expand Up @@ -208,7 +205,7 @@ private[internal] trait Checkpoints {
}
}

private[internal] object Checkpoints {
private[internal] object Checkpoints extends Logging {
/**
* Writes out the contents of a [[Snapshot]] into a checkpoint file that
* can be used to short-circuit future replays of the log.
Expand Down Expand Up @@ -300,6 +297,11 @@ private[internal] object Checkpoints {
"State of the checkpoint doesn't match that of the snapshot.")
}

// Attempting to write empty checkpoint
if (checkpointSize == 0) {
logWarning(DeltaErrors.EmptyCheckpointErrorMessage)
}

CheckpointMetaData(snapshot.version, checkpointSize, None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package io.delta.standalone.internal

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable

import io.delta.standalone.expressions.Expression
import io.delta.standalone.internal.actions._
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.logging.Logging
import io.delta.standalone.internal.util.{FileNames, PartitionUtils}

/**
Expand Down Expand Up @@ -74,8 +78,10 @@ private[internal] case class WinningCommitSummary(actions: Seq[Action], commitVe
private[internal] class ConflictChecker(
currentTransactionInfo: CurrentTransactionInfo,
winningCommitVersion: Long,
isolationLevel: IsolationLevel) {
isolationLevel: IsolationLevel,
logPrefixStr: String) extends Logging {

private val timingStats = mutable.HashMap[String, Long]()
private val deltaLog = currentTransactionInfo.deltaLog
private val winningCommitSummary: WinningCommitSummary = createWinningCommitSummary()

Expand All @@ -86,22 +92,25 @@ private[internal] class ConflictChecker(
checkForDeletedFilesAgainstCurrentTxnReadFiles()
checkForDeletedFilesAgainstCurrentTxnDeletedFiles()
checkForUpdatedApplicationTransactionIdsThatCurrentTxnDependsOn()
reportMetrics()
}

/**
* Initializes [[WinningCommitSummary]] for the already committed
* transaction (winning transaction).
*/
private def createWinningCommitSummary(): WinningCommitSummary = {
import io.delta.standalone.internal.util.Implicits._
recordTime("initialize-old-commit") {
import io.delta.standalone.internal.util.Implicits._

val deltaLog = currentTransactionInfo.deltaLog
val winningCommitActions = deltaLog.store
.read(FileNames.deltaFile(deltaLog.logPath, winningCommitVersion), deltaLog.hadoopConf)
.toArray
.map(Action.fromJson)
val deltaLog = currentTransactionInfo.deltaLog
val winningCommitActions = deltaLog.store
.read(FileNames.deltaFile(deltaLog.logPath, winningCommitVersion), deltaLog.hadoopConf)
.toArray
.map(Action.fromJson)

WinningCommitSummary(winningCommitActions, winningCommitVersion)
WinningCommitSummary(winningCommitActions, winningCommitVersion)
}
}

/**
Expand Down Expand Up @@ -137,27 +146,29 @@ private[internal] class ConflictChecker(
* the current transaction.
*/
private def checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(): Unit = {
// Fail if new files have been added that the txn should have read.
val addedFilesToCheckForConflicts = isolationLevel match {
case Serializable =>
winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
case SnapshotIsolation =>
Seq.empty
}
recordTime("checked-appends") {
// Fail if new files have been added that the txn should have read.
val addedFilesToCheckForConflicts = isolationLevel match {
case Serializable =>
winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
case SnapshotIsolation =>
Seq.empty
}

val predicatesMatchingAddedFiles = currentTransactionInfo.readPredicates.flatMap { p =>
val conflictingFile = PartitionUtils.filterFileList(
currentTransactionInfo.metadata.partitionSchema,
addedFilesToCheckForConflicts,
p
).headOption
val predicatesMatchingAddedFiles = currentTransactionInfo.readPredicates.flatMap { p =>
val conflictingFile = PartitionUtils.filterFileList(
currentTransactionInfo.metadata.partitionSchema,
addedFilesToCheckForConflicts,
p
).headOption

conflictingFile.map(f => getPrettyPartitionMessage(f.partitionValues))
}.headOption
conflictingFile.map(f => getPrettyPartitionMessage(f.partitionValues))
}.headOption

if (predicatesMatchingAddedFiles.isDefined) {
throw DeltaErrors.concurrentAppendException(
if (predicatesMatchingAddedFiles.isDefined) {
throw DeltaErrors.concurrentAppendException(
winningCommitSummary.commitInfo, predicatesMatchingAddedFiles.get)
}
}
}

Expand All @@ -166,21 +177,23 @@ private[internal] class ConflictChecker(
* read by the current transaction.
*/
private def checkForDeletedFilesAgainstCurrentTxnReadFiles(): Unit = {
// Fail if files have been deleted that the txn read.
val readFilePaths = currentTransactionInfo.readFiles.map(
f => f.path -> f.partitionValues).toMap
val deleteReadOverlap = winningCommitSummary.removedFiles
.find(r => readFilePaths.contains(r.path))
if (deleteReadOverlap.nonEmpty) {
val filePath = deleteReadOverlap.get.path
val partition = getPrettyPartitionMessage(readFilePaths(filePath))
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath in $partition")
}
if (winningCommitSummary.removedFiles.nonEmpty && currentTransactionInfo.readWholeTable) {
val filePath = winningCommitSummary.removedFiles.head.path
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath")
recordTime("checked-deletes") {
// Fail if files have been deleted that the txn read.
val readFilePaths = currentTransactionInfo.readFiles.map(
f => f.path -> f.partitionValues).toMap
val deleteReadOverlap = winningCommitSummary.removedFiles
.find(r => readFilePaths.contains(r.path))
if (deleteReadOverlap.nonEmpty) {
val filePath = deleteReadOverlap.get.path
val partition = getPrettyPartitionMessage(readFilePaths(filePath))
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath in $partition")
}
if (winningCommitSummary.removedFiles.nonEmpty && currentTransactionInfo.readWholeTable) {
val filePath = winningCommitSummary.removedFiles.head.path
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath")
}
}
}

Expand All @@ -189,14 +202,16 @@ private[internal] class ConflictChecker(
* [[RemoveFile]] actions this transaction is trying to add.
*/
private def checkForDeletedFilesAgainstCurrentTxnDeletedFiles(): Unit = {
// Fail if a file is deleted twice.
val txnDeletes = currentTransactionInfo.actions
.collect { case r: RemoveFile => r }
.map(_.path).toSet
val deleteOverlap = winningCommitSummary.removedFiles.map(_.path).toSet intersect txnDeletes
if (deleteOverlap.nonEmpty) {
throw DeltaErrors.concurrentDeleteDeleteException(
winningCommitSummary.commitInfo, deleteOverlap.head)
recordTime("checked-2x-deletes") {
// Fail if a file is deleted twice.
val txnDeletes = currentTransactionInfo.actions
.collect { case r: RemoveFile => r }
.map(_.path).toSet
val deleteOverlap = winningCommitSummary.removedFiles.map(_.path).toSet intersect txnDeletes
if (deleteOverlap.nonEmpty) {
throw DeltaErrors.concurrentDeleteDeleteException(
winningCommitSummary.commitInfo, deleteOverlap.head)
}
}
}

Expand Down Expand Up @@ -232,4 +247,22 @@ private[internal] class ConflictChecker(
s"partition $partition"
}
}

/**
 * Evaluates `f` exactly once and stores how long it took, in whole milliseconds,
 * in `timingStats` under the key `phase`. A later call with the same `phase`
 * overwrites the earlier entry.
 *
 * There is no try/finally here: if `f` throws, the exception propagates and no
 * timing is stored for that phase.
 *
 * @param phase key under which the elapsed time is stored
 * @param f     the computation to time
 * @return whatever `f` evaluates to
 */
private def recordTime[T](phase: String)(f: => T): T = {
  val begunAtNs = System.nanoTime()
  val result = f
  val elapsedNs = System.nanoTime() - begunAtNs
  timingStats.update(phase, TimeUnit.NANOSECONDS.toMillis(elapsedNs))
  result
}

/**
 * Passes a single line to `logInfo` containing every entry of `timingStats`,
 * rendered as `phase=millis`, ordered by phase name and joined with commas.
 * The line is prefixed with `logPrefixStr` and names `winningCommitVersion`.
 */
private def reportMetrics(): Unit = {
  // Kept as a local def so the rendering happens only when the message string
  // itself is built (assumes logInfo may defer that — TODO confirm).
  def renderedTimings: String =
    timingStats.toSeq
      .sortBy { case (phase, _) => phase }
      .map { case (phase, millis) => s"$phase=$millis" }
      .mkString(",")

  logInfo(s"[$logPrefixStr] Timing stats against $winningCommitVersion [$renderedTimings]")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import java.util.{HashMap, Locale}

import io.delta.standalone.internal.actions.{Action, Metadata, Protocol}
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.logging.Logging
import io.delta.standalone.internal.util.{CalendarInterval, IntervalUtils}

import org.apache.hadoop.conf.Configuration

private[internal] case class DeltaConfig[T](
Expand Down Expand Up @@ -65,7 +67,7 @@ private[internal] case class DeltaConfig[T](
/**
* Contains list of reservoir configs and validation checks.
*/
private[internal] object DeltaConfigs {
private[internal] object DeltaConfigs extends Logging {

/**
* Convert a string to [[CalendarInterval]]. This method is case-insensitive and will throw
Expand Down Expand Up @@ -128,7 +130,7 @@ private[internal] object DeltaConfigs {
def validateConfigurations(configurations: Map[String, String]): Map[String, String] = {
configurations.map {
case kv @ (key, value) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
throw new IllegalArgumentException(s"Unsupported CHECK constraint configuration ${key} set")
throw new IllegalArgumentException(s"Unsupported CHECK constraint configuration $key set")
case (key, value) if key.toLowerCase(Locale.ROOT).startsWith("delta.") =>
Option(entries.get(key.toLowerCase(Locale.ROOT).stripPrefix("delta.")))
.map(_(value))
Expand All @@ -137,12 +139,11 @@ private[internal] object DeltaConfigs {
}
case keyvalue @ (key, _) =>
if (entries.containsKey(key.toLowerCase(Locale.ROOT))) {
// TODO: add log
// logConsole(
// s"""
// |You are trying to set a property the key of which is the same as Delta config: $key.
// |If you are trying to set a Delta config, prefix it with "delta.", e.g. 'delta.$key'.
// """.stripMargin)
logWarning(
s"""
|You are trying to set a property the key of which is the same as Delta config: $key.
|If you are trying to set a Delta config, prefix it with "delta.", e.g. 'delta.$key'.
|""".stripMargin)
}
keyvalue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import java.sql.Timestamp

import scala.collection.JavaConverters._

import io.delta.standalone.storage.LogStore
import io.delta.standalone.internal.actions.{Action, CommitInfo, CommitMarker}
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.util.FileNames
import io.delta.standalone.storage.LogStore
import io.delta.standalone.internal.logging.Logging

import org.apache.hadoop.fs.Path

Expand All @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path
*
* @param deltaLog the transaction log of this table
*/
private[internal] case class DeltaHistoryManager(deltaLog: DeltaLogImpl) {
private[internal] case class DeltaHistoryManager(deltaLog: DeltaLogImpl) extends Logging {

/** Get the persisted commit info for the given delta file. */
def getCommitInfo(version: Long): CommitInfo = {
Expand Down Expand Up @@ -188,6 +189,8 @@ private[internal] case class DeltaHistoryManager(deltaLog: DeltaLogImpl) {
val prevTimestamp = commits(i).getTimestamp
assert(commits(i).getVersion < commits(i + 1).getVersion, "Unordered commits provided.")
if (prevTimestamp >= commits(i + 1).getTimestamp) {
logWarning(s"Found Delta commit ${commits(i).getVersion} with a timestamp $prevTimestamp " +
s"which is greater than the next commit timestamp ${commits(i + 1).getTimestamp}.")
commits(i + 1) = commits(i + 1).withTimestamp(prevTimestamp + 1).asInstanceOf[T]
}
i += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import java.util.TimeZone

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import io.delta.standalone.{DeltaLog, OptimisticTransaction, VersionLog}
import io.delta.standalone.actions.{CommitInfo => CommitInfoJ}
import io.delta.standalone.internal.actions.{Action, Metadata, Protocol}
import io.delta.standalone.internal.exception.DeltaErrors
import io.delta.standalone.internal.logging.Logging
import io.delta.standalone.internal.sources.StandaloneHadoopConf
import io.delta.standalone.internal.storage.LogStoreProvider
import io.delta.standalone.internal.util.{Clock, ConversionUtils, FileNames, SystemClock}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

/**
* Scala implementation of Java interface [[DeltaLog]].
*/
Expand All @@ -44,7 +46,8 @@ private[internal] class DeltaLogImpl private(
with Checkpoints
with MetadataCleanup
with LogStoreProvider
with SnapshotManagement {
with SnapshotManagement
with Logging {

/** Used to read and write physical log files and checkpoints. */
lazy val store = createLogStore(hadoopConf)
Expand All @@ -66,6 +69,9 @@ private[internal] class DeltaLogImpl private(
*/
def minFileRetentionTimestamp: Long = clock.getTimeMillis() - tombstoneRetentionMillis

/** The unique identifier for this table. */
def tableId: String = metadata.id

/** Use ReentrantLock to allow us to call `lockInterruptibly`. */
private val deltaLogLock = new ReentrantLock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,17 @@ private[internal] trait MetadataCleanup {
/**
 * Clean up expired delta and checkpoint logs. Exposed for testing.
 *
 * Computes a cutoff from the current clock time minus `deltaRetentionMillis`,
 * passed through `truncateDay`, then deletes every path returned by
 * `listExpiredDeltaLogs` for that cutoff. Logs one message before the
 * deletion starts and one afterwards with the number of successful deletes.
 */
def cleanUpExpiredLogs(): Unit = {
val fileCutOffTime = truncateDay(clock.getTimeMillis() - deltaRetentionMillis).getTime

// Lazy, presumably so the date is only formatted if a log message is actually
// built — confirm that logInfo takes its message by name.
lazy val formattedDate = fileCutOffTime.toGMTString
logInfo(s"Starting the deletion of log files older than $formattedDate")

// Counts only the deletes for which fs.delete returned true.
var numDeleted = 0
listExpiredDeltaLogs(fileCutOffTime.getTime).map(_.getPath).foreach { path =>
// recursive = false
// NOTE(review): the bare delete below and the counted delete after it look like
// the removed/added sides of the same diff line. If both really execute, the
// second call is unlikely to return true — confirm only one should remain.
fs.delete(path, false)
if (fs.delete(path, false)) numDeleted += 1
}

logInfo(s"Deleted $numDeleted log files older than $formattedDate")
}

/**
Expand Down
Loading

0 comments on commit 935f303

Please sign in to comment.