diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index afbc5a71505e6..309f16b445293 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -38,6 +38,7 @@ import org.apache.flink.state.forst.fs.filemapping.FSDataOutputStreamWithEntry; import org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource; import org.apache.flink.state.forst.fs.filemapping.FileMappingManager; +import org.apache.flink.state.forst.fs.filemapping.FileOwnership; import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider; import org.apache.flink.state.forst.fs.filemapping.MappingEntry; import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource; @@ -211,7 +212,11 @@ public synchronized ByteBufferWritableFSDataOutputStream create( // Try to create file cache for SST files CachedDataOutputStream cachedDataOutputStream = - createCachedDataOutputStream(dbFilePath, sourceRealPath, outputStream); + createCachedDataOutputStream( + dbFilePath, + sourceRealPath, + outputStream, + createdMappingEntry.getFileOwnership()); LOG.trace( "Create file: dbFilePath: {}, sourceRealPath: {}, cachedDataOutputStream: {}", @@ -233,7 +238,11 @@ public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath, in () -> { FSDataInputStream inputStream = source.openInputStream(bufferSize); CachedDataInputStream cachedDataInputStream = - createCachedDataInputStream(dbFilePath, source, inputStream); + createCachedDataInputStream( + dbFilePath, + source, + inputStream, + mappingEntry.getFileOwnership()); return cachedDataInputStream == null ? 
inputStream : cachedDataInputStream; }, DEFAULT_INPUT_STREAM_CAPACITY, @@ -251,7 +260,11 @@ public synchronized ByteBufferReadableFSDataInputStream open(Path dbFilePath) () -> { FSDataInputStream inputStream = source.openInputStream(); CachedDataInputStream cachedDataInputStream = - createCachedDataInputStream(dbFilePath, source, inputStream); + createCachedDataInputStream( + dbFilePath, + source, + inputStream, + mappingEntry.getFileOwnership()); return cachedDataInputStream == null ? inputStream : cachedDataInputStream; }, DEFAULT_INPUT_STREAM_CAPACITY, @@ -285,7 +298,7 @@ public synchronized boolean exists(final Path f) throws IOException { return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir(); } - if (FileOwnershipDecider.shouldAlwaysBeLocal(f)) { + if (FileOwnershipDecider.shouldAlwaysBeLocal(f, mappingEntry.getFileOwnership())) { return localFS.exists(mappingEntry.getSourcePath()); } else { // Should be protected with synchronized, since the file closing is not an atomic @@ -305,7 +318,7 @@ public synchronized FileStatus getFileStatus(Path path) throws IOException { if (mappingEntry == null) { return new FileStatusWrapper(delegateFS.getFileStatus(path), path); } - if (FileOwnershipDecider.shouldAlwaysBeLocal(path)) { + if (FileOwnershipDecider.shouldAlwaysBeLocal(path, mappingEntry.getFileOwnership())) { return new FileStatusWrapper(localFS.getFileStatus(mappingEntry.getSourcePath()), path); } else { // Should be protected with synchronized, since the file closing is not an atomic @@ -401,9 +414,13 @@ private static FileSystem getUnguardedFileSystem(Path path) throws IOException { } private @Nullable CachedDataOutputStream createCachedDataOutputStream( - Path dbFilePath, Path srcRealPath, FSDataOutputStream outputStream) throws IOException { + Path dbFilePath, + Path srcRealPath, + FSDataOutputStream outputStream, + FileOwnership fileOwnership) + throws IOException { // do not create cache for local files - if 
(FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath)) { + if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership)) { return null; } @@ -411,9 +428,13 @@ private static FileSystem getUnguardedFileSystem(Path path) throws IOException { } private @Nullable CachedDataInputStream createCachedDataInputStream( - Path dbFilePath, MappingEntrySource source, FSDataInputStream inputStream) + Path dbFilePath, + MappingEntrySource source, + FSDataInputStream inputStream, + FileOwnership fileOwnership) throws IOException { - if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath) || !source.cacheable()) { + if (FileOwnershipDecider.shouldAlwaysBeLocal(dbFilePath, fileOwnership) + || !source.cacheable()) { return null; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java index 909b44762704f..119ec9b44d400 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileOwnershipDecider.java @@ -43,6 +43,10 @@ public static boolean isSstFile(Path filePath) { return filePath.getName().endsWith(SST_SUFFIX); } + public static boolean shouldAlwaysBeLocal(Path filePath, FileOwnership fileOwnership) { + return !isSstFile(filePath) && fileOwnership != FileOwnership.NOT_OWNED; + } + public static boolean shouldAlwaysBeLocal(Path filePath) { return !isSstFile(filePath); } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index 50c3a1ed0bad1..b6f61afdb66f6 
100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -30,11 +30,13 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.state.forst.fs.cache.BundledCacheLimitPolicy; import org.apache.flink.state.forst.fs.cache.FileBasedCache; import org.apache.flink.state.forst.fs.cache.FileCacheEntry; import org.apache.flink.state.forst.fs.cache.SizeBasedCacheLimitPolicy; import org.apache.flink.state.forst.fs.cache.SpaceBasedCacheLimitPolicy; +import org.apache.flink.state.forst.fs.filemapping.MappingEntry; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; @@ -426,6 +428,49 @@ public void testFileStatusAndExist() throws IOException { assertFileStatusAndBlockLocations(fileSystem, fileSystem.getFileStatus(sstRemotePath1)); } + @Test + public void testNotOwnedFileStatus() throws IOException { + org.apache.flink.core.fs.Path remotePath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote"); + org.apache.flink.core.fs.Path localPath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/local"); + ForStFlinkFileSystem fileSystem = + new ForStFlinkFileSystem( + // Return a dummy file status that differs from the real local one + new LocalFileSystem() { + @Override + public FileStatus getFileStatus(org.apache.flink.core.fs.Path path) { + return new ForStFlinkFileSystem.DummyFSFileStatus(path); + } + }, + remotePath.toString(), + localPath.toString(), + null); + 
fileSystem.mkdirs(remotePath); + fileSystem.mkdirs(localPath); + org.apache.flink.core.fs.Path sstRemotePath1 = + new org.apache.flink.core.fs.Path(remotePath, "1.sst"); + ByteBufferWritableFSDataOutputStream os1 = fileSystem.create(sstRemotePath1); + os1.write(1); + os1.close(); + + // Mimic the restore procedure: after ownership of the file has been given up to Flink, + // getFileStatus must not be answered by the local file system. + MappingEntry mappingEntry = fileSystem.getMappingEntry(sstRemotePath1); + assertThat(mappingEntry).isNotNull(); + assertThat(mappingEntry.getSourcePath()).isNotNull(); + FileStateHandle remoteFileStateHandle = + new FileStateHandle(mappingEntry.getSourcePath(), 1L); + fileSystem.registerReusedRestoredFile( + mappingEntry.getSourcePath().toString(), remoteFileStateHandle, sstRemotePath1); + fileSystem.giveUpOwnership(sstRemotePath1, remoteFileStateHandle); + assertThat(fileSystem.getFileStatus(mappingEntry.getSourcePath()).getLen()) + .isNotEqualTo( + FileSystem.getLocalFileSystem() + .getFileStatus(mappingEntry.getSourcePath()) + .getLen()); + } + @Test public void testOverride() throws IOException { org.apache.flink.core.fs.Path remotePath =