[SC-18034][DELTA-OSS] S3 Support
Add an S3 LogStore implementation.

New unit tests are included.

Author: liwensun <liwen.sun@databricks.com>

GitOrigin-RevId: 5071e09398fd7237d4ec3de4d3ed80103ec0371f
liwensun committed May 31, 2019
1 parent ae4aa3c commit c8169bd
Showing 3 changed files with 379 additions and 1 deletion.
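
For reference (not part of this commit), a minimal sketch of how the new log store could be enabled when building a SparkSession, assuming the "spark.databricks.tahoe.logStore.class" key that the test suite below sets; the application name is hypothetical:

// Sketch only: select the S3 log store for a Spark application.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-s3-example") // hypothetical application name
  .config(
    "spark.databricks.tahoe.logStore.class",
    "org.apache.spark.sql.delta.storage.S3LogStore")
  .getOrCreate()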
@@ -21,7 +21,7 @@ import java.nio.file.FileAlreadyExistsException
import java.util.UUID

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkConf

251 changes: 251 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/storage/S3LogStore.scala
@@ -0,0 +1,251 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.storage

import java.io.FileNotFoundException
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.util.FileNames
import com.google.common.cache.CacheBuilder
import com.google.common.io.CountingOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

import org.apache.spark.SparkConf

/**
* Single driver/JVM LogStore implementation for S3.
*
* We assume the following from S3's [[FileSystem]] implementations:
* - File writing on S3 is all-or-nothing, whether overwrite or not.
* - List-after-write can be inconsistent.
*
* Regarding file creation, this implementation:
* - Opens a stream to write to S3 (regardless of the overwrite option).
* - Failures during stream write may leak resources, but will never result in partial writes.
*
* Regarding directory listing, this implementation:
* - Returns a list by merging the files listed from S3 and recently-written files from the cache.

@julienledem commented on Jun 24, 2019:

does that mean that it is consistent only to the writer?

@liwensun (author) replied on Jun 24, 2019:

@julienledem Good question. Yes, due to the list-after-write inconsistency, a reader may not see the new writes immediately. But that's OK, as the reader still sees a consistent snapshot. Also, empirically speaking, the chance of list-after-write inconsistency occurring is small, so this is mainly a problem for writes: no matter how small the chance, the inconsistency can lead to duplicate commits and thus data corruption. So we use a cache on the writer side to guard against that.

*/
class S3LogStore(
sparkConf: SparkConf,
hadoopConf: Configuration) extends FileSystemLogStore(sparkConf, hadoopConf) {
import S3LogStore._

private def resolved(path: Path): (FileSystem, Path) = {
val fs = path.getFileSystem(getHadoopConfiguration)
val resolvedPath = stripUserInfo(fs.makeQualified(path))
(fs, resolvedPath)
}

private def getPathKey(resolvedPath: Path): Path = {
stripUserInfo(resolvedPath)
}

private def stripUserInfo(path: Path): Path = {
val uri = path.toUri
val newUri = new URI(
uri.getScheme,
null,
uri.getHost,
uri.getPort,
uri.getPath,
uri.getQuery,
uri.getFragment)
new Path(newUri)
}

/**
* Merge two iterators of [[FileStatus]] into a single iterator ordered by file path name.
* In case both iterators have [[FileStatus]]s for the same file path, keep the one from
* `iterWithPrecedence` and discard that from `iter`.
*/
private def mergeFileIterators(
iter: Iterator[FileStatus],
iterWithPrecedence: Iterator[FileStatus]): Iterator[FileStatus] = {
(iter.map(f => (f.getPath, f)).toMap ++ iterWithPrecedence.map(f => (f.getPath, f)))
.values
.toSeq
.sortBy(_.getPath.getName)
.iterator
}

/**
* List files starting from `resolvedPath` (inclusive) in the same directory.
*/
private def listFromCache(fs: FileSystem, resolvedPath: Path) = {
val pathKey = getPathKey(resolvedPath)
writtenPathCache
.asMap()
.asScala
.iterator
.filter { case (path, _) =>
path.getParent == pathKey.getParent() && path.getName >= pathKey.getName }
.map { case (path, fileMetadata) =>
new FileStatus(
fileMetadata.length,
false,
1,
fs.getDefaultBlockSize(path),
fileMetadata.modificationTime,
path)
}
}

/**
* List files starting from `resolvedPath` (inclusive) in the same directory, which merges
* the file system list and the cache list when `useCache` is on, otherwise
* use file system list only.
*/
private def listFromInternal(fs: FileSystem, resolvedPath: Path, useCache: Boolean = true) = {
val parentPath = resolvedPath.getParent
if (!fs.exists(parentPath)) {
throw new FileNotFoundException(s"No such file or directory: $parentPath")
}
val listedFromFs =
fs.listStatus(parentPath).filter(_.getPath.getName >= resolvedPath.getName).iterator
val listedFromCache = if (useCache) listFromCache(fs, resolvedPath) else Iterator.empty

// File statuses listed from file system take precedence
mergeFileIterators(listedFromCache, listedFromFs)
}

/**
* List files starting from `resolvedPath` (inclusive) in the same directory.
*/
override def listFrom(path: Path): Iterator[FileStatus] = {
val (fs, resolvedPath) = resolved(path)
listFromInternal(fs, resolvedPath)
}

/**
* Check if the path is an initial version of a Delta log.
*/
private def isInitialVersion(path: Path): Boolean = {
FileNames.isDeltaFile(path) && FileNames.deltaVersion(path) == 0L
}

/**
* Check if a path exists. Normally we check both the file system and the cache, but when the
* path is the first version of a Delta log, we ignore the cache.
*/
private def exists(fs: FileSystem, resolvedPath: Path): Boolean = {
// Ignore the cache for the first file of a Delta log
listFromInternal(fs, resolvedPath, useCache = !isInitialVersion(resolvedPath))
.take(1)
.exists(_.getPath.getName == resolvedPath.getName)
}

override def write(path: Path, actions: Iterator[String], overwrite: Boolean = false): Unit = {
val (fs, resolvedPath) = resolved(path)
val lockedPath = getPathKey(resolvedPath)
var stream: FSDataOutputStream = null
acquirePathLock(lockedPath)
try {
if (exists(fs, resolvedPath) && !overwrite) {
throw new java.nio.file.FileAlreadyExistsException(resolvedPath.toUri.toString)
}
stream = fs.create(resolvedPath, overwrite)
val countingStream = new CountingOutputStream(stream)
actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(countingStream.write)
countingStream.close()

// When a Delta log starts afresh, all cached files in that Delta log become obsolete,
// so we remove them from the cache.
if (isInitialVersion(resolvedPath)) {
val obsoleteFiles = writtenPathCache
.asMap()
.asScala
.keys
.filter(_.getParent == lockedPath.getParent())
.asJava

writtenPathCache.invalidateAll(obsoleteFiles)
}

// Cache the information of written files to help fix the inconsistency in future listings
writtenPathCache.put(lockedPath,
FileMetadata(countingStream.getCount(), System.currentTimeMillis()))
} catch {
// Convert Hadoop's FileAlreadyExistsException to Java's FileAlreadyExistsException
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
throw new java.nio.file.FileAlreadyExistsException(e.getMessage)
} finally {
releasePathLock(lockedPath)
}
}

override def invalidateCache(): Unit = {
writtenPathCache.invalidateAll()
}
}

object S3LogStore {
/**
* A global path lock to ensure that no concurrent writers write to the same path in the same
* JVM.
*/
private val pathLock = new ConcurrentHashMap[Path, AnyRef]()

/**
* A global cache that records the metadata of the files recently written.
* As list-after-write may be inconsistent on S3, we can use the files in the cache
* to fix the inconsistent file listing.
*/
private val writtenPathCache =
CacheBuilder.newBuilder()
.expireAfterAccess(120, TimeUnit.MINUTES)
.build[Path, FileMetadata]()

/**
* Release the lock for the path after writing.
*
* Note: the caller should resolve the path to make sure we are locking the correct absolute path.
*/
private def releasePathLock(resolvedPath: Path): Unit = {
val lock = pathLock.remove(resolvedPath)
lock.synchronized {
lock.notifyAll()
}
}

/**
* Acquire a lock for the path before writing.
*
* Note: the caller should resolve the path to make sure we are locking the correct absolute path.
*/
private def acquirePathLock(resolvedPath: Path): Unit = {
while (true) {
val lock = pathLock.putIfAbsent(resolvedPath, new Object)
if (lock == null) return
lock.synchronized {
while (pathLock.get(resolvedPath) == lock) {
lock.wait()
}
}
}
}
}

/**
* The file metadata to be stored in the cache.
*/
case class FileMetadata(length: Long, modificationTime: Long)
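
For reference (not part of the diff), a short usage sketch of the store's write/list API, mirroring what the test suite below exercises. The S3 bucket and table paths are hypothetical, and it assumes s3a credentials are configured and that the log directory already exists, since the store requires the parent directory to be present before writing.

// Sketch only: write an initial Delta log entry and list it back.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.storage.S3LogStore
import org.apache.spark.sql.delta.util.FileNames

val spark = SparkSession.builder().getOrCreate()
val store = new S3LogStore(spark.sparkContext.getConf, spark.sessionState.newHadoopConf())

// Hypothetical Delta log directory on S3; it must exist before the first write.
val logDir = new Path("s3a://my-bucket/my-table/_delta_log")
logDir.getFileSystem(spark.sessionState.newHadoopConf()).mkdirs(logDir)

// Write version 0 of the log, then list from it. The write-side cache lets
// listFrom return the new file even if S3's directory listing is briefly stale.
store.write(FileNames.deltaFile(logDir, 0L), Iterator("""{"commitInfo":{}}"""))
store.listFrom(FileNames.deltaFile(logDir, 0L)).foreach(f => println(f.getPath))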
127 changes: 127 additions & 0 deletions src/test/scala/org/apache/spark/sql/delta/S3LogStoreSuite.scala
@@ -0,0 +1,127 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.io.File

import org.apache.spark.sql.delta.storage.{LogStore, S3LogStore}
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession

class S3LogStoreSuite extends LogStoreSuiteBase {

protected override def sparkConf = {
super.sparkConf.set(
"spark.databricks.tahoe.logStore.class", classOf[S3LogStore].getName)
}

override def createLogStore(spark: SparkSession): LogStore = {
new S3LogStore(spark.sparkContext.getConf, spark.sessionState.newHadoopConf())
}

private def checkLogStoreList(store: LogStore, path: Path, expectedVersions: Seq[Int]): Unit = {
assert(
store.listFrom(path).map(f => FileNames.deltaVersion(f.getPath)).toSeq === expectedVersions)
}

private def checkFileSystemList(fs: FileSystem, path: Path, expectedVersions: Seq[Int]): Unit = {
val fsList = fs.listStatus(path.getParent).filter(_.getPath.getName >= path.getName)
assert(fsList.map(f => FileNames.deltaVersion(f.getPath)).sorted === expectedVersions)
}

testHadoopConf(
"No FileSystem for scheme: fake",
"fs.fake.impl" -> classOf[FakeFileSystem].getName,
"fs.fake.impl.disable.cache" -> "true")

test("cache works") {
withTempDir { dir =>
val store = createLogStore(spark)
val deltas =
Seq(0, 1, 2, 3, 4).map(i => FileNames.deltaFile(new Path(dir.toURI), i))
store.write(deltas(0), Iterator("zero"))
store.write(deltas(1), Iterator("one"))
store.write(deltas(2), Iterator("two"))

// delete delta file 2 from file system
val fs = new Path(dir.getCanonicalPath).getFileSystem(spark.sessionState.newHadoopConf())
fs.delete(deltas(2), true)

// file system listing doesn't see file 2
checkFileSystemList(fs, deltas(0), Seq(0, 1))

// can't re-write because cache says it still exists
intercept[java.nio.file.FileAlreadyExistsException] {
store.write(deltas(2), Iterator("two"))
}

// log store list still sees file 2 as it's cached
checkLogStoreList(store, deltas(0), Seq(0, 1, 2))

// clear the cache
store.invalidateCache()

// log store list doesn't see file 2 anymore
checkLogStoreList(store, deltas(0), Seq(0, 1))

// write a new file 2
store.write(deltas(2), Iterator("two"))

// add a file 3 to cache only
store.write(deltas(3), Iterator("three"))
fs.delete(deltas(3), true)

// log store listing returns a union of:
// 1) file system listing: 0, 1, 2
// 2) cache listing: 2, 3
checkLogStoreList(store, deltas(0), Seq(0, 1, 2, 3))
}
}

test("cache works correctly when writing an initial log version") {
withTempDir { dir =>
val store = createLogStore(spark)
val deltas =
Seq(0, 1, 2).map(i => FileNames.deltaFile(new Path(dir.toURI), i))
store.write(deltas(0), Iterator("log version 0"))
store.write(deltas(1), Iterator("log version 1"))
store.write(deltas(2), Iterator("log version 2"))

val fs = new Path(dir.getCanonicalPath).getFileSystem(spark.sessionState.newHadoopConf())
// delete all log files
fs.delete(deltas(2), true)
fs.delete(deltas(1), true)
fs.delete(deltas(0), true)

// can't write a new version 1 as it's in cache
intercept[java.nio.file.FileAlreadyExistsException] {
store.write(deltas(1), Iterator("new log version 1"))
}

// all three log files still in cache
checkLogStoreList(store, deltas(0), Seq(0, 1, 2))

// can write a new version 0 as it's the initial version of the log
store.write(deltas(0), Iterator("new log version 0"))

// writing a new initial version invalidates all files in that log
checkLogStoreList(store, deltas(0), Seq(0))
}
}
}
