Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.state.forst.fs.filemapping.FSDataOutputStreamWithEntry;
import org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
import org.apache.flink.state.forst.fs.filemapping.FileMappingManager;
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
Expand Down Expand Up @@ -211,7 +212,11 @@ public synchronized ByteBufferWritableFSDataOutputStream create(

// Try to create file cache for SST files
CachedDataOutputStream cachedDataOutputStream =
createCachedDataOutputStream(dbFilePath, sourceRealPath, outputStream);
createCachedDataOutputStream(
dbFilePath,
sourceRealPath,
outputStream,
createdMappingEntry.getFileOwnership());

LOG.trace(
"Create file: dbFilePath: {}, sourceRealPath: {}, cachedDataOutputStream: {}",
Expand All @@ -233,7 +238,11 @@ public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath, in
() -> {
FSDataInputStream inputStream = source.openInputStream(bufferSize);
CachedDataInputStream cachedDataInputStream =
createCachedDataInputStream(dbFilePath, source, inputStream);
createCachedDataInputStream(
dbFilePath,
source,
inputStream,
mappingEntry.getFileOwnership());
return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
Expand All @@ -251,7 +260,11 @@ public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath)
() -> {
FSDataInputStream inputStream = source.openInputStream();
CachedDataInputStream cachedDataInputStream =
createCachedDataInputStream(dbFilePath, source, inputStream);
createCachedDataInputStream(
dbFilePath,
source,
inputStream,
mappingEntry.getFileOwnership());
return cachedDataInputStream == null ? inputStream : cachedDataInputStream;
},
DEFAULT_INPUT_STREAM_CAPACITY,
Expand Down Expand Up @@ -285,7 +298,7 @@ public synchronized boolean exists(final Path f) throws IOException {
return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
}

if (FileOwnershipDecider.shouldAlwaysBeLocal(f)) {
if (FileOwnershipDecider.shouldAlwaysBeLocal(f, mappingEntry.getFileOwnership())) {
return localFS.exists(mappingEntry.getSourcePath());
} else {
// Should be protected with synchronized, since the file closing is not an atomic
Expand All @@ -305,7 +318,7 @@ public synchronized FileStatus getFileStatus(Path path) throws IOException {
if (mappingEntry == null) {
return new FileStatusWrapper(delegateFS.getFileStatus(path), path);
}
if (FileOwnershipDecider.shouldAlwaysBeLocal(path)) {
if (FileOwnershipDecider.shouldAlwaysBeLocal(path, mappingEntry.getFileOwnership())) {
return new FileStatusWrapper(localFS.getFileStatus(mappingEntry.getSourcePath()), path);
} else {
// Should be protected with synchronized, since the file closing is not an atomic
Expand Down Expand Up @@ -401,19 +414,27 @@ private static FileSystem getUnguardedFileSystem(Path path) throws IOException {
}

private @Nullable CachedDataOutputStream createCachedDataOutputStream(
Path dbFilePath, Path srcRealPath, FSDataOutputStream outputStream) throws IOException {
Path dbFilePath,
Path srcRealPath,
FSDataOutputStream outputStream,
FileOwnership fileOwnership)
throws IOException {
// do not create cache for local files
if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath)) {
if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership)) {
return null;
}

return fileBasedCache == null ? null : fileBasedCache.create(outputStream, srcRealPath);
}

private @Nullable CachedDataInputStream createCachedDataInputStream(
Path dbFilePath, MappingEntrySource source, FSDataInputStream inputStream)
Path dbFilePath,
MappingEntrySource source,
FSDataInputStream inputStream,
FileOwnership fileOwnership)
throws IOException {
if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath) || !source.cacheable()) {
if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership)
|| !source.cacheable()) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public static boolean isSstFile(Path filePath) {
return filePath.getName().endsWith(SST_SUFFIX);
}

/**
 * Decides whether a file should always be treated as a local file, taking the file's ownership
 * into account.
 *
 * @param filePath the path of the file to check
 * @param fileOwnership the ownership recorded for the file
 * @return {@code true} only if the file is not an SST file and its ownership is anything other
 *     than {@link FileOwnership#NOT_OWNED}
 */
public static boolean shouldAlwaysBeLocal(Path filePath, FileOwnership fileOwnership) {
    // SST files are never forced to be local, whatever their ownership is.
    if (isSstFile(filePath)) {
        return false;
    }
    // A non-SST file is forced to be local unless it is explicitly not owned.
    return fileOwnership != FileOwnership.NOT_OWNED;
}

/**
 * Decides whether a file should always be treated as a local file, based only on its path.
 *
 * @param filePath the path of the file to check
 * @return {@code true} if the file is not an SST file
 */
public static boolean shouldAlwaysBeLocal(Path filePath) {
    final boolean sstFile = isSstFile(filePath);
    return !sstFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.fs.cache.BundledCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.fs.cache.FileCacheEntry;
import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
Expand Down Expand Up @@ -426,6 +428,49 @@ public void testFileStatusAndExist() throws IOException {
assertFileStatusAndBlockLocations(fileSystem, fileSystem.getFileStatus(sstRemotePath1));
}

/**
 * Checks that, once the ownership of a restored file has been given up, the file status
 * returned by {@link ForStFlinkFileSystem} for the entry's source path reports a length that
 * differs from the one reported by the real local file system.
 */
@Test
public void testNotOwnedFileStatus() throws IOException {
    final String baseDir = tempDir.toString();
    org.apache.flink.core.fs.Path remoteDir =
            new org.apache.flink.core.fs.Path(baseDir + "/remote");
    org.apache.flink.core.fs.Path localDir =
            new org.apache.flink.core.fs.Path(baseDir + "/local");

    // Delegate whose file status is a dummy one, so it can be told apart from the status
    // produced by the real local file system.
    LocalFileSystem dummyStatusDelegate =
            new LocalFileSystem() {
                @Override
                public FileStatus getFileStatus(org.apache.flink.core.fs.Path path) {
                    return new ForStFlinkFileSystem.DummyFSFileStatus(path);
                }
            };
    ForStFlinkFileSystem forStFileSystem =
            new ForStFlinkFileSystem(
                    dummyStatusDelegate, remoteDir.toString(), localDir.toString(), null);
    forStFileSystem.mkdirs(remoteDir);
    forStFileSystem.mkdirs(localDir);

    // Write a single byte into a fresh SST file.
    org.apache.flink.core.fs.Path sstFile =
            new org.apache.flink.core.fs.Path(remoteDir, "1.sst");
    ByteBufferWritableFSDataOutputStream sstOut = forStFileSystem.create(sstFile);
    sstOut.write(1);
    sstOut.close();

    // Simulate the restore procedure: register the file as a reused restored file and hand
    // its ownership over to Flink. Afterwards getFileStatus must not be answered by the
    // local file system.
    MappingEntry entry = forStFileSystem.getMappingEntry(sstFile);
    assertThat(entry).isNotNull();
    assertThat(entry.getSourcePath()).isNotNull();
    FileStateHandle restoredHandle = new FileStateHandle(entry.getSourcePath(), 1L);
    forStFileSystem.registerReusedRestoredFile(
            entry.getSourcePath().toString(), restoredHandle, sstFile);
    forStFileSystem.giveUpOwnership(sstFile, restoredHandle);

    // The source path is re-read from the entry for each lookup, after ownership was given up.
    long lengthViaForSt = forStFileSystem.getFileStatus(entry.getSourcePath()).getLen();
    long lengthViaLocal =
            FileSystem.getLocalFileSystem().getFileStatus(entry.getSourcePath()).getLen();
    assertThat(lengthViaForSt).isNotEqualTo(lengthViaLocal);
}

@Test
public void testOverride() throws IOException {
org.apache.flink.core.fs.Path remotePath =
Expand Down