Skip to content

Commit

Permalink
[FLINK-37031][state/forst] Bump forstjni to 0.1.5 && make ForStFlinkF…
Browse files Browse the repository at this point in the history
…ileSystem thread safe" (#25927)
  • Loading branch information
fredia authored Jan 9, 2025
1 parent de40a75 commit a79f2e2
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 54 deletions.
2 changes: 1 addition & 1 deletion flink-dist/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens
- com.google.code.findbugs:jsr305:1.3.9
- com.twitter:chill-java:0.7.6
- com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0
- com.ververica:forstjni:0.1.4-beta
- com.ververica:forstjni:0.1.5
- commons-cli:commons-cli:1.5.0
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.15.1
Expand Down
2 changes: 1 addition & 1 deletion flink-state-backends/flink-statebackend-forst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ under the License.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>forstjni</artifactId>
<version>0.1.4-beta</version>
<version>0.1.5</version>
</dependency>

<!-- test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public final class ForStResourceContainer implements AutoCloseable {
// the filename length limit is 255 on most operating systems
// In rocksdb, if db_log_dir is non empty, the log files will be in the specified dir,
// and the db data dir's absolute path will be used as the log file name's prefix.
private static final int INSTANCE_PATH_LENGTH_LIMIT =
255 / 2 - FORST_RELOCATE_LOG_SUFFIX.length();
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();

@Nullable private FlinkEnv flinkEnv = null;

Expand Down Expand Up @@ -396,9 +395,8 @@ public void clearDirectories() throws Exception {
}
}

