Commit e16cda9

Merge branch 'apache:trunk' into HADOOP-18058
2 parents 8711514 + 3bf014d commit e16cda9

22 files changed: +856 −34 lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java

Lines changed: 13 additions & 0 deletions

```diff
@@ -113,6 +113,12 @@ private void setInputFromSavedData() {
     compressedDirectBuf.put(
         userBuf, userBufOff, bytesInCompressedBuffer);
 
+    // Set the finished to false when compressedDirectBuf still
+    // contains some bytes.
+    if (compressedDirectBuf.position() > 0 && finished) {
+      finished = false;
+    }
+
     userBufOff += bytesInCompressedBuffer;
     userBufferBytesToConsume -= bytesInCompressedBuffer;
   }
@@ -186,6 +192,13 @@ public int decompress(byte[] b, int off, int len)
         0,
         directBufferSize
     );
+
+    // Set the finished to false when compressedDirectBuf still
+    // contains some bytes.
+    if (remaining > 0 && finished) {
+      finished = false;
+    }
+
     uncompressedDirectBuf.limit(n);
 
     // Get at most 'len' bytes
```
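
For context on why these guards matter, here is a minimal sketch (not part of the commit) of the loop shape callers use to drain a `Decompressor`. The method names come from the standard `org.apache.hadoop.io.compress.Decompressor` interface; the class name, buffer sizes, and error handling are invented for illustration. If `finished()` stays true while compressed bytes are still buffered, a loop like this exits early with truncated output — exactly the state the two added guards repair.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;

// Hypothetical driver, for illustration only.
public class DecompressLoopSketch {
  static byte[] drain(byte[] compressed) throws IOException {
    Decompressor decompressor = new ZStandardDecompressor(64 * 1024);
    decompressor.setInput(compressed, 0, compressed.length);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[64 * 1024];
    // Before the fix, finished() could remain true after an earlier frame
    // ended even though compressedDirectBuf still held unread bytes, so a
    // loop like this could stop before draining everything.
    while (!decompressor.finished()) {
      int n = decompressor.decompress(buf, 0, buf.length);
      if (n == 0 && decompressor.needsInput()) {
        break; // no more input to feed in this sketch
      }
      out.write(buf, 0, n);
    }
    return out.toByteArray();
  }
}
```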

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 7 additions & 0 deletions

```diff
@@ -498,6 +498,7 @@ protected ResponseBuffer initialValue() {
   private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
+  private final AtomicInteger numInProcessHandler = new AtomicInteger();
 
   private boolean logSlowRPC = false;
 
@@ -509,6 +510,10 @@ protected boolean isLogSlowRPC() {
     return logSlowRPC;
   }
 
+  public int getNumInProcessHandler() {
+    return numInProcessHandler.get();
+  }
+
   /**
    * Sets slow RPC flag.
    * @param logSlowRPCFlag
@@ -3080,6 +3085,7 @@ public void run() {
 
       try {
         call = callQueue.take(); // pop the queue; maybe blocked here
+        numInProcessHandler.incrementAndGet();
         startTimeNanos = Time.monotonicNowNanos();
         if (alignmentContext != null && call.isCallCoordinated() &&
             call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
@@ -3133,6 +3139,7 @@ public void run() {
         }
       } finally {
         CurCall.set(null);
+        numInProcessHandler.decrementAndGet();
         IOUtils.cleanupWithLogger(LOG, traceScope);
         if (call != null) {
           updateMetrics(call, startTimeNanos, connDropped);
```
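
Note that the counter is incremented immediately after `callQueue.take()` returns and decremented in the handler's `finally` block, so the gauge reflects handler threads actively executing a call and is restored correctly even when a call ends exceptionally.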

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 6 additions & 0 deletions

```diff
@@ -133,6 +133,11 @@ public static RpcMetrics create(Server server, Configuration conf) {
     return server.getNumOpenConnections();
   }
 
+  @Metric("Number of in process handlers")
+  public int getNumInProcessHandler() {
+    return server.getNumInProcessHandler();
+  }
+
   @Metric("Number of open connections per user")
   public String numOpenConnectionsPerUser() {
     return server.getNumOpenConnectionsPerUser();
@@ -288,6 +293,7 @@ public void incrClientBackoff() {
   public void incrSlowRpc() {
     rpcSlowCalls.incr();
   }
+
   /**
    * Returns a MutableRate Counter.
    * @return Mutable Rate
```
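
Gauges registered through `@Metric` are also exposed over JMX by the Hadoop metrics system. Below is a hedged sketch of reading the new gauge from within the same JVM; the ObjectName follows the usual `Hadoop:service=...,name=RpcActivityForPort<port>` naming convention, but the service name and port here are placeholders, not values taken from this commit.

```java
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical probe, for illustration only.
public class NumInProcessHandlerProbe {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Placeholder ObjectName: substitute the real service name and RPC port.
    ObjectName rpcActivity =
        new ObjectName("Hadoop:service=NameNode,name=RpcActivityForPort8020");
    Object inProcess = mbs.getAttribute(rpcActivity, "NumInProcessHandler");
    System.out.println("NumInProcessHandler = " + inProcess);
  }
}
```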

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -83,6 +83,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
 | `RpcAuthorizationFailures` | Total number of authorization failures |
 | `RpcAuthorizationSuccesses` | Total number of authorization successes |
 | `NumOpenConnections` | Current number of open connections |
+| `NumInProcessHandler` | Current number of handlers that are actively processing calls |
 | `CallQueueLength` | Current length of the call queue |
 | `numDroppedConnections` | Total number of dropped connections |
 | `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
```

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java

Lines changed: 59 additions & 0 deletions

```diff
@@ -231,6 +231,65 @@ public void testCompressorDecompressorLogicWithCompressionStreams()
     }
   }
 
+  /**
+   * Verify decompressor logic with some finish operation in compress.
+   */
+  @Test
+  public void testCompressorDecompressorWithFinish() throws Exception {
+    DataOutputStream deflateOut = null;
+    DataInputStream inflateIn = null;
+    int byteSize = 1024 * 100;
+    byte[] bytes = generate(byteSize);
+    int firstLength = 1024 * 30;
+
+    int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
+    try {
+      DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+      CompressionOutputStream deflateFilter =
+          new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
+              bufferSize);
+
+      deflateOut =
+          new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+      // Write some data and finish.
+      deflateOut.write(bytes, 0, firstLength);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      // ResetState then write some data and finish.
+      deflateFilter.resetState();
+      deflateOut.write(bytes, firstLength, firstLength);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      // ResetState then write some data and finish.
+      deflateFilter.resetState();
+      deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
+      deflateFilter.finish();
+      deflateOut.flush();
+
+      DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+      deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+          compressedDataBuffer.getLength());
+
+      CompressionInputStream inflateFilter =
+          new DecompressorStream(deCompressedDataBuffer,
+              new ZStandardDecompressor(bufferSize), bufferSize);
+
+      inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
+
+      byte[] result = new byte[byteSize];
+      inflateIn.read(result);
+      assertArrayEquals(
+          "original array not equals compress/decompressed array", bytes,
+          result);
+    } finally {
+      IOUtils.closeStream(deflateOut);
+      IOUtils.closeStream(inflateIn);
+    }
+  }
+
   @Test
   public void testZStandardCompressDecompressInMultiThreads() throws Exception {
     MultithreadedTestUtil.TestContext ctx =
```
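
The three `finish()`/`resetState()` cycles write three concatenated zstd frames into a single stream, which drives the decompressor through exactly the buffered-bytes case the fix addresses: without resetting `finished`, reading back the 100 KB payload would stop after the first frame and the array comparison would fail.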

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 32 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -1107,6 +1108,37 @@ public TestRpcService run() {
     }
   }
 
+  @Test
+  public void testNumInProcessHandlerMetrics() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.
+        createUserForTesting("user123", new String[0]);
+    // use 1 handler so the callq can be plugged
+    final Server server = setupTestServer(conf, 1);
+    try {
+      RpcMetrics rpcMetrics = server.getRpcMetrics();
+      assertEquals(0, rpcMetrics.getNumInProcessHandler());
+
+      ExternalCall<String> call1 = newExtCall(ugi, () -> {
+        assertEquals(1, rpcMetrics.getNumInProcessHandler());
+        return UserGroupInformation.getCurrentUser().getUserName();
+      });
+      ExternalCall<Void> call2 = newExtCall(ugi, () -> {
+        assertEquals(1, rpcMetrics.getNumInProcessHandler());
+        return null;
+      });
+
+      server.queueCall(call1);
+      server.queueCall(call2);
+
+      // Wait for call1 and call2 to enter the handler.
+      call1.get();
+      call2.get();
+      assertEquals(0, rpcMetrics.getNumInProcessHandler());
+    } finally {
+      server.stop();
+    }
+  }
+
   /**
    * Test RPC backoff by queue full.
    */
```
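
Because the server is set up with a single handler, each `ExternalCall` can assert that exactly one handler is in process while it runs, and the final assertion verifies the gauge drops back to zero once both queued calls complete.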

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

Lines changed: 60 additions & 0 deletions

```diff
@@ -38,6 +38,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -525,11 +526,13 @@ boolean doWaitForRestart() {
   // List of congested data nodes. The stream will back off if the DataNodes
   // are congested
   private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
+  private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>();
   private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
   private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
   private int maxPipelineRecoveryRetries;
+  private int markSlowNodeAsBadNodeThreshold;
 
   protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
@@ -559,6 +562,7 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
     this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
     this.addBlockFlags = flags;
     this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
+    this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold();
   }
 
   /**
@@ -1155,13 +1159,18 @@ public void run() {
           long seqno = ack.getSeqno();
           // processes response status from datanodes.
           ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
+          ArrayList<DatanodeInfo> slownodesFromAck = new ArrayList<>();
           for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
             final Status reply = PipelineAck.getStatusFromHeader(ack
                 .getHeaderFlag(i));
             if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
                 PipelineAck.ECN.CONGESTED) {
               congestedNodesFromAck.add(targets[i]);
             }
+            if (PipelineAck.getSLOWFromHeader(ack.getHeaderFlag(i)) ==
+                PipelineAck.SLOW.SLOW) {
+              slownodesFromAck.add(targets[i]);
+            }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply)) {
@@ -1191,6 +1200,16 @@ public void run() {
             }
           }
 
+          if (slownodesFromAck.isEmpty()) {
+            if (!slowNodeMap.isEmpty()) {
+              slowNodeMap.clear();
+            }
+          } else {
+            markSlowNode(slownodesFromAck);
+            LOG.debug("SlowNodeMap content: {}.", slowNodeMap);
+          }
+
+
           assert seqno != PipelineAck.UNKOWN_SEQNO :
               "Ack for unknown seqno should be a failed ack: " + ack;
           if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
@@ -1257,10 +1276,51 @@ public void run() {
       }
     }
 
+    void markSlowNode(List<DatanodeInfo> slownodesFromAck) throws IOException {
+      Set<DatanodeInfo> discontinuousNodes = new HashSet<>(slowNodeMap.keySet());
+      for (DatanodeInfo slowNode : slownodesFromAck) {
+        if (!slowNodeMap.containsKey(slowNode)) {
+          slowNodeMap.put(slowNode, 1);
+        } else {
+          int oldCount = slowNodeMap.get(slowNode);
+          slowNodeMap.put(slowNode, ++oldCount);
+        }
+        discontinuousNodes.remove(slowNode);
+      }
+      for (DatanodeInfo discontinuousNode : discontinuousNodes) {
+        slowNodeMap.remove(discontinuousNode);
+      }
+
+      if (!slowNodeMap.isEmpty()) {
+        for (Map.Entry<DatanodeInfo, Integer> entry : slowNodeMap.entrySet()) {
+          if (entry.getValue() >= markSlowNodeAsBadNodeThreshold) {
+            DatanodeInfo slowNode = entry.getKey();
+            int index = getDatanodeIndex(slowNode);
+            if (index >= 0) {
+              errorState.setBadNodeIndex(index);
+              throw new IOException("Receive reply from slowNode " + slowNode +
+                  " for continuous " + markSlowNodeAsBadNodeThreshold +
+                  " times, treating it as badNode");
+            }
+            slowNodeMap.remove(entry.getKey());
+          }
+        }
+      }
+    }
+
     void close() {
       responderClosed = true;
       this.interrupt();
     }
+
+    int getDatanodeIndex(DatanodeInfo datanodeInfo) {
+      for (int i = 0; i < targets.length; i++) {
+        if (targets[i].equals(datanodeInfo)) {
+          return i;
+        }
+      }
+      return -1;
+    }
   }
 
   private boolean shouldHandleExternalError(){
```
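
To make the bookkeeping in `markSlowNode()` concrete, here is a reduced, hypothetical sketch of the same counting rule with plain strings standing in for `DatanodeInfo`: a node is treated as bad only after it appears SLOW in `threshold` consecutive acks, a node missing from one ack is dropped from the map (the "discontinuous" set), and an ack with no SLOW flags at all clears the map entirely. The class name and the trace in `main` are invented for illustration, not DataStreamer code.

```java
import java.util.*;

// Hypothetical, self-contained model of the consecutive-SLOW counting rule.
public class SlowNodeCounterSketch {
  private final Map<String, Integer> slowNodeMap = new HashMap<>();
  private final int threshold;

  SlowNodeCounterSketch(int threshold) {
    this.threshold = threshold;
  }

  // Returns the node to treat as bad, or null. Mirrors the shape of
  // markSlowNode(): bump counters for reported nodes, drop nodes that
  // skipped an ack, flag anything that reached the threshold.
  String onAck(List<String> slowNodesFromAck) {
    if (slowNodesFromAck.isEmpty()) {
      slowNodeMap.clear();
      return null;
    }
    Set<String> discontinuous = new HashSet<>(slowNodeMap.keySet());
    for (String node : slowNodesFromAck) {
      slowNodeMap.merge(node, 1, Integer::sum);
      discontinuous.remove(node);
    }
    slowNodeMap.keySet().removeAll(discontinuous);
    for (Map.Entry<String, Integer> e : slowNodeMap.entrySet()) {
      if (e.getValue() >= threshold) {
        return e.getKey();
      }
    }
    return null;
  }

  public static void main(String[] args) {
    SlowNodeCounterSketch c = new SlowNodeCounterSketch(3);
    System.out.println(c.onAck(List.of("dn1")));        // null (dn1 at 1)
    System.out.println(c.onAck(List.of("dn1")));        // null (dn1 at 2)
    System.out.println(c.onAck(List.of("dn2")));        // null (dn1 dropped)
    System.out.println(c.onAck(List.of("dn1", "dn2"))); // null (dn2 at 2)
    System.out.println(c.onAck(List.of("dn2")));        // dn2 (reaches 3)
  }
}
```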

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 3 additions & 0 deletions

```diff
@@ -154,6 +154,9 @@ public interface HdfsClientConfigKeys {
   String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
+  String DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY =
+      "dfs.client.mark.slownode.as.badnode.threshold";
+  int DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT = 10;
   String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
       "dfs.client.key.provider.cache.expiry";
   long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
```

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

Lines changed: 13 additions & 0 deletions

```diff
@@ -60,6 +60,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
@@ -142,6 +144,7 @@ public class DfsClientConf {
   private final int retryIntervalForGetLastBlockLength;
   private final long datanodeRestartTimeout;
   private final long slowIoWarningThresholdMs;
+  private final int markSlowNodeAsBadNodeThreshold;
 
   /** wait time window before refreshing blocklocation for inputstream. */
   private final long refreshReadBlockLocationsMS;
@@ -261,6 +264,9 @@ public DfsClientConf(Configuration conf) {
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
     readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
         DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
+    markSlowNodeAsBadNodeThreshold = conf.getInt(
+        DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY,
+        DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_DEFAULT);
 
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -644,6 +650,13 @@ public long getSlowIoWarningThresholdMs() {
     return slowIoWarningThresholdMs;
   }
 
+  /**
+   * @return the continuous slowNode replies received to mark slowNode as badNode
+   */
+  public int getMarkSlowNodeAsBadNodeThreshold() {
+    return markSlowNodeAsBadNodeThreshold;
+  }
+
   /*
   * @return the clientShortCircuitNum
   */
```
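
Since the threshold is a plain client configuration key, it can be tuned per client or per job like any other HDFS client setting. A small, hypothetical example of lowering it before opening a `FileSystem`; the value 5 is arbitrary and the surrounding class is invented for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

// Hypothetical usage sketch, not part of the commit.
public class ClientThresholdSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Mark a datanode as bad after 5 consecutive SLOW acks instead of
    // the default 10; the value here is arbitrary.
    conf.setInt(
        HdfsClientConfigKeys.DFS_CLIENT_MARK_SLOWNODE_AS_BADNODE_THRESHOLD_KEY,
        5);
    FileSystem fs = FileSystem.get(conf);
    System.out.println("Using " + fs.getUri());
  }
}
```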
