diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c0596e4a92e..02747f53ca6 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -374,9 +374,17 @@ private XceiverClientReply sendCommandWithRetry( } if (blockID != null) { + if (request.getCmdType() != ContainerProtos.Type.ReadChunk) { + datanodeList = pipeline.getNodes(); + int getBlockDNLeaderIndex = datanodeList.indexOf(pipeline.getLeaderNode()); + if (getBlockDNLeaderIndex > 0) { + // Pull the leader DN to the top of the DN list + Collections.swap(datanodeList, 0, getBlockDNLeaderIndex); + } + } // Check if the DN to which the GetBlock command was sent has been cached. DatanodeDetails cachedDN = getBlockDNcache.get(blockID); - if (cachedDN != null) { + if (cachedDN != null && !topologyAwareRead) { datanodeList = pipeline.getNodes(); int getBlockDNCacheIndex = datanodeList.indexOf(cachedDN); if (getBlockDNCacheIndex > 0) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 99095f55b00..9ef0baf72ba 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -88,27 +88,27 @@ public void testCorrectDnsReturnedFromPipeline() throws IOException { @Test @Timeout(5) - public void testRandomFirstNodeIsCommandTarget() throws IOException { - final ArrayList allDNs = new ArrayList<>(dns); + public void testLeaderNodeIsCommandTarget() throws IOException { + final Set seenDN = new HashSet<>(); conf.setBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, false); - // Using a new Xceiver Client, call it repeatedly until all DNs in the - // pipeline have been the target of the command, indicating it is shuffling - // the DNs on each call with a new client. This test will timeout if this - // is not happening. - while (allDNs.size() > 0) { + // Using a new Xceiver Client, make 100 calls and ensure leader node is used + // each time. The logic should always use the leader node, so we can check + // only a single DN is ever seen after 100 calls. + for (int i = 0; i < 100; i++) { try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override public XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, DatanodeDetails dn) { - allDNs.remove(dn); + seenDN.add(dn); return buildValidResponse(); } }) { invokeXceiverClientGetBlock(client); } } + assertEquals(1, seenDN.size()); } @Test