From 93db4ffdc04c9f4d8992b008d794b284d1de6b1f Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 18 Oct 2024 15:29:41 -0700 Subject: [PATCH 01/12] If OM's Hadoop RPC handler receives a request that has been executed before, return its cached result. Change-Id: I132f403464f3eab3cde0fa0f9efe8b0cb7ddb2cc (cherry picked from commit 1dfd0dad320eea24798783291d4b7b308d84e88c) --- ...ManagerProtocolServerSideTranslatorPB.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 4506337e54d..1ae45e96c9e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -23,16 +23,19 @@ import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; +import com.sun.corba.se.pept.protocol.ClientInvocationInfo; import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ipc.ProcessingDetails.Timing; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; @@ -40,6 +43,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -57,7 +61,11 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.ozone.security.S3SecurityUtil; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; @@ -215,6 +223,28 @@ private OMResponse internalProcessRequest(OMRequest request) throws // Added the assertion. assert (omClientRequest != null); OMClientRequest finalOmClientRequest = omClientRequest; + + ClientInvocationId clientInvocationId = ClientInvocationId.valueOf( + ClientId.valueOf(UUID.nameUUIDFromBytes(ProtobufRpcEngine.Server.getClientId())), + ProtobufRpcEngine.Server.getCallId()); + RetryCache.Entry cacheEntry = + omRatisServer.getServerDivision().getRetryCache().getIfPresent(clientInvocationId); + // if this request has been executed before, return the cached result. + if (cacheEntry != null) { + if (cacheEntry.getReplyFuture().isDone()) { + RaftClientReply raftClientReply; + try { + raftClientReply = cacheEntry.getReplyFuture().get(); + } catch (ExecutionException ex) { + throw new ServiceException(ex.getMessage(), ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceException(ex.getMessage(), ex); + } + return OMRatisHelper.getOMResponseFromRaftClientReply(raftClientReply); + } + } + requestToSubmit = preExecute(finalOmClientRequest); this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { From ac5e3e3926ac98ef421e6ab560212932e026960c Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 18 Oct 2024 16:21:31 -0700 Subject: [PATCH 02/12] Fix checkstyle. Change-Id: If6f8df09ca611911ebfd6c67d6b0f1df12628cd7 --- .../protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 1ae45e96c9e..17feddfd3f7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; -import com.sun.corba.se.pept.protocol.ClientInvocationInfo; import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; From 1439f357dde2ddc8aab99ea862687d398f3e2cfa Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 14:00:10 -0700 Subject: [PATCH 03/12] Test case. Change-Id: Idf92c2b6ea787e0134fb4e8415bb237d2997aca7 --- .../TestOzoneManagerHAWithStoppedNodes.java | 79 ++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index 6937c52c712..55b4887f9b2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.om; +import com.google.protobuf.ServiceException; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -38,12 +39,15 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.service.KeyDeletingService; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.tag.Flaky; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeDuration; import org.junit.jupiter.api.AfterEach; @@ -52,6 +56,7 @@ import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.LoggerFactory; import java.net.ConnectException; import java.util.HashMap; @@ -71,6 +76,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -80,7 +86,8 @@ @Flaky("HDDS-11352") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class TestOzoneManagerHAWithStoppedNodes extends TestOzoneManagerHA { - + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( + TestOzoneManagerHAWithStoppedNodes.class); /** * After restarting OMs we need to wait * for a leader to be elected and ready. @@ -594,6 +601,76 @@ void testListVolumes() throws Exception { objectStore.listVolumesByUser(userName, prefix, "")); } + @Test + void testRetryCacheWithDownedOM() throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + getObjectStore().createVolume(volumeName, createVolumeArgs); + + OzoneVolume retVolumeinfo = getObjectStore().getVolume(volumeName); + + String bucketName = UUID.randomUUID().toString(); + + retVolumeinfo.createBucket(bucketName); + + String keyFrom = UUID.randomUUID().toString(); + String keyTo = UUID.randomUUID().toString(); + + long runCount = 10; + ClientId clientId = ClientId.randomId(); + MiniOzoneHAClusterImpl cluster = getCluster(); + OzoneManager omLeader = cluster.getOMLeader(); + + OzoneManagerProtocolProtos.KeyArgs keyArgs = + OzoneManagerProtocolProtos.KeyArgs.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyFrom) + .build(); + OzoneManagerProtocolProtos.RenameKeyRequest renameKeyRequest + = OzoneManagerProtocolProtos.RenameKeyRequest.newBuilder() + .setKeyArgs(keyArgs) + .setToKeyName(keyTo) + .build(); + OzoneManagerProtocolProtos.OMRequest omRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.RenameKey) + .setRenameKeyRequest(renameKeyRequest) + .setClientId(clientId.toString()) + .build(); + // Submit Purge paths request to OM + OzoneManagerProtocolProtos.OMResponse omResponse = + OzoneManagerRatisUtils.submitRequest(omLeader, omRequest, clientId, runCount); + assertTrue(omResponse.getSuccess()); + Thread.sleep(10000); + + cluster.shutdownOzoneManager(omLeader); + OzoneManager newOmLeader = null; + while (true) { + Thread.sleep(10000); + for (OzoneManager ozoneManager : cluster.getOzoneManagersList()) { + if (ozoneManager.isLeaderReady()) { + if (!omLeader.getOMNodeId().equals(ozoneManager.getOMNodeId())) { + newOmLeader = ozoneManager; + break; + } + } + } + if (newOmLeader != null) { + break; + } + } + + omResponse = OzoneManagerRatisUtils.submitRequest(newOmLeader, omRequest, clientId, runCount); + assertTrue(omResponse.getSuccess()); + } + private void validateVolumesList(Set expectedVolumes, Iterator volumeIterator) { int expectedCount = 0; From d39f6a9a104536e516c9d4bcf22f44dfbb56d8e3 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 14:48:58 -0700 Subject: [PATCH 04/12] Update test code. Change-Id: I22600fd65514c7b14d8794d25df418b7a3291bf1 --- .../TestOzoneManagerHAWithStoppedNodes.java | 88 ++++++++++++------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index 55b4887f9b2..3015a1f23ed 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -21,6 +21,8 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdfs.LogVerificationAppender; @@ -34,6 +36,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMHAMetrics; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; @@ -46,8 +49,12 @@ import org.apache.log4j.Logger; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.tag.Flaky; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeDuration; import org.junit.jupiter.api.AfterEach; @@ -58,6 +65,7 @@ import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.ConnectException; import java.util.HashMap; import java.util.Iterator; @@ -66,6 +74,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT; @@ -603,26 +613,24 @@ void testListVolumes() throws Exception { @Test void testRetryCacheWithDownedOM() throws Exception { + // Create a volume, a bucket and a key String userName = "user" + RandomStringUtils.randomNumeric(5); String adminName = "admin" + RandomStringUtils.randomNumeric(5); String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = UUID.randomUUID().toString(); + String keyTo = UUID.randomUUID().toString(); VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() .setOwner(userName) .setAdmin(adminName) .build(); getObjectStore().createVolume(volumeName, createVolumeArgs); + OzoneVolume ozoneVolume = getObjectStore().getVolume(volumeName); + ozoneVolume.createBucket(bucketName); + OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName); + String keyFrom = createKey(ozoneBucket); - OzoneVolume retVolumeinfo = getObjectStore().getVolume(volumeName); - - String bucketName = UUID.randomUUID().toString(); - - retVolumeinfo.createBucket(bucketName); - - String keyFrom = UUID.randomUUID().toString(); - String keyTo = UUID.randomUUID().toString(); - - long runCount = 10; + long callId = 10; ClientId clientId = ClientId.randomId(); MiniOzoneHAClusterImpl cluster = getCluster(); OzoneManager omLeader = cluster.getOMLeader(); @@ -644,33 +652,53 @@ void testRetryCacheWithDownedOM() throws Exception { .setRenameKeyRequest(renameKeyRequest) .setClientId(clientId.toString()) .build(); - // Submit Purge paths request to OM + // Submit rename request to OM OzoneManagerProtocolProtos.OMResponse omResponse = - OzoneManagerRatisUtils.submitRequest(omLeader, omRequest, clientId, runCount); + OzoneManagerRatisUtils.submitRequest(omLeader, omRequest, clientId, callId); assertTrue(omResponse.getSuccess()); - Thread.sleep(10000); + // Make one of the follower OM the leader, and shutdown the current leader. + OzoneManager newLeader = cluster.getOzoneManagersList().stream().filter( + om -> !om.getOMNodeId().equals(omLeader.getOMNodeId())).findFirst().get(); + transferLeader(omLeader, newLeader); cluster.shutdownOzoneManager(omLeader); - OzoneManager newOmLeader = null; - while (true) { - Thread.sleep(10000); - for (OzoneManager ozoneManager : cluster.getOzoneManagersList()) { - if (ozoneManager.isLeaderReady()) { - if (!omLeader.getOMNodeId().equals(ozoneManager.getOMNodeId())) { - newOmLeader = ozoneManager; - break; - } - } - } - if (newOmLeader != null) { - break; - } - } - - omResponse = OzoneManagerRatisUtils.submitRequest(newOmLeader, omRequest, clientId, runCount); + + // Once the rename completes, the source key should no longer exist + // and the destination key should exist. + OMException omException = assertThrows(OMException.class, + () -> ozoneBucket.getKey(keyFrom)); + assertEquals(omException.getResult(), OMException.ResultCodes.KEY_NOT_FOUND); + assertTrue(ozoneBucket.getKey(keyTo).isFile()); + + // Submit rename request to OM again. The request is cached so it will succeed. + omResponse = OzoneManagerRatisUtils.submitRequest(newLeader, omRequest, clientId, callId); assertTrue(omResponse.getSuccess()); } + private void transferLeader(OzoneManager omLeader, OzoneManager newLeader) throws IOException { + LOG.info("Transfer leadership from {}(raft id {}) to {}(raft id {})", + omLeader.getOMNodeId(), omLeader.getOmRatisServer().getRaftPeerId(), + newLeader.getOMNodeId(), newLeader.getOmRatisServer().getRaftPeerId()); + + final SupportedRpcType rpc = SupportedRpcType.GRPC; + final RaftProperties properties = RatisHelper.newRaftProperties(rpc); + + // For now not making anything configurable, RaftClient is only used + // in SCM for DB updates of sub-ca certs go via Ratis. + RaftClient.Builder builder = RaftClient.newBuilder() + .setRaftGroup(omLeader.getOmRatisServer().getRaftGroup()) + .setLeaderId(null) + .setProperties(properties) + .setRetryPolicy( + RetryPolicies.retryUpToMaximumCountWithFixedSleep(120, + TimeDuration.valueOf(500, TimeUnit.MILLISECONDS))); + try (RaftClient raftClient = builder.build()) { + RaftClientReply reply = raftClient.admin().transferLeadership(newLeader.getOmRatisServer() + .getRaftPeerId(), 10 * 1000); + assertTrue(reply.isSuccess()); + } + } + private void validateVolumesList(Set expectedVolumes, Iterator volumeIterator) { int expectedCount = 0; From 0b409b397b30208f12cbc02dc67ebdff8ea9f033 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 14:50:22 -0700 Subject: [PATCH 05/12] Checkstyle Change-Id: I605ab87723b4f92f0a65d6d88b03959150ed1ac0 --- .../hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index 3015a1f23ed..1ff5ecfce52 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -16,13 +16,11 @@ */ package org.apache.hadoop.ozone.om; -import com.google.protobuf.ServiceException; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.ratis.RatisHelper; -import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdfs.LogVerificationAppender; @@ -74,7 +72,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; From 9e6b523fa09725892ee88f0bfba7d9dfd63b5ca9 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 15:16:39 -0700 Subject: [PATCH 06/12] Submit request directly through OM Server Protocol handler. Change-Id: Iea4a45478985e5e2cfd4925e94543afc0b7397b6 --- .../ozone/om/TestOzoneManagerHAWithStoppedNodes.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index 1ff5ecfce52..baa5f765c3e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -627,7 +629,7 @@ void testRetryCacheWithDownedOM() throws Exception { OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName); String keyFrom = createKey(ozoneBucket); - long callId = 10; + int callId = 10; ClientId clientId = ClientId.randomId(); MiniOzoneHAClusterImpl cluster = getCluster(); OzoneManager omLeader = cluster.getOMLeader(); @@ -649,9 +651,12 @@ void testRetryCacheWithDownedOM() throws Exception { .setRenameKeyRequest(renameKeyRequest) .setClientId(clientId.toString()) .build(); + // set up the current call so that OM Ratis Server doesn't complain. + Server.getCurCall().set(new Server.Call(callId, 0, null, null, + RPC.RpcKind.RPC_BUILTIN, clientId.toByteString().toByteArray())); // Submit rename request to OM OzoneManagerProtocolProtos.OMResponse omResponse = - OzoneManagerRatisUtils.submitRequest(omLeader, omRequest, clientId, callId); + omLeader.getOmServerProtocol().processRequest(omRequest); assertTrue(omResponse.getSuccess()); // Make one of the follower OM the leader, and shutdown the current leader. @@ -668,7 +673,7 @@ void testRetryCacheWithDownedOM() throws Exception { assertTrue(ozoneBucket.getKey(keyTo).isFile()); // Submit rename request to OM again. The request is cached so it will succeed. - omResponse = OzoneManagerRatisUtils.submitRequest(newLeader, omRequest, clientId, callId); + omResponse = newLeader.getOmServerProtocol().processRequest(omRequest); assertTrue(omResponse.getSuccess()); } From 9c0ddbadfe199ce9f8720411a37874240aab28c3 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 15:36:27 -0700 Subject: [PATCH 07/12] Checkstyle Change-Id: I319764e6a6da4ccec4d07823110368c32d786fb9 --- .../hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index baa5f765c3e..542d70ce93e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -42,7 +42,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; -import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.security.UserGroupInformation; From 2e728fdadca256823fbfae2559b116dc1398bb99 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 17:09:55 -0700 Subject: [PATCH 08/12] Apply Nichola's update Change-Id: If2434f6c3a19e41fd04bde2f2842fe382d524e27 --- .../om/ratis/OzoneManagerRatisServer.java | 51 +++++++++++++++---- ...ManagerProtocolServerSideTranslatorPB.java | 25 +++++---- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index af4d42ad68a..9f187dd0219 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -71,6 +71,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; @@ -87,6 +88,7 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.util.LifeCycle; @@ -460,16 +462,11 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) { * ratis server. */ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { - if (!ozoneManager.isTestSecureOmFlag()) { - Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID); - Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID); - } return RaftClientRequest.newBuilder() - .setClientId( - ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId()))) + .setClientId(getClientId()) .setServerId(server.getId()) .setGroupId(raftGroupId) - .setCallId(Server.getCallId()) + .setCallId(getCallId()) .setMessage( Message.valueOf( OMRatisHelper.convertRequestToByteString(omRequest))) @@ -477,6 +474,39 @@ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { .build(); } + private ClientId getClientId() { + final byte[] clientIdBytes = Server.getClientId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(clientIdBytes != DUMMY_CLIENT_ID); + } + return ClientId.valueOf(UUID.nameUUIDFromBytes(clientIdBytes)); + } + + private long getCallId() { + final long callId = Server.getCallId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(callId != INVALID_CALL_ID); + } + return callId; + } + + public OMResponse checkRetryCache() throws ServiceException { + final ClientInvocationId invocationId = ClientInvocationId.valueOf(getClientId(), getCallId()); + final RetryCache.Entry cacheEntry = getServerDivision().getRetryCache().getIfPresent(invocationId); + if (cacheEntry == null) { + return null; //cache miss + } + //cache hit + try { + return getOMResponse(cacheEntry.getReplyFuture().get()); + } catch (ExecutionException ex) { + throw new ServiceException(ex.getMessage(), ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceException(ex.getMessage(), ex); + } + } + /** * Process the raftClientReply and return OMResponse. * @param omRequest @@ -538,6 +568,10 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, } } + return getOMResponse(reply); + } + + private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { return OMRatisHelper.getOMResponseFromRaftClientReply(reply); } catch (IOException ex) { @@ -547,9 +581,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, throw new ServiceException(ex); } } - - // TODO: Still need to handle RaftRetry failure exception and - // NotReplicated exception. } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 17feddfd3f7..b678233c6b1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -183,9 +183,7 @@ public OMResponse processRequest(OMRequest request) throws ServiceException { return response; } - private OMResponse internalProcessRequest(OMRequest request) throws - ServiceException { - OMClientRequest omClientRequest = null; + private OMResponse internalProcessRequest(OMRequest request) throws ServiceException { boolean s3Auth = false; try { @@ -214,7 +212,16 @@ private OMResponse internalProcessRequest(OMRequest request) throws if (!s3Auth) { OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); } - OMRequest requestToSubmit; + + // check retry cache + final OMResponse cached = omRatisServer.checkRetryCache(); + if (cached != null) { + return cached; + } + + // process new request + OMClientRequest omClientRequest = null; + final OMRequest requestToSubmit; try { omClientRequest = createClientRequest(request, ozoneManager); // TODO: Note: Due to HDDS-6055, createClientRequest() could now @@ -254,7 +261,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws return createErrorResponse(request, ex); } - OMResponse response = submitRequestToRatis(requestToSubmit); + final OMResponse response = omRatisServer.submitRequest(request); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } @@ -275,14 +282,6 @@ public OMRequest getLastRequestToSubmit() { return lastRequestToSubmit; } - /** - * Submits request to OM's Ratis server. - */ - private OMResponse submitRequestToRatis(OMRequest request) - throws ServiceException { - return omRatisServer.submitRequest(request); - } - private OMResponse submitReadRequestToOM(OMRequest request) throws ServiceException { // Check if this OM is the leader. From bbe2fce04ca177823791328e6ecd00bfd92c45f8 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 17:13:45 -0700 Subject: [PATCH 09/12] Remove duplicated code. Change-Id: If91b396305508424423bb9fd360da332b24c42ba --- ...ManagerProtocolServerSideTranslatorPB.java | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index b678233c6b1..237c2a34bcf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -34,7 +33,6 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ipc.ProcessingDetails.Timing; -import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; @@ -42,7 +40,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -60,11 +57,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.ozone.security.S3SecurityUtil; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.ClientInvocationId; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; @@ -230,27 +223,6 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep assert (omClientRequest != null); OMClientRequest finalOmClientRequest = omClientRequest; - ClientInvocationId clientInvocationId = ClientInvocationId.valueOf( - ClientId.valueOf(UUID.nameUUIDFromBytes(ProtobufRpcEngine.Server.getClientId())), - ProtobufRpcEngine.Server.getCallId()); - RetryCache.Entry cacheEntry = - omRatisServer.getServerDivision().getRetryCache().getIfPresent(clientInvocationId); - // if this request has been executed before, return the cached result. - if (cacheEntry != null) { - if (cacheEntry.getReplyFuture().isDone()) { - RaftClientReply raftClientReply; - try { - raftClientReply = cacheEntry.getReplyFuture().get(); - } catch (ExecutionException ex) { - throw new ServiceException(ex.getMessage(), ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new ServiceException(ex.getMessage(), ex); - } - return OMRatisHelper.getOMResponseFromRaftClientReply(raftClientReply); - } - } - requestToSubmit = preExecute(finalOmClientRequest); this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { From 15342482415cf6cf4c553d6b1c04ad9699fb045c Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 28 Oct 2024 12:58:36 -0700 Subject: [PATCH 10/12] Revert "Remove duplicated code." This reverts commit bbe2fce04ca177823791328e6ecd00bfd92c45f8. --- ...ManagerProtocolServerSideTranslatorPB.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 237c2a34bcf..b678233c6b1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ipc.ProcessingDetails.Timing; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; @@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -57,7 +60,11 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.ozone.security.S3SecurityUtil; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; @@ -223,6 +230,27 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep assert (omClientRequest != null); OMClientRequest finalOmClientRequest = omClientRequest; + ClientInvocationId clientInvocationId = ClientInvocationId.valueOf( + ClientId.valueOf(UUID.nameUUIDFromBytes(ProtobufRpcEngine.Server.getClientId())), + ProtobufRpcEngine.Server.getCallId()); + RetryCache.Entry cacheEntry = + omRatisServer.getServerDivision().getRetryCache().getIfPresent(clientInvocationId); + // if this request has been executed before, return the cached result. + if (cacheEntry != null) { + if (cacheEntry.getReplyFuture().isDone()) { + RaftClientReply raftClientReply; + try { + raftClientReply = cacheEntry.getReplyFuture().get(); + } catch (ExecutionException ex) { + throw new ServiceException(ex.getMessage(), ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceException(ex.getMessage(), ex); + } + return OMRatisHelper.getOMResponseFromRaftClientReply(raftClientReply); + } + } + requestToSubmit = preExecute(finalOmClientRequest); this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { From 31544409632b34ea8da18a55730ca03f296472f1 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 28 Oct 2024 12:58:43 -0700 Subject: [PATCH 11/12] Revert "Apply Nichola's update" This reverts commit 2e728fdadca256823fbfae2559b116dc1398bb99. --- .../om/ratis/OzoneManagerRatisServer.java | 51 ++++--------------- ...ManagerProtocolServerSideTranslatorPB.java | 25 ++++----- 2 files changed, 23 insertions(+), 53 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 9f187dd0219..af4d42ad68a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -71,7 +71,6 @@ import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; @@ -88,7 +87,6 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.util.LifeCycle; @@ -462,11 +460,16 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) { * ratis server. */ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID); + Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID); + } return RaftClientRequest.newBuilder() - .setClientId(getClientId()) + .setClientId( + ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId()))) .setServerId(server.getId()) .setGroupId(raftGroupId) - .setCallId(getCallId()) + .setCallId(Server.getCallId()) .setMessage( Message.valueOf( OMRatisHelper.convertRequestToByteString(omRequest))) @@ -474,39 +477,6 @@ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { .build(); } - private ClientId getClientId() { - final byte[] clientIdBytes = Server.getClientId(); - if (!ozoneManager.isTestSecureOmFlag()) { - Preconditions.checkArgument(clientIdBytes != DUMMY_CLIENT_ID); - } - return ClientId.valueOf(UUID.nameUUIDFromBytes(clientIdBytes)); - } - - private long getCallId() { - final long callId = Server.getCallId(); - if (!ozoneManager.isTestSecureOmFlag()) { - Preconditions.checkArgument(callId != INVALID_CALL_ID); - } - return callId; - } - - public OMResponse checkRetryCache() throws ServiceException { - final ClientInvocationId invocationId = ClientInvocationId.valueOf(getClientId(), getCallId()); - final RetryCache.Entry cacheEntry = getServerDivision().getRetryCache().getIfPresent(invocationId); - if (cacheEntry == null) { - return null; //cache miss - } - //cache hit - try { - return getOMResponse(cacheEntry.getReplyFuture().get()); - } catch (ExecutionException ex) { - throw new ServiceException(ex.getMessage(), ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new ServiceException(ex.getMessage(), ex); - } - } - /** * Process the raftClientReply and return OMResponse. * @param omRequest @@ -568,10 +538,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, } } - return getOMResponse(reply); - } - - private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { return OMRatisHelper.getOMResponseFromRaftClientReply(reply); } catch (IOException ex) { @@ -581,6 +547,9 @@ private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException throw new ServiceException(ex); } } + + // TODO: Still need to handle RaftRetry failure exception and + // NotReplicated exception. } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index b678233c6b1..17feddfd3f7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -183,7 +183,9 @@ public OMResponse processRequest(OMRequest request) throws ServiceException { return response; } - private OMResponse internalProcessRequest(OMRequest request) throws ServiceException { + private OMResponse internalProcessRequest(OMRequest request) throws + ServiceException { + OMClientRequest omClientRequest = null; boolean s3Auth = false; try { @@ -212,16 +214,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep if (!s3Auth) { OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); } - - // check retry cache - final OMResponse cached = omRatisServer.checkRetryCache(); - if (cached != null) { - return cached; - } - - // process new request - OMClientRequest omClientRequest = null; - final OMRequest requestToSubmit; + OMRequest requestToSubmit; try { omClientRequest = createClientRequest(request, ozoneManager); // TODO: Note: Due to HDDS-6055, createClientRequest() could now @@ -261,7 +254,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep return createErrorResponse(request, ex); } - final OMResponse response = omRatisServer.submitRequest(request); + OMResponse response = submitRequestToRatis(requestToSubmit); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } @@ -282,6 +275,14 @@ public OMRequest getLastRequestToSubmit() { return lastRequestToSubmit; } + /** + * Submits request to OM's Ratis server. + */ + private OMResponse submitRequestToRatis(OMRequest request) + throws ServiceException { + return omRatisServer.submitRequest(request); + } + private OMResponse submitReadRequestToOM(OMRequest request) throws ServiceException { // Check if this OM is the leader. From 63b4efee5c8d07d3028e06a887942f97f6bd926b Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 28 Oct 2024 15:04:42 -0700 Subject: [PATCH 12/12] Fix test error. Change-Id: Iaf0a88d32316f6ada585612b4fb393910261b638 --- .../om/ratis/OzoneManagerRatisServer.java | 51 ++++++++++++++---- ...ManagerProtocolServerSideTranslatorPB.java | 53 +++++-------------- 2 files changed, 53 insertions(+), 51 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index af4d42ad68a..9f187dd0219 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -71,6 +71,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; @@ -87,6 +88,7 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.util.LifeCycle; @@ -460,16 +462,11 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) { * ratis server. */ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { - if (!ozoneManager.isTestSecureOmFlag()) { - Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID); - Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID); - } return RaftClientRequest.newBuilder() - .setClientId( - ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId()))) + .setClientId(getClientId()) .setServerId(server.getId()) .setGroupId(raftGroupId) - .setCallId(Server.getCallId()) + .setCallId(getCallId()) .setMessage( Message.valueOf( OMRatisHelper.convertRequestToByteString(omRequest))) @@ -477,6 +474,39 @@ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { .build(); } + private ClientId getClientId() { + final byte[] clientIdBytes = Server.getClientId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(clientIdBytes != DUMMY_CLIENT_ID); + } + return ClientId.valueOf(UUID.nameUUIDFromBytes(clientIdBytes)); + } + + private long getCallId() { + final long callId = Server.getCallId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(callId != INVALID_CALL_ID); + } + return callId; + } + + public OMResponse checkRetryCache() throws ServiceException { + final ClientInvocationId invocationId = ClientInvocationId.valueOf(getClientId(), getCallId()); + final RetryCache.Entry cacheEntry = getServerDivision().getRetryCache().getIfPresent(invocationId); + if (cacheEntry == null) { + return null; //cache miss + } + //cache hit + try { + return getOMResponse(cacheEntry.getReplyFuture().get()); + } catch (ExecutionException ex) { + throw new ServiceException(ex.getMessage(), ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceException(ex.getMessage(), ex); + } + } + /** * Process the raftClientReply and return OMResponse. * @param omRequest @@ -538,6 +568,10 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, } } + return getOMResponse(reply); + } + + private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { return OMRatisHelper.getOMResponseFromRaftClientReply(reply); } catch (IOException ex) { @@ -547,9 +581,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, throw new ServiceException(ex); } } - - // TODO: Still need to handle RaftRetry failure exception and - // NotReplicated exception. } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 17feddfd3f7..6b55b7384bd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -34,7 +33,6 @@ import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ipc.ProcessingDetails.Timing; -import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; @@ -42,7 +40,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; @@ -60,11 +57,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.ozone.security.S3SecurityUtil; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.ClientInvocationId; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; @@ -183,9 +176,7 @@ public OMResponse processRequest(OMRequest request) throws ServiceException { return response; } - private OMResponse internalProcessRequest(OMRequest request) throws - ServiceException { - OMClientRequest omClientRequest = null; + private OMResponse internalProcessRequest(OMRequest request) throws ServiceException { boolean s3Auth = false; try { @@ -214,7 +205,16 @@ private OMResponse internalProcessRequest(OMRequest request) throws if (!s3Auth) { OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); } - OMRequest requestToSubmit; + + // check retry cache + final OMResponse cached = omRatisServer.checkRetryCache(); + if (cached != null) { + return cached; + } + + // process new request + OMClientRequest omClientRequest = null; + final OMRequest requestToSubmit; try { omClientRequest = createClientRequest(request, ozoneManager); // TODO: Note: Due to HDDS-6055, createClientRequest() could now @@ -223,27 +223,6 @@ private OMResponse internalProcessRequest(OMRequest request) throws assert (omClientRequest != null); OMClientRequest finalOmClientRequest = omClientRequest; - ClientInvocationId clientInvocationId = ClientInvocationId.valueOf( - ClientId.valueOf(UUID.nameUUIDFromBytes(ProtobufRpcEngine.Server.getClientId())), - ProtobufRpcEngine.Server.getCallId()); - RetryCache.Entry cacheEntry = - omRatisServer.getServerDivision().getRetryCache().getIfPresent(clientInvocationId); - // if this request has been executed before, return the cached result. - if (cacheEntry != null) { - if (cacheEntry.getReplyFuture().isDone()) { - RaftClientReply raftClientReply; - try { - raftClientReply = cacheEntry.getReplyFuture().get(); - } catch (ExecutionException ex) { - throw new ServiceException(ex.getMessage(), ex); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new ServiceException(ex.getMessage(), ex); - } - return OMRatisHelper.getOMResponseFromRaftClientReply(raftClientReply); - } - } - requestToSubmit = preExecute(finalOmClientRequest); this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { @@ -254,7 +233,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws return createErrorResponse(request, ex); } - OMResponse response = submitRequestToRatis(requestToSubmit); + final OMResponse response = omRatisServer.submitRequest(requestToSubmit); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } @@ -275,14 +254,6 @@ public OMRequest getLastRequestToSubmit() { return lastRequestToSubmit; } - /** - * Submits request to OM's Ratis server. - */ - private OMResponse submitRequestToRatis(OMRequest request) - throws ServiceException { - return omRatisServer.submitRequest(request); - } - private OMResponse submitReadRequestToOM(OMRequest request) throws ServiceException { // Check if this OM is the leader.