Skip to content

Commit ac7750e

Browse files
liangyu-1huangxiaofeng10047
authored andcommitted
[FLINK-36112] Add Support for CreateFlag.NO_LOCAL_WRITE in FLINK on YARN's File Creation to Manage Disk Space and Network Load in Labeled YARN Nodes
This closes apache#25226
1 parent 22a360a commit ac7750e

File tree

8 files changed

+163
-6
lines changed

8 files changed

+163
-6
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@
7171
import java.io.Serializable;
7272
import java.util.Collection;
7373
import java.util.Collections;
74+
import java.util.HashMap;
75+
import java.util.Map;
7476

7577
import static org.apache.flink.util.Preconditions.checkNotNull;
7678
import static org.apache.flink.util.Preconditions.checkState;
@@ -526,6 +528,10 @@ public static class BulkFormatBuilder<IN, T extends BulkFormatBuilder<IN, T>>
526528

527529
private CheckpointRollingPolicy<IN, String> rollingPolicy;
528530

531+
private Map<String, String> writerConfig = new HashMap<>();
532+
533+
private static final String HDFS_NO_LOCAL_WRITE = "fs.hdfs.no-local-write";
534+
529535
private OutputFileConfig outputFileConfig;
530536

531537
private boolean isCompactDisabledExplicitly = false;
@@ -580,6 +586,11 @@ public T withRollingPolicy(CheckpointRollingPolicy<IN, String> rollingPolicy) {
580586
return self();
581587
}
582588

589+
public T disableLocalWriting() {
590+
this.writerConfig.put(HDFS_NO_LOCAL_WRITE, String.valueOf(true));
591+
return self();
592+
}
593+
583594
public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
584595
this.outputFileConfig = outputFileConfig;
585596
return self();
@@ -695,7 +706,8 @@ SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
695706

696707
BucketWriter<IN, String> createBucketWriter() throws IOException {
697708
return new BulkBucketWriter<>(
698-
FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory);
709+
FileSystem.get(basePath.toUri()).createRecoverableWriter(writerConfig),
710+
writerFactory);
699711
}
700712
}
701713

flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.flink.annotation.Internal;
2828
import org.apache.flink.annotation.Public;
29+
import org.apache.flink.annotation.PublicEvolving;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.configuration.CoreOptions;
3132
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -57,6 +58,7 @@
5758
import java.util.HashSet;
5859
import java.util.Iterator;
5960
import java.util.List;
61+
import java.util.Map;
6062
import java.util.ServiceLoader;
6163
import java.util.Set;
6264
import java.util.concurrent.locks.ReentrantLock;
@@ -665,6 +667,16 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
665667
return IFileSystem.super.createRecoverableWriter();
666668
}
667669

670+
@PublicEvolving
671+
@Override
672+
public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
673+
if (conf == null || conf.isEmpty()) {
674+
return createRecoverableWriter();
675+
} else {
676+
return IFileSystem.super.createRecoverableWriter(conf);
677+
}
678+
}
679+
668680
@Override
669681
public abstract FileStatus[] listStatus(Path f) throws IOException;
670682

flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.FileNotFoundException;
3131
import java.io.IOException;
3232
import java.net.URI;
33+
import java.util.Map;
3334

3435
/**
3536
* Interface of all file systems used by Flink. This interface may be extended to implement
@@ -237,6 +238,25 @@ default RecoverableWriter createRecoverableWriter() throws IOException {
237238
"This file system does not support recoverable writers.");
238239
}
239240

241+
/**
242+
* Creates a new {@link RecoverableWriter}. A recoverable writer creates streams that can
243+
* persist and recover their intermediate state. Persisting and recovering intermediate state is
244+
* a core building block for writing to files that span multiple checkpoints.
245+
*
246+
* <p>The returned object can act as a shared factory to open and recover multiple streams.
247+
*
248+
* <p>This method is optional on file systems and various file system implementations may not
249+
* support this method, throwing an {@code UnsupportedOperationException}.
250+
*
251+
* @param conf Map contains a flag to indicate whether the writer should not write to local
252+
* storage. and can provide more information to instantiate the writer.
253+
* @return A RecoverableWriter for this file system.
254+
* @throws IOException Thrown, if the recoverable writer cannot be instantiated.
255+
*/
256+
default RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
257+
return createRecoverableWriter();
258+
}
259+
240260
/**
241261
* List the statuses of the files/directories in the given path if the path is a directory.
242262
*

flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.net.URI;
3030
import java.util.Locale;
31+
import java.util.Map;
3132

3233
import static org.apache.flink.util.Preconditions.checkNotNull;
3334

@@ -36,6 +37,8 @@
3637
*/
3738
public class HadoopFileSystem extends FileSystem {
3839

40+
private static final String HDFS_NO_LOCAL_WRITE = "fs.hdfs.no-local-write";
41+
3942
/** The wrapped Hadoop File System. */
4043
private final org.apache.hadoop.fs.FileSystem fs;
4144

@@ -215,6 +218,21 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
215218
return new HadoopRecoverableWriter(fs);
216219
}
217220

