
Commit fa74091

[FLINK-36112][Connector/Filesystem] Add Support for CreateFlag.NO_LOCAL_WRITE in FLINK on YARN's File Creation to Manage Disk Space and Network Load in Labeled YARN Nodes
1 parent 4a28fa0 commit fa74091

File tree

6 files changed: +78 −15 lines


flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java

Lines changed: 1 addition & 1 deletion
@@ -582,7 +582,7 @@ public T withRollingPolicy(CheckpointRollingPolicy<IN, String> rollingPolicy) {
             return self();
         }
 
-        public T enableNoLocalWriting() {
+        public T disableLocalWriting() {
             this.noLocalWrite = true;
             return self();
         }
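
As a usage sketch (not part of the commit): the renamed method would be called while building the sink. The diff context above places it next to withRollingPolicy(CheckpointRollingPolicy), which suggests the bulk-format builder; the path and myBulkWriterFactory below are illustrative placeholders.

    // Hypothetical usage of the renamed builder method; `myBulkWriterFactory`
    // stands in for any BulkWriter.Factory<String> implementation.
    FileSink<String> sink =
            FileSink.forBulkFormat(new Path("hdfs:///data/output"), myBulkWriterFactory)
                    .disableLocalWriting() // sets noLocalWrite = true on the builder
                    .build();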

flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

Lines changed: 5 additions & 4 deletions
@@ -58,6 +58,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
@@ -668,11 +669,11 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
 
     @PublicEvolving
     @Override
-    public RecoverableWriter createRecoverableWriter(boolean noLocalWrite) throws IOException {
-        if (noLocalWrite) {
-            return IFileSystem.super.createRecoverableWriter(noLocalWrite);
-        } else {
+    public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
+        if (conf == null || conf.isEmpty()) {
             return createRecoverableWriter();
+        } else {
+            return IFileSystem.super.createRecoverableWriter(conf);
         }
     }
 
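
A caller-side sketch of the new overload (the URI here is illustrative): a null or empty map falls back to the plain createRecoverableWriter(), so existing behavior is unchanged unless options are supplied.

    // Sketch: requesting a recoverable writer through the Map-based API.
    Map<String, String> writerConf = new HashMap<>();
    writerConf.put("fs.hdfs.no-local-write", "true"); // key consumed by HadoopFileSystem below
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"));
    RecoverableWriter writer = fs.createRecoverableWriter(writerConf);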

flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java

Lines changed: 4 additions & 2 deletions
@@ -30,6 +30,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
 /**
  * Interface of all file systems used by Flink. This interface may be extended to implement
@@ -247,11 +248,12 @@ default RecoverableWriter createRecoverableWriter() throws IOException {
      * <p>This method is optional on file systems and various file system implementations may not
      * support this method, throwing an {@code UnsupportedOperationException}.
      *
-     * @param b Flag to indicate whether the writer should not write to local storage.
+     * @param conf Map that contains a flag indicating whether the writer should not write to
+     *     local storage, and that can provide more information to instantiate the writer.
      * @return A RecoverableWriter for this file system.
      * @throws IOException Thrown, if the recoverable writer cannot be instantiated.
      */
-    default RecoverableWriter createRecoverableWriter(boolean b) throws IOException {
+    default RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
         throw new UnsupportedOperationException(
                 "This file system does not support recoverable writers that does not write on local machine.");
     }
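
For a third-party file system, overriding the new default method might look like the sketch below; MyRecoverableWriter is a hypothetical class, and the key name mirrors the one the Hadoop implementation checks in the next file.

    // Sketch of a custom IFileSystem implementation overriding the new default.
    @Override
    public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
        boolean noLocalWrite =
                conf != null && Boolean.parseBoolean(conf.get("fs.hdfs.no-local-write"));
        return new MyRecoverableWriter(noLocalWrite); // hypothetical writer class
    }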

flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java

Lines changed: 10 additions & 2 deletions
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.Locale;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -216,11 +217,18 @@ public RecoverableWriter createRecoverableWriter() throws IOException {
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter(boolean noLocalWrite) throws IOException {
+    public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException {
         // This writer is only supported on a subset of file systems, and on
         // specific versions. We check these schemes and versions eagerly for better error
         // messages in the constructor of the writer.
-        return new HadoopRecoverableWriter(fs, noLocalWrite);
+        if (conf == null || conf.isEmpty()) {
+            return createRecoverableWriter();
+        } else if (conf.containsKey("fs.hdfs.no-local-write")) {
+            return new HadoopRecoverableWriter(
+                    fs, Boolean.parseBoolean(conf.get("fs.hdfs.no-local-write")));
+        } else {
+            return new HadoopRecoverableWriter(fs);
+        }
     }
 
     // ------------------------------------------------------------------------
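
A note on the design: carrying options as a Map<String, String> rather than a boolean lets future per-writer settings ride along without another signature change, and unknown keys simply fall through to the plain HadoopRecoverableWriter, keeping the change backward compatible.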

flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
     /** The Hadoop file system on which the writer operates. */
     protected final org.apache.hadoop.fs.FileSystem fs;
 
-    protected final boolean noLocalWrite;
+    private final boolean noLocalWrite;
 
    /**
     * Creates a new Recoverable writer.
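
For reference, a sketch of the underlying Hadoop mechanism named in the commit title. How this writer applies the flag internally is an assumption here; the snippet only shows the standard HDFS client API (org.apache.hadoop.fs.CreateFlag, FSDataOutputStream, FsPermission, java.util.EnumSet) for requesting NO_LOCAL_WRITE, which asks the NameNode not to place a block replica on the client-local DataNode.

    // Sketch (assumption): creating an HDFS file with CreateFlag.NO_LOCAL_WRITE.
    EnumSet<CreateFlag> flags =
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.NO_LOCAL_WRITE);
    FSDataOutputStream out =
            fs.create(
                    new org.apache.hadoop.fs.Path("/data/part-0"),
                    FsPermission.getFileDefault(),
                    flags,
                    4096, // buffer size
                    fs.getDefaultReplication(),
                    fs.getDefaultBlockSize(),
                    null); // no progress callback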

flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java

Lines changed: 57 additions & 5 deletions
@@ -21,22 +21,37 @@
 import org.apache.flink.core.fs.AbstractRecoverableWriterTest;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
 
 import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.Mockito.spy;
 
 /** Tests for the {@link HadoopRecoverableWriter}. */
 class HadoopRecoverableWriterTest extends AbstractRecoverableWriterTest {
@@ -66,8 +81,11 @@ static void createHDFS() throws Exception {
 
         final Configuration hdConf = new Configuration();
         hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        hdConf.set("dfs.replication", "2");
+        hdConf.set("dfs.blocksize", String.valueOf(512));
+        hdConf.set("dfs.namenode.fs-limits.min-block-size", String.valueOf(512));
 
-        final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+        final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf).numDataNodes(3);
         hdfsCluster = builder.build();
 
         final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
@@ -87,15 +105,49 @@ static void destroyHDFS() throws Exception {
     }
 
     private RecoverableWriter getNoLocalWriteFileSystemWriter() throws Exception {
-        return fileSystem.createRecoverableWriter(true);
+        Map<String, String> conf = new HashMap<>();
+        conf.put("fs.hdfs.no-local-write", "true");
+        return fileSystem.createRecoverableWriter(conf);
     }
 
     @Test
-    void testNoLocalWrite() throws Exception {
+    void testNoLocalWriteFlag() throws Exception {
+        createHDFS();
         final HadoopRecoverableWriter writer =
                 (HadoopRecoverableWriter) getNoLocalWriteFileSystemWriter();
-
-        Assertions.assertTrue(writer.noLocalWrite);
+        BlockManager bm = hdfsCluster.getNameNode().getNamesystem().getBlockManager();
+        DatanodeManager dm = bm.getDatanodeManager();
+        try (RecoverableFsDataOutputStream os = writer.open(new Path("/tests/test-no-local"))) {
+            // Inject a DatanodeManager that reports one DataNode as the
+            // client-local node.
+            DatanodeManager spyDm = spy(dm);
+            DatanodeDescriptor dn1 =
+                    dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.LIVE).get(0);
+            Whitebox.setInternalState(bm, "datanodeManager", spyDm);
+            byte[] buf = new byte[512 * 16];
+            new Random().nextBytes(buf);
+            os.write(buf);
+        } finally {
+            Whitebox.setInternalState(bm, "datanodeManager", dm);
+        }
+        hdfsCluster.triggerBlockReports();
+        final String bpid = hdfsCluster.getNamesystem().getBlockPoolId();
+        // Total number of DataNodes is 3.
+        Assertions.assertEquals(3, hdfsCluster.getAllBlockReports(bpid).size());
+        int numDataNodesWithData = 0;
+        for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks :
+                hdfsCluster.getAllBlockReports(bpid)) {
+            for (BlockListAsLongs blocks : dnBlocks.values()) {
+                if (blocks.getNumberOfBlocks() > 0) {
+                    numDataNodesWithData++;
+                    break;
+                }
+            }
+        }
+        // Verify that exactly one DataNode (the client-local one) received no data.
+        Assertions.assertEquals(1, 3 - numDataNodesWithData);
     }
 
     @Override
