diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java index 6937c52c712..542d70ce93e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithStoppedNodes.java @@ -20,9 +20,12 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdfs.LogVerificationAppender; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -33,17 +36,24 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.om.ha.OMHAMetrics; import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.service.KeyDeletingService; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.tag.Flaky; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.TimeDuration; import org.junit.jupiter.api.AfterEach; @@ -52,7 +62,9 @@ import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.ConnectException; import java.util.HashMap; import java.util.Iterator; @@ -61,6 +73,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT; @@ -71,6 +84,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** @@ -80,7 +94,8 @@ @Flaky("HDDS-11352") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class TestOzoneManagerHAWithStoppedNodes extends TestOzoneManagerHA { - + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( + TestOzoneManagerHAWithStoppedNodes.class); /** * After restarting OMs we need to wait * for a leader to be elected and ready. @@ -594,6 +609,97 @@ void testListVolumes() throws Exception { objectStore.listVolumesByUser(userName, prefix, "")); } + @Test + void testRetryCacheWithDownedOM() throws Exception { + // Create a volume, a bucket and a key + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = UUID.randomUUID().toString(); + String keyTo = UUID.randomUUID().toString(); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + getObjectStore().createVolume(volumeName, createVolumeArgs); + OzoneVolume ozoneVolume = getObjectStore().getVolume(volumeName); + ozoneVolume.createBucket(bucketName); + OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName); + String keyFrom = createKey(ozoneBucket); + + int callId = 10; + ClientId clientId = ClientId.randomId(); + MiniOzoneHAClusterImpl cluster = getCluster(); + OzoneManager omLeader = cluster.getOMLeader(); + + OzoneManagerProtocolProtos.KeyArgs keyArgs = + OzoneManagerProtocolProtos.KeyArgs.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyFrom) + .build(); + OzoneManagerProtocolProtos.RenameKeyRequest renameKeyRequest + = OzoneManagerProtocolProtos.RenameKeyRequest.newBuilder() + .setKeyArgs(keyArgs) + .setToKeyName(keyTo) + .build(); + OzoneManagerProtocolProtos.OMRequest omRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.RenameKey) + .setRenameKeyRequest(renameKeyRequest) + .setClientId(clientId.toString()) + .build(); + // set up the current call so that OM Ratis Server doesn't complain. + Server.getCurCall().set(new Server.Call(callId, 0, null, null, + RPC.RpcKind.RPC_BUILTIN, clientId.toByteString().toByteArray())); + // Submit rename request to OM + OzoneManagerProtocolProtos.OMResponse omResponse = + omLeader.getOmServerProtocol().processRequest(omRequest); + assertTrue(omResponse.getSuccess()); + + // Make one of the follower OM the leader, and shutdown the current leader. + OzoneManager newLeader = cluster.getOzoneManagersList().stream().filter( + om -> !om.getOMNodeId().equals(omLeader.getOMNodeId())).findFirst().get(); + transferLeader(omLeader, newLeader); + cluster.shutdownOzoneManager(omLeader); + + // Once the rename completes, the source key should no longer exist + // and the destination key should exist. + OMException omException = assertThrows(OMException.class, + () -> ozoneBucket.getKey(keyFrom)); + assertEquals(omException.getResult(), OMException.ResultCodes.KEY_NOT_FOUND); + assertTrue(ozoneBucket.getKey(keyTo).isFile()); + + // Submit rename request to OM again. The request is cached so it will succeed. + omResponse = newLeader.getOmServerProtocol().processRequest(omRequest); + assertTrue(omResponse.getSuccess()); + } + + private void transferLeader(OzoneManager omLeader, OzoneManager newLeader) throws IOException { + LOG.info("Transfer leadership from {}(raft id {}) to {}(raft id {})", + omLeader.getOMNodeId(), omLeader.getOmRatisServer().getRaftPeerId(), + newLeader.getOMNodeId(), newLeader.getOmRatisServer().getRaftPeerId()); + + final SupportedRpcType rpc = SupportedRpcType.GRPC; + final RaftProperties properties = RatisHelper.newRaftProperties(rpc); + + // For now not making anything configurable, RaftClient is only used + // in SCM for DB updates of sub-ca certs go via Ratis. + RaftClient.Builder builder = RaftClient.newBuilder() + .setRaftGroup(omLeader.getOmRatisServer().getRaftGroup()) + .setLeaderId(null) + .setProperties(properties) + .setRetryPolicy( + RetryPolicies.retryUpToMaximumCountWithFixedSleep(120, + TimeDuration.valueOf(500, TimeUnit.MILLISECONDS))); + try (RaftClient raftClient = builder.build()) { + RaftClientReply reply = raftClient.admin().transferLeadership(newLeader.getOmRatisServer() + .getRaftPeerId(), 10 * 1000); + assertTrue(reply.isSuccess()); + } + } + private void validateVolumesList(Set expectedVolumes, Iterator volumeIterator) { int expectedCount = 0; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index af4d42ad68a..9f187dd0219 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -71,6 +71,7 @@ import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; @@ -87,6 +88,7 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RetryCache; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.util.LifeCycle; @@ -460,16 +462,11 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) { * ratis server. */ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { - if (!ozoneManager.isTestSecureOmFlag()) { - Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID); - Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID); - } return RaftClientRequest.newBuilder() - .setClientId( - ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId()))) + .setClientId(getClientId()) .setServerId(server.getId()) .setGroupId(raftGroupId) - .setCallId(Server.getCallId()) + .setCallId(getCallId()) .setMessage( Message.valueOf( OMRatisHelper.convertRequestToByteString(omRequest))) @@ -477,6 +474,39 @@ private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) { .build(); } + private ClientId getClientId() { + final byte[] clientIdBytes = Server.getClientId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(clientIdBytes != DUMMY_CLIENT_ID); + } + return ClientId.valueOf(UUID.nameUUIDFromBytes(clientIdBytes)); + } + + private long getCallId() { + final long callId = Server.getCallId(); + if (!ozoneManager.isTestSecureOmFlag()) { + Preconditions.checkArgument(callId != INVALID_CALL_ID); + } + return callId; + } + + public OMResponse checkRetryCache() throws ServiceException { + final ClientInvocationId invocationId = ClientInvocationId.valueOf(getClientId(), getCallId()); + final RetryCache.Entry cacheEntry = getServerDivision().getRetryCache().getIfPresent(invocationId); + if (cacheEntry == null) { + return null; //cache miss + } + //cache hit + try { + return getOMResponse(cacheEntry.getReplyFuture().get()); + } catch (ExecutionException ex) { + throw new ServiceException(ex.getMessage(), ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceException(ex.getMessage(), ex); + } + } + /** * Process the raftClientReply and return OMResponse. * @param omRequest @@ -538,6 +568,10 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, } } + return getOMResponse(reply); + } + + private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException { try { return OMRatisHelper.getOMResponseFromRaftClientReply(reply); } catch (IOException ex) { @@ -547,9 +581,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, throw new ServiceException(ex); } } - - // TODO: Still need to handle RaftRetry failure exception and - // NotReplicated exception. } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 4506337e54d..6b55b7384bd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -176,9 +176,7 @@ public OMResponse processRequest(OMRequest request) throws ServiceException { return response; } - private OMResponse internalProcessRequest(OMRequest request) throws - ServiceException { - OMClientRequest omClientRequest = null; + private OMResponse internalProcessRequest(OMRequest request) throws ServiceException { boolean s3Auth = false; try { @@ -207,7 +205,16 @@ private OMResponse internalProcessRequest(OMRequest request) throws if (!s3Auth) { OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); } - OMRequest requestToSubmit; + + // check retry cache + final OMResponse cached = omRatisServer.checkRetryCache(); + if (cached != null) { + return cached; + } + + // process new request + OMClientRequest omClientRequest = null; + final OMRequest requestToSubmit; try { omClientRequest = createClientRequest(request, ozoneManager); // TODO: Note: Due to HDDS-6055, createClientRequest() could now @@ -215,6 +222,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws // Added the assertion. assert (omClientRequest != null); OMClientRequest finalOmClientRequest = omClientRequest; + requestToSubmit = preExecute(finalOmClientRequest); this.lastRequestToSubmit = requestToSubmit; } catch (IOException ex) { @@ -225,7 +233,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws return createErrorResponse(request, ex); } - OMResponse response = submitRequestToRatis(requestToSubmit); + final OMResponse response = omRatisServer.submitRequest(requestToSubmit); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } @@ -246,14 +254,6 @@ public OMRequest getLastRequestToSubmit() { return lastRequestToSubmit; } - /** - * Submits request to OM's Ratis server. - */ - private OMResponse submitRequestToRatis(OMRequest request) - throws ServiceException { - return omRatisServer.submitRequest(request); - } - private OMResponse submitReadRequestToOM(OMRequest request) throws ServiceException { // Check if this OM is the leader.