private void clearDirectories(Path basePath) throws IOException {
FileSystem fileSystem =
forStFileSystem != null ? forStFileSystem : basePath.getFileSystem();
private static void clearDirectories(Path basePath) throws IOException {
FileSystem fileSystem = basePath.getFileSystem();
if (fileSystem.exists(basePath)) {
fileSystem.delete(basePath, true);
}
Expand Down Expand Up @@ -485,19 +483,11 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions)

String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
if (logDir == null || logDir.isEmpty()) {
if (localForStPath == null
|| localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
// only relocate db log dir in local mode
if (remoteForStPath == null
&& localForStPath != null
&& localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
relocateDefaultDbLogDir(currentOptions);
} else if (remoteForStPath != null) { // log must put in local
Path relocatedPath = localForStPath.getParent().getParent();
LOG.warn("ForSt remote path is not null, relocate log in {}.", relocatedPath);
currentOptions.setDbLogDir(relocatedPath.toString());
} else {
// disable log relocate when instance path length exceeds limit to prevent ForSt
// log file creation failure, details in FLINK-31743
LOG.warn(
"ForSt local path length exceeds limit : {}, disable log relocate.",
localForStPath);
}
} else {
currentOptions.setDbLogDir(logDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
Expand All @@ -38,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -66,13 +69,13 @@ public class ForStStateDataTransfer implements Closeable {

protected final ExecutorService executorService;

private final FileSystem forStFs;
@Nullable private final ForStFlinkFileSystem forStFs;

public ForStStateDataTransfer(int threadNum) {
this(threadNum, null);
}

public ForStStateDataTransfer(int threadNum, FileSystem forStFs) {
public ForStStateDataTransfer(int threadNum, ForStFlinkFileSystem forStFs) {
this.forStFs = forStFs;
if (threadNum > 1) {
executorService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException
}

@Override
public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode)
throws IOException {
public synchronized ByteBufferWritableFSDataOutputStream create(
Path path, WriteMode overwriteMode) throws IOException {
FileMappingManager.RealPath realPath = fileMappingManager.createFile(path);
if (realPath.isLocal) {
return new ByteBufferWritableFSDataOutputStream(
Expand All @@ -152,7 +152,8 @@ public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwrit
}

@Override
public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException {
public synchronized ByteBufferReadableFSDataInputStream open(Path path, int bufferSize)
throws IOException {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
Expand All @@ -176,7 +177,7 @@ public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throw
}

@Override
public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
public synchronized ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
Expand All @@ -200,27 +201,27 @@ public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
public synchronized boolean rename(Path src, Path dst) throws IOException {
return fileMappingManager.renameFile(src.toString(), dst.toString());
}

@Override
public Path getWorkingDirectory() {
public synchronized Path getWorkingDirectory() {
return delegateFS.getWorkingDirectory();
}

@Override
public Path getHomeDirectory() {
public synchronized Path getHomeDirectory() {
return delegateFS.getHomeDirectory();
}

@Override
public URI getUri() {
public synchronized URI getUri() {
return delegateFS.getUri();
}

@Override
public boolean exists(final Path f) throws IOException {
public synchronized boolean exists(final Path f) throws IOException {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(f);
if (realPath == null) {
return delegateFS.exists(f) && delegateFS.getFileStatus(f).isDir();
Expand All @@ -239,7 +240,7 @@ public boolean exists(final Path f) throws IOException {
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
public synchronized FileStatus getFileStatus(Path path) throws IOException {
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Preconditions.checkNotNull(realPath);
if (realPath.isLocal) {
Expand All @@ -249,7 +250,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
public synchronized BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
throws IOException {
Path path = file.getPath();
FileMappingManager.RealPath realPath = fileMappingManager.realPath(path);
Expand All @@ -262,7 +263,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
}

@Override
public FileStatus[] listStatus(Path path) throws IOException {
public synchronized FileStatus[] listStatus(Path path) throws IOException {
// mapping files
List<FileStatus> fileStatuses = new ArrayList<>();
String pathStr = path.toString();
Expand All @@ -281,7 +282,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
public synchronized boolean delete(Path path, boolean recursive) throws IOException {
boolean success = fileMappingManager.deleteFile(path, recursive);
if (fileBasedCache != null) {
// only new generated file will put into cache, no need to consider file mapping
Expand All @@ -291,16 +292,16 @@ public boolean delete(Path path, boolean recursive) throws IOException {
}

@Override
public boolean mkdirs(Path path) throws IOException {
public synchronized boolean mkdirs(Path path) throws IOException {
return delegateFS.mkdirs(path);
}

@Override
public boolean isDistributedFS() {
public synchronized boolean isDistributedFS() {
return delegateFS.isDistributedFS();
}

public int link(Path src, Path dst) throws IOException {
public synchronized int link(Path src, Path dst) throws IOException {
return fileMappingManager.link(src.toString(), dst.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public FileMappingManager(
public RealPath createFile(Path file) {
String fileName = file.toString();
Preconditions.checkState(!mappingTable.containsKey(fileName));
if (!fileName.endsWith(SST_SUFFIX) && fileName.startsWith(remoteBase)) {
if (!fileName.endsWith(SST_SUFFIX) && isParentDir(fileName, remoteBase)) {
Path localFile = new Path(localBase, file.getName());
mappingTable.put(
fileName,
Expand All @@ -92,16 +92,9 @@ public int link(String src, String dst) {
return -1;
}
MappingEntry sourceEntry = mappingTable.get(src);
if (sourceEntry != null) {
sourceEntry.retain();
mappingTable.putIfAbsent(dst, sourceEntry);
} else {
sourceEntry = new MappingEntry(0, fileSystem, src, false, false);
sourceEntry.retain();
mappingTable.put(src, sourceEntry);
sourceEntry.retain();
mappingTable.put(dst, sourceEntry);
}
Preconditions.checkNotNull(sourceEntry);
sourceEntry.retain();
mappingTable.putIfAbsent(dst, sourceEntry);
LOG.trace("link: {} -> {}", dst, src);
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,10 @@ public ForStRestoreResult restore() throws Exception {
}

private void transferAllStateHandles(List<StateHandleTransferSpec> specs) throws Exception {
FileSystem forStFs = getFileSystem(optionsContainer.getBasePath());
try (ForStStateDataTransfer transfer =
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) {
new ForStStateDataTransfer(
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getFileSystem())) {
transfer.transferAllStateDataToDirectory(specs, cancelStreamRegistry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void testDefaultDbLogDir() throws Exception {
final File logFile = File.createTempFile(getClass().getSimpleName() + "-", ".log");
// set the environment variable 'log.file' with the Flink log file location
System.setProperty("log.file", logFile.getPath());
try (ForStResourceContainer container = backend.createOptionsAndResourceContainer(null)) {
try (ForStResourceContainer container =
backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) {
assertEquals(
ForStConfigurableOptions.LOG_LEVEL.defaultValue(),
container.getDbOptions().infoLogLevel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void testFileLink() throws IOException {
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));
String dst = tempDir.toString() + "/dst";
fileMappingManager.link(src, dst);
assertThat(fileMappingManager.realPath(new Path(dst)).path.toString()).isEqualTo(src);
Expand All @@ -60,6 +61,7 @@ void testNestLink() throws IOException {
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));
String dstB = tempDir.toString() + "/b";
fileMappingManager.link(src, dstB);
assertThat(fileMappingManager.realPath(new Path(dstB)).path.toString()).isEqualTo(src);
Expand Down Expand Up @@ -87,6 +89,7 @@ void testFileDelete() throws IOException {
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));
String dst = tempDir.toString() + "/dst";
fileMappingManager.link(src, dst);
// delete src
Expand All @@ -102,13 +105,15 @@ void testFileDelete() throws IOException {
void testDirectoryDelete() throws IOException {
FileSystem localFS = FileSystem.getLocalFileSystem();
FileMappingManager fileMappingManager =
new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString());
new FileMappingManager(
localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db");
String testDir = tempDir + "/testDir";
localFS.mkdirs(new Path(testDir));
String src = testDir + "/source";
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));
String dst = tempDir.toString() + "/dst";
fileMappingManager.link(src, dst);

Expand All @@ -127,13 +132,15 @@ void testDirectoryDelete() throws IOException {
void testDirectoryRename() throws IOException {
FileSystem localFS = FileSystem.getLocalFileSystem();
FileMappingManager fileMappingManager =
new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString());
new FileMappingManager(
localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db");
String testDir = tempDir + "/testDir";
localFS.mkdirs(new Path(testDir));
String src = testDir + "/source";
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));

String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp";
localFS.mkdirs(new Path(linkedDirTmp));
Expand Down Expand Up @@ -175,13 +182,15 @@ void testDirectoryRename() throws IOException {
void testCreateFileBeforeRename() throws IOException {
FileSystem localFS = FileSystem.getLocalFileSystem();
FileMappingManager fileMappingManager =
new FileMappingManager(localFS, localFS, tempDir.toString(), tempDir.toString());
new FileMappingManager(
localFS, localFS, tempDir.toString() + "/db", tempDir.toString() + "/db");
String testDir = tempDir + "/testDir";
localFS.mkdirs(new Path(testDir));
String src = testDir + "/source";
FSDataOutputStream os = localFS.create(new Path(src), FileSystem.WriteMode.OVERWRITE);
os.write(233);
os.close();
fileMappingManager.createFile(new Path(src));

String linkedDirTmp = tempDir.toString() + "/linkedDir.tmp";
localFS.mkdirs(new Path(linkedDirTmp));
Expand Down

0 comments on commit a79f2e2

Please sign in to comment.