Skip to content

Commit dd774d8

Browse files
tasanuma and HarshitGupta11
authored and committed
HDFS-16293. Client sleeps and holds 'dataQueue' when DataNodes are congested. Contributed by Yuanxin Zhu.
1 parent 2c5d639 commit dd774d8

File tree

2 files changed

+89
-5
lines changed

2 files changed

+89
-5
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -687,11 +687,6 @@ public void run() {
687687
continue;
688688
}
689689
// get packet to be sent.
690-
try {
691-
backOffIfNecessary();
692-
} catch (InterruptedException e) {
693-
LOG.debug("Thread interrupted", e);
694-
}
695690
one = dataQueue.getFirst(); // regular data packet
696691
SpanContext[] parents = one.getTraceParents();
697692
if (parents != null && parents.length > 0) {
@@ -704,6 +699,14 @@ public void run() {
704699
}
705700
}
706701

702+
// The DataStreamer has to release the dataQueue before sleeping,
703+
// otherwise it will delay the ResponseProcessor's acceptance of ACKs.
704+
try {
705+
backOffIfNecessary();
706+
} catch (InterruptedException e) {
707+
LOG.debug("Thread interrupted", e);
708+
}
709+
707710
// get new block from namenode.
708711
LOG.debug("stage={}, {}", stage, this);
709712

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.Random;
3333
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3435

3536
import org.apache.hadoop.conf.Configuration;
3637
import org.apache.hadoop.fs.CreateFlag;
@@ -300,6 +301,86 @@ public void testCongestionBackoff() throws IOException {
300301
Assert.assertTrue(congestedNodes.isEmpty());
301302
}
302303

304+
@Test(timeout=60000)
305+
public void testCongestionAckDelay() {
306+
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
307+
DFSClient client = mock(DFSClient.class);
308+
when(client.getConf()).thenReturn(dfsClientConf);
309+
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
310+
client.clientRunning = true;
311+
DataStreamer stream = new DataStreamer(
312+
mock(HdfsFileStatus.class),
313+
mock(ExtendedBlock.class),
314+
client,
315+
"foo", null, null, null, null, null, null);
316+
DataOutputStream blockStream = mock(DataOutputStream.class);
317+
Whitebox.setInternalState(stream, "blockStream", blockStream);
318+
Whitebox.setInternalState(stream, "stage",
319+
BlockConstructionStage.PIPELINE_CLOSE);
320+
@SuppressWarnings("unchecked")
321+
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
322+
Whitebox.getInternalState(stream, "dataQueue");
323+
@SuppressWarnings("unchecked")
324+
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
325+
Whitebox.getInternalState(stream, "congestedNodes");
326+
int backOffMaxTime = (int)
327+
Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
328+
DFSPacket[] packet = new DFSPacket[100];
329+
AtomicBoolean isDelay = new AtomicBoolean(true);
330+
331+
// ResponseProcessor needs the dataQueue for the next step.
332+
new Thread(() -> {
333+
for (int i = 0; i < 10; i++) {
334+
// In order to ensure that other threads run for a period of time to prevent affecting
335+
// the results.
336+
try {
337+
Thread.sleep(backOffMaxTime / 50);
338+
} catch (InterruptedException e) {
339+
e.printStackTrace();
340+
}
341+
synchronized (dataQueue) {
342+
congestedNodes.add(mock(DatanodeInfo.class));
343+
// The DataStreamer releases the dataQueue before sleeping, and the ResponseProcessor
344+
// has time to hold the dataQueue to continuously accept ACKs and add congestedNodes
345+
// to the list. Therefore, congestedNodes.size() is greater than 1.
346+
if (congestedNodes.size() > 1){
347+
isDelay.set(false);
348+
try {
349+
doThrow(new IOException()).when(blockStream).flush();
350+
} catch (Exception e) {
351+
e.printStackTrace();
352+
}
353+
}
354+
}
355+
}
356+
try {
357+
doThrow(new IOException()).when(blockStream).flush();
358+
} catch (Exception e) {
359+
e.printStackTrace();
360+
}
361+
// Prevent the DataStreamer from always waiting because the
362+
// dataQueue may be empty, so that the unit test cannot exit.
363+
DFSPacket endPacket = mock(DFSPacket.class);
364+
dataQueue.add(endPacket);
365+
}).start();
366+
367+
// The purpose of adding packets to the dataQueue is to make the DataStreamer run
368+
// normally and judge whether to enter the sleep state according to the congestion.
369+
new Thread(() -> {
370+
for (int i = 0; i < 100; i++) {
371+
packet[i] = mock(DFSPacket.class);
372+
dataQueue.add(packet[i]);
373+
try {
374+
Thread.sleep(backOffMaxTime / 100);
375+
} catch (InterruptedException e) {
376+
e.printStackTrace();
377+
}
378+
}
379+
}).start();
380+
stream.run();
381+
Assert.assertFalse(isDelay.get());
382+
}
383+
303384
@Test
304385
public void testNoLocalWriteFlag() throws IOException {
305386
DistributedFileSystem fs = cluster.getFileSystem();

0 commit comments

Comments
 (0)