[#3423] Fix unnecessary DynamoDB GET calls during LogStore::listFrom VACUUM calls (#3425)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Resolves #3423.

This PR updates the logic in `BaseExternalLogStore::listFrom` so that it
no longer requests the latest entry from the external store (which is
used to perform recovery operations) when the path being listed is not a
`_delta_log` path.

This matters for VACUUM operations, which may make hundreds or thousands
of list calls against the table directory and its nested partition
directories of parquet files. None of those paths is the `_delta_log`, so
checking the external store during such list calls is (1) useless and
unwanted, since we are not listing the `_delta_log` and this is clearly
not the time to attempt a fixup, and (2) expensive.

With this change, future VACUUM operations no longer perform unnecessary
calls to the external store (e.g. DynamoDB).
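
For illustration, here is a minimal sketch of the new guard (the names come
from the diff below; this shows the shape of the change rather than a
drop-in excerpt):

```java
// Inside BaseExternalLogStore::listFrom, after resolving the listed path (see diff below).
if (isDeltaLogPath(resolvedPath)) {
    // Only a _delta_log listing consults the external store and attempts recovery.
    final Optional<ExternalCommitEntry> entry =
        getLatestExternalEntry(getTablePath(resolvedPath));
    if (entry.isPresent() && !entry.get().complete) {
        fixDeltaLog(fs, entry.get()); // finish the interrupted T(N) -> N.json copy
    }
}
// Every other listing (e.g. VACUUM scanning partition directories) now skips the
// external store entirely and goes straight to the file system.
```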

## How was this patch tested?

Unit tests, plus an integration test that actually runs VACUUM and
compares the number of external store calls under the old and new logic.
I also ran that test 50 times myself, and it passed every time
(therefore, not flaky).

## Does this PR introduce _any_ user-facing changes?

No
scottsand-db committed Aug 1, 2024
1 parent 45258e1 commit 6073d94
Showing 3 changed files with 155 additions and 10 deletions.
@@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -128,14 +129,19 @@ public BaseExternalLogStore(Configuration hadoopConf) {
public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
final FileSystem fs = path.getFileSystem(hadoopConf);
final Path resolvedPath = stripUserInfo(fs.makeQualified(path));
final Path tablePath = getTablePath(resolvedPath);
final Optional<ExternalCommitEntry> entry = getLatestExternalEntry(tablePath);

if (entry.isPresent() && !entry.get().complete) {
// Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce
// the chance of many reader threads in a single JVM doing duplicate copies of
// T(N) -> N.json.
fixDeltaLog(fs, entry.get());

// VACUUM operations may use this LogStore::listFrom API. We don't need to attempt to
// perform a fix/recovery during such operations that are not listing the _delta_log.
if (isDeltaLogPath(resolvedPath)) {
final Path tablePath = getTablePath(resolvedPath);
final Optional<ExternalCommitEntry> entry = getLatestExternalEntry(tablePath);

if (entry.isPresent() && !entry.get().complete) {
// Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce
// the chance of many reader threads in a single JVM doing duplicate copies of
// T(N) -> N.json.
fixDeltaLog(fs, entry.get());
}
}

// This is predicated on the storage system providing consistent listing
@@ -471,4 +477,14 @@ private Path stripUserInfo(Path path) {
throw new IllegalArgumentException(e);
}
}

/** Returns true if this path is contained within a _delta_log folder. */
@VisibleForTesting
protected boolean isDeltaLogPath(Path normalizedPath) {
return Arrays.stream(normalizedPath
.toUri()
.toString()
.split(Path.SEPARATOR)
).anyMatch("_delta_log"::equals);
}
}
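
To see the path check in isolation, here is a self-contained sketch of the
same segment-equality logic (a plain `String` stands in for
`org.apache.hadoop.fs.Path`, whose `SEPARATOR` is `"/"`, and the class name
is invented for the demo; the expected outputs mirror the
`ExternalLogStoreSuite` assertions later in this diff):

```java
import java.util.Arrays;

public class DeltaLogPathCheck {
    // Same idea as BaseExternalLogStore::isDeltaLogPath above: split the path
    // on "/" and look for a segment that is exactly "_delta_log".
    static boolean isDeltaLogPath(String normalizedPath) {
        return Arrays.stream(normalizedPath.split("/")).anyMatch("_delta_log"::equals);
    }

    public static void main(String[] args) {
        System.out.println(isDeltaLogPath("s3://bucket/_delta_log/0000.json"));  // true
        System.out.println(isDeltaLogPath("s3://bucket/part-000-UUID.parquet")); // false
        System.out.println(isDeltaLogPath("s3://bucket/__delta_log/"));          // false (exact segment match only)
        System.out.println(isDeltaLogPath("s3://bucket/_delta_log_"));           // false
    }
}
```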
@@ -29,10 +29,19 @@
* database)
*/
public class MemoryLogStore extends BaseExternalLogStore {
public static String IS_DELTA_LOG_PATH_OVERRIDE_KEY =
"spark.hadoop.io.delta.storage.MemoryLogStore.isDeltaLogPath.alwaysTrue";

public static int numGetLatestExternalEntryCalls = 0;

public MemoryLogStore(Configuration hadoopConf) {
super(hadoopConf);
}

///////////////////
// API Overrides //
///////////////////

@Override
protected void putExternalEntry(
ExternalCommitEntry entry,
@@ -69,6 +78,8 @@ protected Optional<ExternalCommitEntry> getExternalEntry(

@Override
protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
numGetLatestExternalEntryCalls++;

final Path fixedTablePath = new Path(fixPathSchema(tablePath.toString()));
return hashMap
.values()
@@ -77,6 +88,19 @@ protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
.max(Comparator.comparing(ExternalCommitEntry::absoluteFilePath));
}

@Override
protected boolean isDeltaLogPath(Path normalizedPath) {
if (initHadoopConf().getBoolean(IS_DELTA_LOG_PATH_OVERRIDE_KEY, false)) {
return true; // simulate the previous behavior: treat every path as a _delta_log path
} else {
return super.isDeltaLogPath(normalizedPath); // only return true if in _delta_log folder
}
}

////////////////////
// Static Helpers //
////////////////////

/**
* ExternalLogStoreSuite sometimes uses "failing:" scheme prefix to inject errors during tests
* However, we want lookups for the same $tablePath to return the same result, regardless of
@@ -24,16 +24,27 @@ import org.apache.hadoop.fs._
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.delta.FakeFileSystem
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStoreAdaptor
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.functions.col

/////////////////////
// Base Test Suite //
/////////////////////

class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSuite {
protected def shouldUseRenameToWriteCheckpoint: Boolean = false

override protected val publicLogStoreClassName: String =
classOf[MemoryLogStore].getName

protected override def beforeEach(): Unit = {
super.beforeEach()

MemoryLogStore.numGetLatestExternalEntryCalls = 0
}

testHadoopConf(
expectedErrMsg = "No FileSystem for scheme \"fake\"",
"fs.fake.impl" -> classOf[FakeFileSystem].getName,
@@ -48,6 +59,102 @@ class ExternalLogStoreSuite
FileNames.deltaFile(new Path(s"failing:${logDir.getCanonicalPath}"), version)
}

test("#3423: listFrom only checks latest external store entry if listing a delta log file") {
withTempDir { tempDir =>
val store = createLogStore(spark)
.asInstanceOf[LogStoreAdaptor].logStoreImpl
.asInstanceOf[MemoryLogStore]
val logDir = new File(tempDir.getCanonicalPath, "_delta_log")
logDir.mkdir()

val deltaFilePath = getDeltaVersionPath(logDir, 0)
val dataFilePath = new Path(tempDir.getCanonicalPath, ".part-00000-da82aeb5-snappy.parquet")

val fs = deltaFilePath.getFileSystem(sessionHadoopConf)
fs.create(deltaFilePath).close()
fs.create(dataFilePath).close()

assert(MemoryLogStore.numGetLatestExternalEntryCalls == 0)

store.listFrom(deltaFilePath, sessionHadoopConf)
assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // contacted external store

store.listFrom(dataFilePath, sessionHadoopConf)
assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // did not contact external store
}
}

test("#3423: VACUUM does not check external store for latest entry") {

// previous behaviour: always check external store for latest entry when listing
// current behaviour: only check external store for latest entry when listing a delta log file
def doVacuumTestGetNumVacuumExternalStoreCalls(usePreviousListBehavior: Boolean): Int = {
var ret = -1

withTempDir { tempDir =>
withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") {
spark.conf.set(
MemoryLogStore.IS_DELTA_LOG_PATH_OVERRIDE_KEY,
usePreviousListBehavior)

val path = tempDir.getCanonicalPath

spark.range(100)
.withColumn("part", col("id") % 10)
.write
.format("delta")
.partitionBy("part")
.save(path)

spark.sql(s"DELETE FROM delta.`$path`")

val numExternalCallsBeforeVacuum = MemoryLogStore.numGetLatestExternalEntryCalls

spark.sql(s"VACUUM delta.`$path` RETAIN 0 HOURS")

val numExternalCallsAfterVacuum = MemoryLogStore.numGetLatestExternalEntryCalls

ret = numExternalCallsAfterVacuum - numExternalCallsBeforeVacuum
}
}

ret
}

assert(
doVacuumTestGetNumVacuumExternalStoreCalls(true) >
doVacuumTestGetNumVacuumExternalStoreCalls(false)
)
}

test("#3423: BaseExternalLogStore::isDeltaLogPath") {
val store = createLogStore(spark)
.asInstanceOf[LogStoreAdaptor].logStoreImpl
.asInstanceOf[MemoryLogStore]

// json file
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000.json")))

// checkpoint file
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0010.checkpoint.parquet")))

// file listing prefix
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000.")))

// _delta_log folder (with trailing /)
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/")))

// _delta_log folder (without trailing /)
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log")))

// obvious negative cases
assert(!store.isDeltaLogPath(new Path("s3://bucket/part-000-UUID.parquet")))

// edge cases: folder names that merely contain `_delta_log`
assert(!store.isDeltaLogPath(new Path("s3://bucket/__delta_log/")))
assert(!store.isDeltaLogPath(new Path("s3://bucket/_delta_log_")))
}

test("single write") {
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)
@@ -262,8 +369,6 @@ class ExternalLogStoreSuite
}
}
}

protected def shouldUseRenameToWriteCheckpoint: Boolean = false
}

///////////////////////////////////
