Skip to content

Commit 4989549

Browse files
authored
[FLINK-38567][state/forst] Adapt ForStFileSystem to dummy mkdir() implementation (#27185)
1 parent 8f7f3e4 commit 4989549

File tree

6 files changed

+257
-19
lines changed

6 files changed

+257
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst.fs;
20+
21+
import org.apache.flink.core.fs.Path;
22+
23+
import java.io.IOException;
24+
import java.util.HashSet;
25+
import java.util.Set;
26+
27+
/**
28+
* A decorator of {@link ForStFlinkFileSystem} to adapt ForSt to the underlying FileSystems which
29+
* are implemented with incomplete mkdir(), i.e., such FileSystem implementation does not actually
30+
* create the directory when mkdir() completes. This can lead to unexpected behavior when ForSt
31+
* tries to assert the existence of directories by calling exists(). Therefore, we track the paths
32+
* of the should-be-created directories and subsequently return true for existence checks.
33+
*/
34+
public class ForStFileSystemTrackingCreatedDirDecorator extends ForStFlinkFileSystem {
35+
private final Set<Path> createdDirPaths = new HashSet<>();
36+
37+
ForStFileSystemTrackingCreatedDirDecorator(ForStFlinkFileSystem fileSystem) {
38+
super(fileSystem);
39+
}
40+
41+
@Override
42+
public synchronized boolean mkdirs(Path path) throws IOException {
43+
boolean mkdirSucceed = super.mkdirs(path);
44+
if (!mkdirSucceed) {
45+
return false;
46+
}
47+
48+
createdDirPaths.add(path);
49+
return true;
50+
}
51+
52+
@Override
53+
public synchronized boolean exists(final Path f) throws IOException {
54+
if (createdDirPaths.contains(f)) {
55+
return true;
56+
}
57+
return super.exists(f);
58+
}
59+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst.fs;
20+
21+
import org.apache.flink.core.fs.Path;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import javax.annotation.Nullable;
27+
28+
import java.io.IOException;
29+
import java.util.UUID;
30+
31+
/** Utils for ForStFileSystem. */
32+
public class ForStFileSystemUtils {
33+
private static final Logger LOG = LoggerFactory.getLogger(ForStFileSystemUtils.class);
34+
35+
private static final String DUMMY_DIR_NAME = "_dummy_dir_";
36+
37+
public static boolean isParentDir(@Nullable Path path, String dir) {
38+
if (path == null) {
39+
return false;
40+
}
41+
return isParentDir(path.toString(), dir);
42+
}
43+
44+
public static boolean isParentDir(String path, String dir) {
45+
if (dir.isEmpty()) {
46+
return false;
47+
}
48+
if (dir.charAt(dir.length() - 1) == '/') {
49+
return path.startsWith(dir);
50+
} else {
51+
return (path.startsWith(dir + "/"));
52+
}
53+
}
54+
55+
public static ForStFlinkFileSystem tryDecorate(ForStFlinkFileSystem fileSystem) {
56+
try {
57+
return isIncompleteMkdirEnabled(fileSystem)
58+
? new ForStFileSystemTrackingCreatedDirDecorator(fileSystem)
59+
: fileSystem;
60+
} catch (IOException e) {
61+
LOG.info("Cannot decorate ForStFlinkFileSystem", e);
62+
}
63+
return fileSystem;
64+
}
65+
66+
private static boolean isIncompleteMkdirEnabled(ForStFlinkFileSystem fileSystem)
67+
throws IOException {
68+
// check if the underlying FileSystem uses an incomplete mkdir implementation
69+
Path dummyDir = new Path(fileSystem.getRemoteBase(), DUMMY_DIR_NAME + UUID.randomUUID());
70+
if (fileSystem.mkdirs(dummyDir)) {
71+
if (!fileSystem.exists(dummyDir)) {
72+
return true;
73+
}
74+
fileSystem.delete(new Path(DUMMY_DIR_NAME), true);
75+
return false;
76+
} else {
77+
LOG.info(
78+
"Cannot to mkdir for "
79+
+ DUMMY_DIR_NAME
80+
+ ", skip decoration of ForStFlinkFileSystem");
81+
}
82+
return false;
83+
}
84+
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ public ForStFlinkFileSystem(
9090
this.fileMappingManager = new FileMappingManager(delegateFS, remoteBase, localBase);
9191
}
9292

93+
protected ForStFlinkFileSystem(ForStFlinkFileSystem forStFlinkFileSystem) {
94+
this.localFS = forStFlinkFileSystem.localFS;
95+
this.delegateFS = forStFlinkFileSystem.delegateFS;
96+
this.remoteBase = forStFlinkFileSystem.remoteBase;
97+
this.fileBasedCache = forStFlinkFileSystem.fileBasedCache;
98+
this.fileMappingManager = forStFlinkFileSystem.fileMappingManager;
99+
}
100+
93101
/**
94102
* Returns a reference to the {@link FileSystem} instance for accessing the file system
95103
* identified by the given {@link URI}.

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class StringifiedForStFileSystem {
3333
private ForStFlinkFileSystem fileSystem;
3434

3535
public StringifiedForStFileSystem(ForStFlinkFileSystem fileSystem) {
36-
this.fileSystem = fileSystem;
36+
this.fileSystem = ForStFileSystemUtils.tryDecorate(fileSystem);
3737
}
3838

3939
public static StringifiedForStFileSystem get(String uri) throws IOException {

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.Map;
3838
import java.util.UUID;
3939

40+
import static org.apache.flink.state.forst.fs.ForStFileSystemUtils.isParentDir;
41+
4042
/**
4143
* A manager to manage file mapping of forst file system, including misc file mapping (remote file
4244
* -> local file) and linked mapping (remote file -> remote file). Note, the key/value of mapping
@@ -309,24 +311,6 @@ public boolean deleteFileOrDirectory(Path file, boolean recursive) throws IOExce
309311
return mappingTable.getOrDefault(path, null);
310312
}
311313

312-
private boolean isParentDir(@Nullable Path path, String dir) {
313-
if (path == null) {
314-
return false;
315-
}
316-
return isParentDir(path.toString(), dir);
317-
}
318-
319-
private boolean isParentDir(String path, String dir) {
320-
if (dir.isEmpty()) {
321-
return false;
322-
}
323-
if (dir.charAt(dir.length() - 1) == '/') {
324-
return path.startsWith(dir);
325-
} else {
326-
return (path.startsWith(dir + "/"));
327-
}
328-
}
329-
330314
public void giveUpOwnership(Path path, StreamStateHandle stateHandle) {
331315
MappingEntry mappingEntry = mappingTable.getOrDefault(path.toString(), null);
332316
Preconditions.checkArgument(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.forst.fs;
20+
21+
import org.apache.flink.core.fs.local.LocalFileSystem;
22+
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.io.TempDir;
25+
26+
import java.io.IOException;
27+
import java.nio.file.Path;
28+
import java.util.UUID;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Tests for {@link ForStFileSystemTrackingCreatedDirDecorator}. */
33+
public class ForStFileSystemTrackingCreatedDirDecoratorTest {
34+
@TempDir static Path tempDir;
35+
36+
private static class MockLocalFileSystem extends LocalFileSystem {
37+
private final boolean dummyMkdirEnabled;
38+
39+
public MockLocalFileSystem(boolean dummyMkdirEnabled) {
40+
super();
41+
this.dummyMkdirEnabled = dummyMkdirEnabled;
42+
}
43+
44+
@Override
45+
public synchronized boolean mkdirs(org.apache.flink.core.fs.Path path) throws IOException {
46+
if (dummyMkdirEnabled) {
47+
return true;
48+
} else {
49+
return super.mkdirs(path);
50+
}
51+
}
52+
}
53+
54+
@Test
55+
public void testMkdirAndCheck() throws IOException {
56+
mkdirAndCheck(false);
57+
}
58+
59+
@Test
60+
public void testDummyMkdirAndCheck() throws IOException {
61+
mkdirAndCheck(true);
62+
}
63+
64+
void mkdirAndCheck(boolean enableDummyMkdir) throws IOException {
65+
org.apache.flink.core.fs.Path remotePath =
66+
new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote");
67+
org.apache.flink.core.fs.Path localPath =
68+
new org.apache.flink.core.fs.Path(tempDir.toString() + "/local");
69+
70+
MockLocalFileSystem mockLocalFileSystem = new MockLocalFileSystem(enableDummyMkdir);
71+
ForStFlinkFileSystem fileSystem =
72+
ForStFileSystemUtils.tryDecorate(
73+
new ForStFlinkFileSystem(
74+
mockLocalFileSystem,
75+
remotePath.toString(),
76+
localPath.toString(),
77+
null));
78+
if (enableDummyMkdir) {
79+
assertThat(fileSystem).isInstanceOf(ForStFileSystemTrackingCreatedDirDecorator.class);
80+
}
81+
82+
// create a directory
83+
String dirPathStr = genRandomFilePathStr();
84+
org.apache.flink.core.fs.Path testMkdirPath = new org.apache.flink.core.fs.Path(dirPathStr);
85+
fileSystem.mkdirs(testMkdirPath);
86+
assertThat(mockLocalFileSystem.exists(testMkdirPath)).isEqualTo(!enableDummyMkdir);
87+
assertThat(fileSystem.exists(testMkdirPath)).isTrue();
88+
89+
// create sub directories
90+
for (int i = 0; i < 10; i++) {
91+
String subDirName = UUID.randomUUID().toString();
92+
org.apache.flink.core.fs.Path testSubMkdirPath =
93+
new org.apache.flink.core.fs.Path(dirPathStr, subDirName);
94+
fileSystem.mkdirs(testSubMkdirPath);
95+
assertThat(mockLocalFileSystem.exists(testSubMkdirPath)).isEqualTo(!enableDummyMkdir);
96+
assertThat(fileSystem.exists(testSubMkdirPath)).isTrue();
97+
}
98+
}
99+
100+
private String genRandomFilePathStr() {
101+
return tempDir.toString() + "/" + UUID.randomUUID();
102+
}
103+
}

0 commit comments

Comments
 (0)