Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2 Cherry Pick] [#3423] Fix unnecessary DynamoDB GET calls during LogStore::listFrom VACUUM calls #3463

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -128,14 +129,19 @@ public BaseExternalLogStore(Configuration hadoopConf) {
public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
final FileSystem fs = path.getFileSystem(hadoopConf);
final Path resolvedPath = stripUserInfo(fs.makeQualified(path));
final Path tablePath = getTablePath(resolvedPath);
final Optional<ExternalCommitEntry> entry = getLatestExternalEntry(tablePath);

if (entry.isPresent() && !entry.get().complete) {
// Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce
// the chance of many reader threads in a single JVM doing duplicate copies of
// T(N) -> N.json.
fixDeltaLog(fs, entry.get());

// VACUUM operations may use this LogStore::listFrom API. We don't need to attempt to
// perform a fix/recovery during such operations that are not listing the _delta_log.
if (isDeltaLogPath(resolvedPath)) {
final Path tablePath = getTablePath(resolvedPath);
final Optional<ExternalCommitEntry> entry = getLatestExternalEntry(tablePath);

if (entry.isPresent() && !entry.get().complete) {
// Note: `fixDeltaLog` will apply per-JVM mutual exclusion via a lock to help reduce
// the chance of many reader threads in a single JVM doing duplicate copies of
// T(N) -> N.json.
fixDeltaLog(fs, entry.get());
}
}

// This is predicated on the storage system providing consistent listing
Expand Down Expand Up @@ -471,4 +477,14 @@ private Path stripUserInfo(Path path) {
throw new IllegalArgumentException(e);
}
}

/** Returns true if this path is contained within a _delta_log folder. */
@VisibleForTesting
protected boolean isDeltaLogPath(Path normalizedPath) {
return Arrays.stream(normalizedPath
.toUri()
.toString()
.split(Path.SEPARATOR)
).anyMatch("_delta_log"::equals);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,19 @@
* database)
*/
public class MemoryLogStore extends BaseExternalLogStore {
public static String IS_DELTA_LOG_PATH_OVERRIDE_KEY =
"spark.hadoop.io.delta.storage.MemoryLogStore.isDeltaLogPath.alwaysTrue";

public static int numGetLatestExternalEntryCalls = 0;

public MemoryLogStore(Configuration hadoopConf) {
super(hadoopConf);
}

///////////////////
// API Overrides //
///////////////////

@Override
protected void putExternalEntry(
ExternalCommitEntry entry,
Expand Down Expand Up @@ -69,6 +78,8 @@ protected Optional<ExternalCommitEntry> getExternalEntry(

@Override
protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
numGetLatestExternalEntryCalls++;

final Path fixedTablePath = new Path(fixPathSchema(tablePath.toString()));
return hashMap
.values()
Expand All @@ -77,6 +88,19 @@ protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
.max(Comparator.comparing(ExternalCommitEntry::absoluteFilePath));
}

@Override
protected boolean isDeltaLogPath(Path normalizedPath) {
if (initHadoopConf().getBoolean(IS_DELTA_LOG_PATH_OVERRIDE_KEY, false)) {
return true; // hardcoded to return true
} else {
return super.isDeltaLogPath(normalizedPath); // only return true if in _delta_log folder
}
}

////////////////////
// Static Helpers //
////////////////////

/**
* ExternalLogStoreSuite sometimes uses "failing:" scheme prefix to inject errors during tests
* However, we want lookups for the same $tablePath to return the same result, regardless of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@ import org.apache.hadoop.fs._
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.delta.FakeFileSystem
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStoreAdaptor
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.functions.col

/////////////////////
// Base Test Suite //
/////////////////////

class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSuite {
protected def shouldUseRenameToWriteCheckpoint: Boolean = false

override protected val publicLogStoreClassName: String =
classOf[MemoryLogStore].getName

protected override def beforeEach(): Unit = {
super.beforeEach()

MemoryLogStore.numGetLatestExternalEntryCalls = 0
}

testHadoopConf(
expectedErrMsg = "No FileSystem for scheme \"fake\"",
"fs.fake.impl" -> classOf[FakeFileSystem].getName,
Expand All @@ -48,6 +59,102 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui
FileNames.unsafeDeltaFile(new Path(s"failing:${logDir.getCanonicalPath}"), version)
}

test("#3423: listFrom only checks latest external store entry if listing a delta log file") {
withTempDir { tempDir =>
val store = createLogStore(spark)
.asInstanceOf[LogStoreAdaptor].logStoreImpl
.asInstanceOf[MemoryLogStore]
val logDir = new File(tempDir.getCanonicalPath, "_delta_log")
logDir.mkdir()

val deltaFilePath = getDeltaVersionPath(logDir, 0)
val dataFilePath = new Path(tempDir.getCanonicalPath, ".part-00000-da82aeb5-snappy.parquet")

val fs = deltaFilePath.getFileSystem(sessionHadoopConf)
fs.create(deltaFilePath).close()
fs.create(dataFilePath).close()

assert(MemoryLogStore.numGetLatestExternalEntryCalls == 0)

store.listFrom(deltaFilePath, sessionHadoopConf)
assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // contacted external store

store.listFrom(dataFilePath, sessionHadoopConf)
assert(MemoryLogStore.numGetLatestExternalEntryCalls == 1) // did not contact external store
}
}

test("#3423: VACUUM does not check external store for latest entry") {

// previous behaviour: always check external store for latest entry when listing
// current behaviour: only check external store for latest entry when listing a delta log file
def doVacuumTestGetNumVacuumExternalStoreCalls(usePreviousListBehavior: Boolean): Int = {
var ret = -1

withTempDir { tempDir =>
withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") {
spark.conf.set(
MemoryLogStore.IS_DELTA_LOG_PATH_OVERRIDE_KEY,
usePreviousListBehavior)

val path = tempDir.getCanonicalPath

spark.range(100)
.withColumn("part", col("id") % 10)
.write
.format("delta")
.partitionBy("part")
.save(path)

spark.sql(s"DELETE FROM delta.`$path`")

val numExternalCallsBeforeVacuum = MemoryLogStore.numGetLatestExternalEntryCalls

spark.sql(s"VACUUM delta.`$path` RETAIN 0 HOURS")

val numExternalCallsAfterVacuum = MemoryLogStore.numGetLatestExternalEntryCalls

ret = numExternalCallsAfterVacuum - numExternalCallsBeforeVacuum
}
}

ret
}

assert(
doVacuumTestGetNumVacuumExternalStoreCalls(true) >
doVacuumTestGetNumVacuumExternalStoreCalls(false)
)
}

test("#3423: BaseExternalLogStore::isDeltaLogPath") {
val store = createLogStore(spark)
.asInstanceOf[LogStoreAdaptor].logStoreImpl
.asInstanceOf[MemoryLogStore]

// json file
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000.json")))

// checkpoint file
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0010.checkpoint.parquet")))

// file listing prefix
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/0000.")))

// delta_log folder (with / prefix)
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log/")))

// delta_log folder (without / prefix)
assert(store.isDeltaLogPath(new Path("s3://bucket/_delta_log")))

// obvious negative cases
assert(!store.isDeltaLogPath(new Path("s3://bucket/part-000-UUID.parquet")))

// edge cases of `_delta_log` in a folder
assert(!store.isDeltaLogPath(new Path("s3://bucket/__delta_log/")))
assert(!store.isDeltaLogPath(new Path("s3://bucket/_delta_log_")))
}

test("single write") {
withTempLogDir { tempLogDir =>
val store = createLogStore(spark)
Expand Down Expand Up @@ -262,8 +369,6 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui
}
}
}

protected def shouldUseRenameToWriteCheckpoint: Boolean = false
}

///////////////////////////////////
Expand Down
Loading