Skip to content

Commit acf0e38

Browse files
author
zengqiang.xu
committed
HDFS-16641. [SBN read] Add a metric to quantify the number of re-enter rpcs
1 parent 0af4bb3 commit acf0e38

File tree

4 files changed

+74
-0
lines changed

4 files changed

+74
-0
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3167,6 +3167,7 @@ private void requeueCall(Call call)
31673167
throws IOException, InterruptedException {
31683168
try {
31693169
internalQueueCall(call, false);
3170+
rpcMetrics.incrRequeueCalls();
31703171
} catch (RpcServerException rse) {
31713172
call.doResponse(rse.getCause(), rse.getRpcStatusProto());
31723173
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
128128
MutableCounterLong rpcClientBackoff;
129129
@Metric("Number of Slow RPC calls")
130130
MutableCounterLong rpcSlowCalls;
131+
@Metric("Number of requeue calls")
132+
MutableCounterLong rpcRequeueCalls;
131133

132134
@Metric("Number of open connections") public int numOpenConnections() {
133135
return server.getNumOpenConnections();
@@ -304,6 +306,13 @@ public void incrSlowRpc() {
304306
rpcSlowCalls.incr();
305307
}
306308

309+
/**
310+
* Increments the Requeue Calls counter.
311+
*/
312+
public void incrRequeueCalls() {
313+
rpcRequeueCalls.incr();
314+
}
315+
307316
/**
308317
* Returns a MutableRate Counter.
309318
* @return Mutable Rate
@@ -344,6 +353,15 @@ public long getRpcSlowCalls() {
344353
return rpcSlowCalls.value();
345354
}
346355

356+
/**
357+
* Returns the number of requeue calls;
358+
* @return long
359+
*/
360+
@VisibleForTesting
361+
public long getRpcRequeueCalls() {
362+
return rpcRequeueCalls.value();
363+
}
364+
347365
public MutableRate getDeferredRpcProcessingTime() {
348366
return deferredRpcProcessingTime;
349367
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6993,6 +6993,16 @@ public synchronized void verifyToken(DelegationTokenIdentifier identifier,
69936993
public EditLogTailer getEditLogTailer() {
69946994
return editLogTailer;
69956995
}
6996+
6997+
@VisibleForTesting
6998+
public void startNewEditLogTailer(Configuration conf) throws IOException {
6999+
if (this.editLogTailer != null) {
7000+
this.editLogTailer.stop();
7001+
}
7002+
7003+
this.editLogTailer = new EditLogTailer(this, conf);
7004+
this.editLogTailer.start();
7005+
}
69967006

69977007
@VisibleForTesting
69987008
public void setEditLogTailerForTests(EditLogTailer tailer) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
*/
1818
package org.apache.hadoop.hdfs.server.namenode.ha;
1919

20+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
2021
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
2122
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
2223
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
2324
import static org.junit.Assert.assertEquals;
25+
import static org.junit.Assert.assertNotNull;
2426
import static org.junit.Assert.assertTrue;
2527
import static org.junit.Assert.fail;
2628
import static org.mockito.ArgumentMatchers.any;
@@ -36,7 +38,10 @@
3638
import java.util.List;
3739
import java.util.concurrent.ExecutionException;
3840
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executors;
3942
import java.util.concurrent.Future;
43+
import java.util.concurrent.ScheduledExecutorService;
44+
import java.util.concurrent.ScheduledFuture;
4045
import java.util.concurrent.TimeUnit;
4146

4247
import org.apache.hadoop.conf.Configuration;
@@ -61,9 +66,11 @@
6166
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
6267
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
6368
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
69+
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
6470
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
6571
import org.apache.hadoop.hdfs.tools.GetGroups;
6672
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
73+
import org.apache.hadoop.ipc.metrics.RpcMetrics;
6774
import org.apache.hadoop.util.Time;
6875
import org.apache.hadoop.util.concurrent.HadoopExecutors;
6976
import org.junit.After;
@@ -124,6 +131,44 @@ public static void shutDownCluster() throws IOException {
124131
}
125132
}
126133

134+
@Test
135+
public void testObserverRequeue() throws Exception {
136+
ScheduledExecutorService interruptor =
137+
Executors.newScheduledThreadPool(1);
138+
139+
EditLogTailer observerEditlogTailer = dfsCluster.getNameNode(2)
140+
.getNamesystem().getEditLogTailer();
141+
RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
142+
.getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
143+
144+
// Stop EditlogTailer of Observer NameNode.
145+
observerEditlogTailer.stop();
146+
147+
long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
148+
149+
ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
150+
() -> {
151+
Path tmpTestPath = new Path("/TestObserverRequeue");
152+
dfs.create(tmpTestPath, (short)1).close();
153+
assertSentTo(0);
154+
// This operation will be blocked in ObserverNameNode
155+
// until EditlogTailer tailed edits from journalNode.
156+
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
157+
assertSentTo(2);
158+
return fileStatus;
159+
}, 0, TimeUnit.SECONDS);
160+
161+
Thread.sleep(1000);
162+
163+
observerEditlogTailer.doTailEdits();
164+
FileStatus fileStatus = scheduledFuture.get(1000, TimeUnit.MILLISECONDS);
165+
assertNotNull(fileStatus);
166+
167+
assertTrue(obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum);
168+
dfsCluster.getNameNode(2).getNamesystem()
169+
.startNewEditLogTailer(dfsCluster.getConfiguration(2));
170+
}
171+
127172
@Test
128173
public void testNoActiveToObserver() throws Exception {
129174
try {

0 commit comments

Comments
 (0)