221+
@Override
222+
public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
223+
// This writer is only supported on a subset of file systems, and on
224+
// specific versions. We check these schemes and versions eagerly for better error
225+
// messages in the constructor of the writer.
226+
if (conf == null || conf.isEmpty()) {
227+
return createRecoverableWriter();
228+
} else if (conf.containsKey(HDFS_NO_LOCAL_WRITE)) {
229+
return new HadoopRecoverableWriter(
230+
fs, Boolean.parseBoolean(conf.get(HDFS_NO_LOCAL_WRITE)));
231+
} else {
232+
return new HadoopRecoverableWriter(fs);
233+
}
234+
}
235+
218236
// ------------------------------------------------------------------------
219237
// Utilities
220238
// ------------------------------------------------------------------------

flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import org.apache.flink.util.IOUtils;
3030
import org.apache.flink.util.Preconditions;
3131

32+
import org.apache.hadoop.fs.CreateFlag;
3233
import org.apache.hadoop.fs.FSDataOutputStream;
3334
import org.apache.hadoop.fs.FileStatus;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.hadoop.fs.Path;
37+
import org.apache.hadoop.fs.permission.FsPermission;
3638
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
3739
import org.apache.hadoop.hdfs.DistributedFileSystem;
3840
import org.apache.hadoop.util.VersionInfo;
@@ -43,8 +45,11 @@
4345
import java.lang.reflect.Method;
4446
import java.lang.reflect.Modifier;
4547
import java.time.Duration;
48+
import java.util.EnumSet;
4649

4750
import static org.apache.flink.util.Preconditions.checkNotNull;
51+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
52+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
4853

4954
/**
5055
* An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's file system
@@ -57,15 +62,32 @@ class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataO
5762

5863
private static Method truncateHandle;
5964

60-
HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
65+
HadoopRecoverableFsDataOutputStream(
66+
FileSystem fs, Path targetFile, Path tempFile, boolean noLocalWrite)
6167
throws IOException {
6268

6369
ensureTruncateInitialized();
6470

6571
this.fs = checkNotNull(fs);
6672
this.targetFile = checkNotNull(targetFile);
6773
this.tempFile = checkNotNull(tempFile);
68-
this.out = fs.create(tempFile);
74+
if (noLocalWrite) {
75+
this.out =
76+
fs.create(
77+
tempFile,
78+
FsPermission.getFileDefault(),
79+
EnumSet.of(
80+
CreateFlag.CREATE,
81+
CreateFlag.OVERWRITE,
82+
CreateFlag.NO_LOCAL_WRITE),
83+
fs.getConf()
84+
.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
85+
fs.getDefaultReplication(),
86+
fs.getDefaultBlockSize(),
87+
null);
88+
} else {
89+
this.out = fs.create(tempFile);
90+
}
6991
}
7092

7193
@VisibleForTesting

flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,22 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
4646
/** The Hadoop file system on which the writer operates. */
4747
protected final org.apache.hadoop.fs.FileSystem fs;
4848

49+
private final boolean noLocalWrite;
50+
4951
/**
5052
* Creates a new Recoverable writer.
5153
*
5254
* @param fs The Hadoop file system on which the writer operates.
5355
*/
5456
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
5557
this.fs = checkNotNull(fs);
58+
this.noLocalWrite = false;
59+
checkSupportedFSSchemes(fs);
60+
}
61+
62+
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs, boolean noLocalWrite) {
63+
this.fs = checkNotNull(fs);
64+
this.noLocalWrite = noLocalWrite;
5665

5766
checkSupportedFSSchemes(fs);
5867
}
@@ -86,7 +95,7 @@ public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
8695
protected RecoverableFsDataOutputStream getRecoverableFsDataOutputStream(
8796
org.apache.hadoop.fs.Path targetFile, org.apache.hadoop.fs.Path tempFile)
8897
throws IOException {
89-
return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
98+
return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile, noLocalWrite);
9099
}
91100

