[SC-18133][DELTA] Add a isPartialWriteVisible interface in LogStore
The writing of checkpoint files doesn't go through the log store, so we add an `isPartialWriteVisible` interface to `LogStore` to let out-of-band writers know whether they need to write to a temporary file and rename it into place, or can write to the destination directly.

This is a temporary solution; ultimately it would be good to encapsulate this information within the log store.

Add a simple end-to-end test for the different log store implementations.
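
For illustration only (not part of this change), here is a minimal sketch of how an out-of-band writer might consult the new interface; `PartialWriteExample`, `writeOutOfBand`, and `writeTo` are hypothetical placeholders rather than Delta code:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.delta.storage.LogStore

object PartialWriteExample {
  // Sketch only: `writeTo` stands in for whatever actually produces the file
  // (e.g. the checkpoint writer's Parquet job) and is not part of this commit.
  def writeOutOfBand(
      store: LogStore,
      fs: FileSystem,
      finalPath: Path,
      writeTo: Path => Unit): Unit = {
    if (store.isPartialWriteVisible(finalPath)) {
      // Readers could observe a half-written file (e.g. on HDFS), so write to
      // a temporary file first and rename it into place.
      val tempPath = new Path(finalPath.getParent, s".${finalPath.getName}.tmp")
      writeTo(tempPath)
      fs.rename(tempPath, finalPath)
    } else {
      // Partial writes never become visible, so writing directly to the final
      // path is safe and the rename can be skipped.
      writeTo(finalPath)
    }
  }
}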

Author: liwensun <liwen.sun@databricks.com>

GitOrigin-RevId: 5bf04c818d702ea24d28bed981f838d164a8d373
liwensun committed May 31, 2019
1 parent 43a7a7a commit ae4aa3c
Showing 5 changed files with 48 additions and 4 deletions.
7 changes: 3 additions & 4 deletions src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -202,8 +202,6 @@ trait Checkpoints extends DeltaLogging {
}

object Checkpoints {

import org.apache.spark.sql.delta.storage.HDFSLogStoreImpl
/**
* Writes out the contents of a [[Snapshot]] into a checkpoint file that
* can be used to short-circuit future replays of the log.
@@ -224,8 +222,9 @@ object Checkpoints {
new SerializableConfiguration(job.getConfiguration))
}

// If HDFSLogStore then use rename, otherwise write directly
val useRename = deltaLog.store.isInstanceOf[HDFSLogStoreImpl]
// The writing of checkpoints doesn't go through log store, so we need to check with the
// log store and decide whether to use rename.
val useRename = deltaLog.store.isPartialWriteVisible(deltaLog.logPath)

val checkpointSize = spark.sparkContext.longAccumulator("checkpointSize")
val numOfFiles = spark.sparkContext.longAccumulator("numOfFiles")
@@ -96,4 +96,6 @@ class AzureLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
protected def createTempPath(path: Path): Path = {
new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}.tmp")
}

override def isPartialWriteVisible(path: Path): Boolean = true
}
@@ -132,4 +132,6 @@ class HDFSLogStoreImpl(sparkConf: SparkConf, defaultHadoopConf: Configuration) e
override def resolvePathOnPhysicalStorage(path: Path): Path = {
getFileContext(path).makeQualified(path)
}

override def isPartialWriteVisible(path: Path): Boolean = true
}
@@ -82,6 +82,15 @@ trait LogStore {
def resolvePathOnPhysicalStorage(path: Path): Path = {
throw new UnsupportedOperationException()
}

/**
* Whether a partial write is visible when writing to `path`.
*
* As this depends on the underlying file system implementations, we require the input of `path`
* here in order to identify the underlying file system, even though in most cases a log store
* only deals with one file system.
*/
def isPartialWriteVisible(path: Path): Boolean = false
}

object LogStore extends Logging {
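
For illustration only (not part of this commit), a custom store could also override the new default explicitly; the subclass below is hypothetical and merely reuses the `HDFSLogStoreImpl` constructor shown in this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.storage.HDFSLogStoreImpl

// Hypothetical store that reuses HDFSLogStoreImpl's read/write behavior but
// declares that partial writes never become visible, so out-of-band writers
// such as the checkpoint writer skip the temp-file-and-rename step.
class DirectWriteLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
  extends HDFSLogStoreImpl(sparkConf, hadoopConf) {

  override def isPartialWriteVisible(path: Path): Boolean = false
}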
32 changes: 32 additions & 0 deletions src/test/scala/org/apache/spark/sql/delta/LogStoreSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.delta
import java.io.{File, IOException}
import java.net.URI

import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate}
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.storage._
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

@@ -30,6 +32,8 @@ abstract class LogStoreSuiteBase extends QueryTest with SharedSQLContext {

def createLogStore(spark: SparkSession): LogStore

def logStoreName: String = this.getClass.getSimpleName.replace("Suite", "")

test("read / write") {
val tempDir = Utils.createTempDir()
val store = createLogStore(spark)
@@ -74,6 +78,24 @@ abstract class LogStoreSuiteBase extends QueryTest with SharedSQLContext {
assert(store.listFrom(deltas(4)).map(_.getPath.getName).toArray === Nil)
}

test("simple log store test") {
val tempDir = Utils.createTempDir()
val log1 = DeltaLog(spark, new Path(tempDir.getCanonicalPath))
assert(log1.store.getClass.getSimpleName == logStoreName)

val txn = log1.startTransaction()
val file = AddFile("1", Map.empty, 1, 1, true) :: Nil
txn.commit(file, ManualUpdate)
log1.checkpoint()

DeltaLog.clearCache()
val log2 = DeltaLog(spark, new Path(tempDir.getCanonicalPath))
assert(log2.store.getClass.getSimpleName == logStoreName)

assert(log2.lastCheckpoint.map(_.version) === Some(0L))
assert(log2.snapshot.allFiles.count == 1)
}

protected def testHadoopConf(expectedErrMsg: String, fsImplConfs: (String, String)*): Unit = {
test("should pick up fs impl conf from session Hadoop configuration") {
withTempDir { tempDir =>
@@ -94,6 +116,11 @@ abstract class LogStoreSuiteBase extends QueryTest with SharedSQLContext {
}

class AzureLogStoreSuite extends LogStoreSuiteBase {
protected override def sparkConf = {
super.sparkConf.set(
"spark.databricks.tahoe.logStore.class", classOf[AzureLogStore].getName)
}

override def createLogStore(spark: SparkSession): LogStore = {
new AzureLogStore(spark.sparkContext.getConf, spark.sessionState.newHadoopConf())
}
@@ -105,6 +132,11 @@ class AzureLogStoreSuite extends LogStoreSuiteBase {
}

class HDFSLogStoreImplSuite extends LogStoreSuiteBase {
protected override def sparkConf = {
super.sparkConf.set(
"spark.databricks.tahoe.logStore.class", classOf[HDFSLogStoreImpl].getName)
}

override def createLogStore(spark: SparkSession): LogStore = {
new HDFSLogStoreImpl(spark.sparkContext.getConf, spark.sessionState.newHadoopConf())
}
