S3 Multi-cluster writes support using DynamoDB
Resolves delta-io#41

This PR addresses issue delta-io#41 - Support for AWS S3 (multiple clusters/drivers/JVMs).

It implements a few ideas from the delta-io#41 discussion:

- provides a generic base class BaseExternalLogStore for storing the listing of commit files
in an external DB. This class can easily be extended for a specific DB backend
- stores the contents of each commit in a temporary file and links to it in the DB row,
so that an uncompleted write operation can be finished by a reader
(a rough sketch of this idea follows the lists below)
- provides a concrete DynamoDBLogStore implementation extending BaseExternalLogStore
- implementations for other DB backends should be straightforward to add
(a ZooKeeper implementation is almost ready; I can create a separate PR if anyone is interested)

- unit tests in `ExternalLogStoreSuite`, which uses `InMemoryLogStore` to mock `DynamoDBLogStore`
- a Python integration test in `storage-dynamodb/integration_tests/dynamodb_logstore.py` that tests concurrent readers and writers
- that integration test can also be run using `FailingDynamoDBLogStore`, which injects errors during execution to test error edge cases
- this solution has also been stress-tested (by SambaTV) on an Amazon EMR cluster
(multiple test jobs writing thousands of parallel transactions to a single Delta table)
and no data loss has been observed so far
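
A rough sketch of the write protocol behind these ideas is shown below. All names here (`ExternalEntry`, `InMemoryExternalStore`, `CommitProtocolSketch`) are made up for illustration and are not the actual BaseExternalLogStore or DynamoDBLogStore API; the key point is the atomic put-if-absent in the external store:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for an external database whose put-if-absent is atomic
// (in DynamoDB this would be a conditional PutItem). Names are hypothetical.
final case class ExternalEntry(tempPath: String, complete: Boolean)

class InMemoryExternalStore {
  private val entries = new ConcurrentHashMap[String, ExternalEntry]()

  /** Returns true only for the writer that wins the race for this key. */
  def putIfAbsent(key: String, entry: ExternalEntry): Boolean =
    entries.putIfAbsent(key, entry) == null

  def markComplete(key: String): Unit =
    entries.computeIfPresent(key, (_: String, e: ExternalEntry) => e.copy(complete = true))

  def get(key: String): Option[ExternalEntry] = Option(entries.get(key))
}

object CommitProtocolSketch {
  /**
   * 1. Write the commit content to a uniquely named temporary file.
   * 2. Atomically claim the commit version in the external store; losing the race
   *    surfaces as FileAlreadyExistsException, matching the overwrite=false contract.
   * 3. Copy the temporary file to its final name and mark the entry complete.
   * A reader that finds an incomplete entry can perform step 3 itself before reading.
   */
  def commit(
      store: InMemoryExternalStore,
      commitKey: String,
      writeTemp: () => String,
      copyTempToFinal: String => Unit): Unit = {
    val tempPath = writeTemp()
    if (!store.putIfAbsent(commitKey, ExternalEntry(tempPath, complete = false))) {
      throw new java.nio.file.FileAlreadyExistsException(commitKey)
    }
    copyTempToFinal(tempPath)
    store.markComplete(commitKey)
  }
}
```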

To enable DynamoDBLogStore, set the following Spark property:
`spark.delta.logStore.class=io.delta.storage.DynamoDBLogStore`

The following configuration properties are recognized:

`io.delta.storage.DynamoDBLogStore.tableName` - table name (defaults to 'delta_log')
`io.delta.storage.DynamoDBLogStore.region` - AWS region (defaults to 'us-east-1')
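
For example (a minimal sketch of enabling the store from a Scala application; the property values shown are just the documented defaults):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable DynamoDBLogStore and set its optional table name and region.
// The values shown are the documented defaults; adjust them for your environment.
val spark = SparkSession.builder()
  .appName("delta-dynamodb-logstore-example")
  .config("spark.delta.logStore.class", "io.delta.storage.DynamoDBLogStore")
  .config("io.delta.storage.DynamoDBLogStore.tableName", "delta_log")
  .config("io.delta.storage.DynamoDBLogStore.region", "us-east-1")
  .getOrCreate()
```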

Closes delta-io#1044

Co-authored-by: Scott Sandre <scott.sandre@databricks.com>
Co-authored-by: Allison Portis <allison.portis@databricks.com>

Signed-off-by: Scott Sandre <scott.sandre@databricks.com>
GitOrigin-RevId: 7c276f95be92a0ebf1eaa9038d118112d25ebc21
mrk-its authored and jbguerraz committed Jul 6, 2022
1 parent 08d16d0 commit 21393b2
Showing 11 changed files with 1,258 additions and 65 deletions.
10 changes: 5 additions & 5 deletions build.sbt
@@ -166,7 +166,7 @@ lazy val storage = (project in file("storage"))
.settings (
name := "delta-storage",
commonSettings,
releaseSettings,
releaseSettings, // TODO: proper artifact name
libraryDependencies ++= Seq(
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
@@ -180,10 +180,10 @@ lazy val storageDynamodb = (project in file("storage-dynamodb"))
.settings (
name := "delta-storage-dynamodb",
commonSettings,
skipReleaseSettings,
// releaseSettings,
releaseSettings, // TODO: proper artifact name with no scala version
// Test / publishArtifact := true, // uncomment only when testing FailingDynamoDBLogStore
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.7.4"
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
)
)

@@ -284,7 +284,7 @@ def ignoreUndocumentedPackages(packages: Seq[Seq[java.io.File]]): Seq[Seq[java.i
}

lazy val unidocSettings = Seq(

// Configure Scala unidoc
ScalaUnidoc / unidoc / scalacOptions ++= Seq(
"-skip-packages", "org:com:io.delta.sql:io.delta.tables.execution",
94 changes: 54 additions & 40 deletions core/src/test/scala/org/apache/spark/sql/delta/LogStoreSuite.scala
@@ -60,6 +60,15 @@ abstract class LogStoreSuiteBase extends QueryTest

testInitFromSparkConf()

protected def withTempLogDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir()
val deltaLogDir = new File(dir, "_delta_log")
deltaLogDir.mkdir()
try f(deltaLogDir) finally {
Utils.deleteRecursively(dir)
}
}

test("read / write") {
def assertNoLeakedCrcFiles(dir: File): Unit = {
// crc file should not be leaked when origin file doesn't exist.
@@ -78,55 +87,60 @@ abstract class LogStoreSuiteBase extends QueryTest
s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
}

val tempDir = Utils.createTempDir()
val store = createLogStore(spark)
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)
val deltas = Seq(0, 1)
.map(i => new File(tempLogDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas.head, Iterator("zero", "none"), overwrite = false, sessionHadoopConf)
store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)

val deltas = Seq(0, 1).map(i => new File(tempDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas.head, Iterator("zero", "none"), overwrite = false, sessionHadoopConf)
store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)
assert(store.read(deltas.head, sessionHadoopConf) == Seq("zero", "none"))
assert(store.readAsIterator(deltas.head, sessionHadoopConf).toSeq == Seq("zero", "none"))
assert(store.read(deltas(1), sessionHadoopConf) == Seq("one"))
assert(store.readAsIterator(deltas(1), sessionHadoopConf).toSeq == Seq("one"))

assert(store.read(deltas.head, sessionHadoopConf) == Seq("zero", "none"))
assert(store.readAsIterator(deltas.head, sessionHadoopConf).toSeq == Seq("zero", "none"))
assert(store.read(deltas(1), sessionHadoopConf) == Seq("one"))
assert(store.readAsIterator(deltas(1), sessionHadoopConf).toSeq == Seq("one"))
assertNoLeakedCrcFiles(tempLogDir)
}

assertNoLeakedCrcFiles(tempDir)
}

test("detects conflict") {
val tempDir = Utils.createTempDir()
val store = createLogStore(spark)

val deltas = Seq(0, 1).map(i => new File(tempDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas.head, Iterator("zero"), overwrite = false, sessionHadoopConf)
store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)
val deltas = Seq(0, 1)
.map(i => new File(tempLogDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas.head, Iterator("zero"), overwrite = false, sessionHadoopConf)
store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)

intercept[java.nio.file.FileAlreadyExistsException] {
store.write(deltas(1), Iterator("uno"), overwrite = false, sessionHadoopConf)
intercept[java.nio.file.FileAlreadyExistsException] {
store.write(deltas(1), Iterator("uno"), overwrite = false, sessionHadoopConf)
}
}

}

test("listFrom") {
val tempDir = Utils.createTempDir()
val store = createLogStore(spark)

val deltas =
Seq(0, 1, 2, 3, 4).map(i => new File(tempDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas(1), Iterator("zero"), overwrite = false, sessionHadoopConf)
store.write(deltas(2), Iterator("one"), overwrite = false, sessionHadoopConf)
store.write(deltas(3), Iterator("two"), overwrite = false, sessionHadoopConf)

assert(
store.listFrom(deltas.head, sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(1, 2, 3).map(_.toString))
assert(
store.listFrom(deltas(1), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(1, 2, 3).map(_.toString))
assert(store.listFrom(deltas(2), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(2, 3).map(_.toString))
assert(store.listFrom(deltas(3), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(3).map(_.toString))
assert(store.listFrom(deltas(4), sessionHadoopConf).map(_.getPath.getName).toArray === Nil)
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)

val deltas =
Seq(0, 1, 2, 3, 4).map(i => new File(tempLogDir, i.toString)).map(_.toURI).map(new Path(_))
store.write(deltas(1), Iterator("zero"), overwrite = false, sessionHadoopConf)
store.write(deltas(2), Iterator("one"), overwrite = false, sessionHadoopConf)
store.write(deltas(3), Iterator("two"), overwrite = false, sessionHadoopConf)

assert(
store.listFrom(deltas.head, sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(1, 2, 3).map(_.toString))
assert(
store.listFrom(deltas(1), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(1, 2, 3).map(_.toString))
assert(store.listFrom(deltas(2), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(2, 3).map(_.toString))
assert(store.listFrom(deltas(3), sessionHadoopConf)
.map(_.getPath.getName).toArray === Seq(3).map(_.toString))
assert(store.listFrom(deltas(4), sessionHadoopConf).map(_.getPath.getName).toArray === Nil)
}
}

test("simple log store test") {
@@ -187,9 +201,9 @@ abstract class LogStoreSuiteBase extends QueryTest
}

test("readAsIterator should be lazy") {
withTempDir { tempDir =>
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)
val testFile = new File(tempDir, "readAsIterator").getCanonicalPath
val testFile = new File(tempLogDir, "readAsIterator").getCanonicalPath
store.write(new Path(testFile), Iterator("foo", "bar"), overwrite = false, sessionHadoopConf)

withSQLConf(
81 changes: 72 additions & 9 deletions run-integration-tests.py
@@ -54,7 +54,7 @@ def run_scala_integration_tests(root_dir, version, test_name, extra_maven_repo,
try:
cmd = ["build/sbt", "runMain example.%s" % test_class]
print("\nRunning Scala tests in %s\n=====================" % test_class)
print("Command: %s" % str(cmd))
print("Command: %s" % " ".join(cmd))
run_cmd(cmd, stream_output=True, env=env)
except:
print("Failed Scala tests in %s" % (test_class))
@@ -79,28 +79,70 @@ def run_python_integration_tests(root_dir, version, test_name, extra_maven_repo,
extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
package = "io.delta:delta-core_2.12:" + version

if extra_maven_repo:
repo = extra_maven_repo
else:
repo = ""
repo = extra_maven_repo if extra_maven_repo else ""

for test_file in test_files:
if test_name is not None and test_name not in test_file:
print("\nSkipping Python tests in %s\n=====================" % test_file)
continue

try:
cmd = ["spark-submit",
"--driver-class-path=%s" % extra_class_path, # for less verbose logging
"--packages", package,
"--repositories", repo, test_file]
print("\nRunning Python tests in %s\n=============" % test_file)
print("Command: %s" % str(cmd))
print("Command: %s" % " ".join(cmd))
run_cmd(cmd, stream_output=True)
except:
print("Failed Python tests in %s" % (test_file))
raise


def run_dynamodb_logstore_integration_tests(root_dir, version, test_name, extra_maven_repo,
extra_packages, conf, use_local):
print(
"\n\n##### Running DynamoDB logstore integration tests on version %s #####" % str(version)
)
clear_artifact_cache()
if use_local:
run_cmd(["build/sbt", "publishM2"])

test_dir = path.join(root_dir, path.join("storage-dynamodb", "integration_tests"))
test_files = [path.join(test_dir, f) for f in os.listdir(test_dir)
if path.isfile(path.join(test_dir, f)) and
f.endswith(".py") and not f.startswith("_")]

python_root_dir = path.join(root_dir, "python")
extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
packages = "io.delta:delta-core_2.12:" + version
# TODO: update this with proper delta-storage artifact ID (i.e. no _2.12 scala version)
packages += "," + "io.delta:delta-storage-dynamodb_2.12:" + version
if extra_packages:
packages += "," + extra_packages

conf_args = []
if conf:
for i in conf:
conf_args.extend(["--conf", i])

repo_args = ["--repositories", extra_maven_repo] if extra_maven_repo else []

for test_file in test_files:
if test_name is not None and test_name not in test_file:
print("\nSkipping DynamoDB logstore integration tests in %s\n============" % test_file)
continue
try:
cmd = ["spark-submit",
"--driver-class-path=%s" % extra_class_path, # for less verbose logging
"--packages", packages] + repo_args + conf_args + [test_file]
print("\nRunning DynamoDB logstore integration tests in %s\n=============" % test_file)
print("Command: %s" % " ".join(cmd))
run_cmd(cmd, stream_output=True)
except:
print("Failed DynamoDB logstore integration tests tests in %s" % (test_file))
raise


def run_pip_installation_tests(root_dir, version, use_testpypi, extra_maven_repo):
print("\n\n##### Running pip installation tests on version %s #####" % str(version))
clear_artifact_cache()
@@ -249,8 +291,24 @@ def __exit__(self, tpe, value, traceback):
required=False,
default=False,
action="store_true",
help="Generate JARs from local source code and use to run tests"
)
help="Generate JARs from local source code and use to run tests")
parser.add_argument(
"--run-storage-dynamodb-integration-tests",
required=False,
default=False,
action="store_true",
help="Run the DynamoDB integration tests (and only them)")
parser.add_argument(
"--dbb-packages",
required=False,
default=None,
help="Additional packages required for Dynamodb logstore integration tests")
parser.add_argument(
"--dbb-conf",
required=False,
default=None,
nargs="+",
help="All `--conf` values passed to `spark-submit` for DynamoDB logstore integration tests")

args = parser.parse_args()

@@ -268,6 +326,11 @@ def __exit__(self, tpe, value, traceback):
run_scala = not args.python_only and not args.pip_only
run_pip = not args.python_only and not args.scala_only and not args.no_pip

if args.run_storage_dynamodb_integration_tests:
run_dynamodb_logstore_integration_tests(root_dir, args.version, args.test, args.maven_repo,
args.dbb_packages, args.dbb_conf, args.use_local)
quit()

if run_scala:
run_scala_integration_tests(root_dir, args.version, args.test, args.maven_repo,
args.scala_version, args.use_local)