92101
@Override

flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,35 @@
2121
import org.apache.flink.core.fs.AbstractRecoverableWriterTest;
2222
import org.apache.flink.core.fs.FileSystem;
2323
import org.apache.flink.core.fs.Path;
24+
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
25+
import org.apache.flink.core.fs.RecoverableWriter;
26+
import org.apache.flink.mock.Whitebox;
2427
import org.apache.flink.runtime.util.HadoopUtils;
2528
import org.apache.flink.testutils.junit.utils.TempDirUtils;
2629
import org.apache.flink.util.OperatingSystem;
2730

2831
import org.apache.hadoop.conf.Configuration;
2932
import org.apache.hadoop.hdfs.MiniDFSCluster;
33+
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
34+
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
35+
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
36+
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
37+
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
38+
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
3039
import org.junit.jupiter.api.AfterAll;
40+
import org.junit.jupiter.api.Assertions;
3141
import org.junit.jupiter.api.BeforeAll;
42+
import org.junit.jupiter.api.Test;
3243
import org.junit.jupiter.api.io.TempDir;
3344

3445
import java.io.File;
46+
import java.util.HashMap;
47+
import java.util.Map;
48+
import java.util.Random;
3549

3650
import static org.assertj.core.api.Assumptions.assumeThat;
51+
import static org.mockito.Mockito.doReturn;
52+
import static org.mockito.Mockito.spy;
3753

3854
/** Tests for the {@link HadoopRecoverableWriter}. */
3955
class HadoopRecoverableWriterTest extends AbstractRecoverableWriterTest {
@@ -63,8 +79,11 @@ static void createHDFS() throws Exception {
6379

6480
final Configuration hdConf = new Configuration();
6581
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
82+
hdConf.set("dfs.replication", "2");
83+
hdConf.set("dfs.blocksize", String.valueOf(512));
84+
hdConf.set("dfs.namenode.fs-limits.min-block-size", String.valueOf(512));
6685

67-
final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
86+
final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf).numDataNodes(3);
6887
hdfsCluster = builder.build();
6988

7089
final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
@@ -83,6 +102,51 @@ static void destroyHDFS() throws Exception {
83102
}
84103
}
85104

105+
private RecoverableWriter getNoLocalWriteFileSystemWriter() throws Exception {
106+
Map<String, String> conf = new HashMap<>();
107+
conf.put("fs.hdfs.no-local-write", "true");
108+
return fileSystem.createRecoverableWriter(conf);
109+
}
110+
111+
@Test
112+
void testNoLocalWriteFlag() throws Exception {
113+
createHDFS();
114+
final HadoopRecoverableWriter writer =
115+
(HadoopRecoverableWriter) getNoLocalWriteFileSystemWriter();
116+
BlockManager bm = hdfsCluster.getNameNode().getNamesystem().getBlockManager();
117+
DatanodeManager dm = bm.getDatanodeManager();
118+
try (RecoverableFsDataOutputStream os = writer.open(new Path("/tests/test-no-local"))) {
119+
// Inject a DatanodeManager that returns one DataNode as local node for
120+
// the client.
121+
DatanodeManager spyDm = spy(dm);
122+
DatanodeDescriptor dn1 =
123+
dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.LIVE).get(0);
124+
doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1");
125+
Whitebox.setInternalState(bm, "datanodeManager", spyDm);
126+
byte[] buf = new byte[512 * 16];
127+
new Random().nextBytes(buf);
128+
os.write(buf);
129+
} finally {
130+
Whitebox.setInternalState(bm, "datanodeManager", dm);
131+
}
132+
hdfsCluster.triggerBlockReports();
133+
final String bpid = hdfsCluster.getNamesystem().getBlockPoolId();
134+
// Total number of DataNodes is 3.
135+
Assertions.assertEquals(3, hdfsCluster.getAllBlockReports(bpid).size());
136+
int numDataNodesWithData = 0;
137+
for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks :
138+
hdfsCluster.getAllBlockReports(bpid)) {
139+
for (BlockListAsLongs blocks : dnBlocks.values()) {
140+
if (blocks.getNumberOfBlocks() > 0) {
141+
numDataNodesWithData++;
142+
break;
143+
}
144+
}
145+
}
146+
// Verify that only one DN has no data.
147+
Assertions.assertEquals(1, 3 - numDataNodesWithData);
148+
}
149+
86150
@Override
87151
public Path getBasePath() {
88152
return basePath;

tools/maven/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ under the License.
4343

4444
<!-- Legacy mockito usages -->
4545
<!-- Updated this suppress files section due to FLINK-36063, which migrated many classes from flink-streaming-java to flink-runtime. This allows runtime access to stream graph. -->
46-
<suppress files="BufferDataOverWindowOperatorTest.java|CEPOperatorTest.java|CepRuntimeContextTest.java|CliFrontendListTest.java|CliFrontendPackageProgramTest.java|CliFrontendSavepointTest.java|DemultiplexingRecordDeserializerTest.java|DropwizardMeterWrapperTest.java|DynamicEventTimeSessionWindowsTest.java|DynamicProcessingTimeSessionWindowsTest.java|EmbeddedRocksDBStateBackendTest.java|EventTimeSessionWindowsTest.java|FlinkCalciteCatalogReaderTest.java|FlinkMeterWrapperTest.java|HadoopDataInputStreamTest.java|HadoopInputFormatTest.java|HadoopOutputFormatTest.java|HadoopUtilsTest.java|HiveTableSourceITCase.java|HybridSourceReaderTest.java|HybridSourceSplitEnumeratorTest.java|InternalWindowFunctionTest.java|LocalStateForwardingTest.java|MergingWindowSetTest.java|NFAITCase.java|NonBufferOverWindowOperatorTest.java|PatternTest.java|ProcessingTimeSessionWindowsTest.java|PythonOperatorChainingOptimizerTest.java|PythonTestUtils.java|RawFormatSerDeSchemaTest.java|RegisterApplicationMasterResponseReflectorTest.java|RichAsyncFunctionTest.java|RocksDBIncrementalCheckpointUtilsTest.java|RocksDBKeyedStateBackendTestFactory.java|RocksDBStateBackendConfigTest.java|TestS3FileSystemFactory.java|SessionWindowAssignerTest.java|StreamSourceOperatorWatermarksTest.java|StreamTaskCancellationBarrierTest.java|StreamTaskSystemExitTest.java|StreamTaskTest.java|TestPartitionDiscoverer.java|Whitebox.java|WindowOperatorContractTest.java|WindowOperatorTest.java|WindowReaderTest.java"
46+
<suppress files="BufferDataOverWindowOperatorTest.java|CEPOperatorTest.java|CepRuntimeContextTest.java|CliFrontendListTest.java|CliFrontendPackageProgramTest.java|CliFrontendSavepointTest.java|DemultiplexingRecordDeserializerTest.java|DropwizardMeterWrapperTest.java|DynamicEventTimeSessionWindowsTest.java|DynamicProcessingTimeSessionWindowsTest.java|EmbeddedRocksDBStateBackendTest.java|EventTimeSessionWindowsTest.java|FlinkCalciteCatalogReaderTest.java|FlinkMeterWrapperTest.java|HadoopDataInputStreamTest.java|HadoopInputFormatTest.java|HadoopOutputFormatTest.java|HadoopRecoverableWriterTest.java|HadoopUtilsTest.java|HiveTableSourceITCase.java|HybridSourceReaderTest.java|HybridSourceSplitEnumeratorTest.java|InternalWindowFunctionTest.java|LocalStateForwardingTest.java|MergingWindowSetTest.java|NFAITCase.java|NonBufferOverWindowOperatorTest.java|PatternTest.java|ProcessingTimeSessionWindowsTest.java|PythonOperatorChainingOptimizerTest.java|PythonTestUtils.java|RawFormatSerDeSchemaTest.java|RegisterApplicationMasterResponseReflectorTest.java|RichAsyncFunctionTest.java|RocksDBIncrementalCheckpointUtilsTest.java|RocksDBKeyedStateBackendTestFactory.java|RocksDBStateBackendConfigTest.java|TestS3FileSystemFactory.java|SessionWindowAssignerTest.java|StreamSourceOperatorWatermarksTest.java|StreamTaskCancellationBarrierTest.java|StreamTaskSystemExitTest.java|StreamTaskTest.java|TestPartitionDiscoverer.java|Whitebox.java|WindowOperatorContractTest.java|WindowOperatorTest.java|WindowReaderTest.java"
4747
checks="IllegalImport"/>
4848

4949
<suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>

0 commit comments

Comments
 (0)