Skip to content

Commit

Permalink
HDDS-11558. Make OM client retry idempotent (apache#7329)
Browse files Browse the repository at this point in the history
Co-authored-by: Sumit Agrawal <sumit.jecrc@gmail.com>
(cherry picked from commit afed6d9)
  • Loading branch information
jojochuang authored Oct 29, 2024
1 parent 6d05736 commit bec5a69
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -33,17 +36,24 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -52,7 +62,9 @@
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -61,6 +73,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT;
Expand All @@ -71,6 +84,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
Expand All @@ -79,7 +93,8 @@
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TestOzoneManagerHAWithStoppedNodes extends TestOzoneManagerHA {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
TestOzoneManagerHAWithStoppedNodes.class);
/**
* After restarting OMs we need to wait
* for a leader to be elected and ready.
Expand Down Expand Up @@ -593,6 +608,97 @@ void testListVolumes() throws Exception {
objectStore.listVolumesByUser(userName, prefix, ""));
}

/**
 * Submits a RenameKey request directly to the leader OM, transfers
 * leadership to another OM, shuts the old leader down, and then submits the
 * very same request (same client id and call id) to the new leader. The
 * second submission is expected to succeed even though the source key no
 * longer exists.
 */
@Test
void testRetryCacheWithDownedOM() throws Exception {
  // Create a volume, a bucket and a key
  String userName = "user" + RandomStringUtils.randomNumeric(5);
  String adminName = "admin" + RandomStringUtils.randomNumeric(5);
  String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
  String bucketName = UUID.randomUUID().toString();
  String keyTo = UUID.randomUUID().toString();

  VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
      .setOwner(userName)
      .setAdmin(adminName)
      .build();
  getObjectStore().createVolume(volumeName, createVolumeArgs);
  OzoneVolume ozoneVolume = getObjectStore().getVolume(volumeName);
  ozoneVolume.createBucket(bucketName);
  OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
  String keyFrom = createKey(ozoneBucket);

  int callId = 10;
  ClientId clientId = ClientId.randomId();
  MiniOzoneHAClusterImpl cluster = getCluster();
  OzoneManager omLeader = cluster.getOMLeader();

  OzoneManagerProtocolProtos.KeyArgs keyArgs =
      OzoneManagerProtocolProtos.KeyArgs.newBuilder()
          .setVolumeName(volumeName)
          .setBucketName(bucketName)
          .setKeyName(keyFrom)
          .build();
  OzoneManagerProtocolProtos.RenameKeyRequest renameKeyRequest
      = OzoneManagerProtocolProtos.RenameKeyRequest.newBuilder()
          .setKeyArgs(keyArgs)
          .setToKeyName(keyTo)
          .build();
  OzoneManagerProtocolProtos.OMRequest omRequest =
      OzoneManagerProtocolProtos.OMRequest.newBuilder()
          .setCmdType(OzoneManagerProtocolProtos.Type.RenameKey)
          .setRenameKeyRequest(renameKeyRequest)
          .setClientId(clientId.toString())
          .build();
  // set up the current call so that OM Ratis Server doesn't complain.
  Server.getCurCall().set(new Server.Call(callId, 0, null, null,
      RPC.RpcKind.RPC_BUILTIN, clientId.toByteString().toByteArray()));
  // Submit rename request to OM
  OzoneManagerProtocolProtos.OMResponse omResponse =
      omLeader.getOmServerProtocol().processRequest(omRequest);
  assertTrue(omResponse.getSuccess());

  // Make one of the follower OM the leader, and shutdown the current leader.
  // Fail with a descriptive message (rather than a bare
  // NoSuchElementException) if the cluster has no other OM.
  OzoneManager newLeader = cluster.getOzoneManagersList().stream()
      .filter(om -> !om.getOMNodeId().equals(omLeader.getOMNodeId()))
      .findFirst()
      .orElseThrow(() -> new IllegalStateException(
          "No OM other than the current leader " + omLeader.getOMNodeId()
              + " found in the cluster"));
  transferLeader(omLeader, newLeader);
  cluster.shutdownOzoneManager(omLeader);

  // Once the rename completes, the source key should no longer exist
  // and the destination key should exist.
  OMException omException = assertThrows(OMException.class,
      () -> ozoneBucket.getKey(keyFrom));
  // JUnit's assertEquals takes (expected, actual).
  assertEquals(OMException.ResultCodes.KEY_NOT_FOUND,
      omException.getResult());
  assertTrue(ozoneBucket.getKey(keyTo).isFile());

  // Submit rename request to OM again. The request is cached so it will
  // succeed.
  omResponse = newLeader.getOmServerProtocol().processRequest(omRequest);
  assertTrue(omResponse.getSuccess());
}

/**
 * Asks Ratis to move leadership of the OM Raft group to {@code newLeader}
 * and asserts that the transfer request was reported as successful.
 *
 * @param omLeader the OM whose Raft group is used to build the client
 * @param newLeader the OM that should become the leader
 * @throws IOException if the Raft client call fails
 */
private void transferLeader(OzoneManager omLeader, OzoneManager newLeader) throws IOException {
  LOG.info("Transfer leadership from {}(raft id {}) to {}(raft id {})",
      omLeader.getOMNodeId(), omLeader.getOmRatisServer().getRaftPeerId(),
      newLeader.getOMNodeId(), newLeader.getOmRatisServer().getRaftPeerId());

  // Short-lived gRPC Raft client built with default Raft properties; nothing
  // here is configurable. The leader id is left unset (null) and requests
  // are retried up to 120 times with a fixed 500 ms sleep in between.
  final RaftProperties raftProperties =
      RatisHelper.newRaftProperties(SupportedRpcType.GRPC);
  final TimeDuration retrySleep =
      TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
  final RaftClient.Builder clientBuilder = RaftClient.newBuilder()
      .setRaftGroup(omLeader.getOmRatisServer().getRaftGroup())
      .setLeaderId(null)
      .setProperties(raftProperties)
      .setRetryPolicy(
          RetryPolicies.retryUpToMaximumCountWithFixedSleep(120, retrySleep));

  try (RaftClient client = clientBuilder.build()) {
    // The second argument is the transfer timeout: 10 * 1000.
    final RaftClientReply transferReply = client.admin().transferLeadership(
        newLeader.getOmRatisServer().getRaftPeerId(), 10 * 1000);
    assertTrue(transferReply.isSuccess());
  }
}

private void validateVolumesList(Set<String> expectedVolumes,
Iterator<? extends OzoneVolume> volumeIterator) {
int expectedCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
Expand All @@ -87,6 +88,7 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.LifeCycle;
Expand Down Expand Up @@ -460,23 +462,51 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) {
* ratis server.
*/
private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) {
  // getClientId() and getCallId() read the values from the current RPC call
  // and run the precondition checks themselves (skipped when the
  // test-secure-OM flag is set), so the checks are not repeated here and
  // each builder property is set exactly once.
  return RaftClientRequest.newBuilder()
      .setClientId(getClientId())
      .setServerId(server.getId())
      .setGroupId(raftGroupId)
      .setCallId(getCallId())
      .setMessage(
          Message.valueOf(
              OMRatisHelper.convertRequestToByteString(omRequest)))
      // OM requests routed through this method are submitted as writes.
      .setType(RaftClientRequest.writeRequestType())
      .build();
}

/**
 * Derives the Ratis {@link ClientId} from the client id bytes reported by
 * {@code Server.getClientId()}.
 *
 * @return a ClientId built from a name-based UUID of those bytes
 * @throws IllegalArgumentException if the bytes are the DUMMY_CLIENT_ID
 *     instance and the test-secure-OM flag is not set
 */
private ClientId getClientId() {
  final byte[] rpcClientId = Server.getClientId();
  // Identity comparison against the sentinel array is intentional here; the
  // check is bypassed entirely when the test-secure-OM flag is set.
  Preconditions.checkArgument(
      ozoneManager.isTestSecureOmFlag() || rpcClientId != DUMMY_CLIENT_ID);
  final UUID nameBasedUuid = UUID.nameUUIDFromBytes(rpcClientId);
  return ClientId.valueOf(nameBasedUuid);
}

/**
 * Returns the call id reported by {@code Server.getCallId()}.
 *
 * @return the call id of the current RPC call
 * @throws IllegalArgumentException if the call id equals INVALID_CALL_ID
 *     and the test-secure-OM flag is not set
 */
private long getCallId() {
  final long rpcCallId = Server.getCallId();
  // The check is bypassed entirely when the test-secure-OM flag is set.
  Preconditions.checkArgument(
      ozoneManager.isTestSecureOmFlag() || rpcCallId != INVALID_CALL_ID);
  return rpcCallId;
}

/**
 * Looks up the current RPC call (identified by its client id and call id)
 * in the retry cache of this server's Raft division.
 *
 * @return the OMResponse converted from the cached reply, or {@code null}
 *     when the retry cache holds no entry for this invocation
 * @throws ServiceException if obtaining the cached reply fails or the
 *     waiting thread is interrupted
 */
public OMResponse checkRetryCache() throws ServiceException {
  final ClientInvocationId currentInvocation =
      ClientInvocationId.valueOf(getClientId(), getCallId());
  final RetryCache.Entry hit =
      getServerDivision().getRetryCache().getIfPresent(currentInvocation);

  if (hit != null) {
    // cache hit: wait on the entry's reply future and convert the reply
    try {
      final RaftClientReply cachedReply = hit.getReplyFuture().get();
      return getOMResponse(cachedReply);
    } catch (InterruptedException ex) {
      // Restore the interrupt flag before reporting the failure.
      Thread.currentThread().interrupt();
      throw new ServiceException(ex.getMessage(), ex);
    } catch (ExecutionException ex) {
      throw new ServiceException(ex.getMessage(), ex);
    }
  }

  // cache miss
  return null;
}

/**
* Process the raftClientReply and return OMResponse.
* @param omRequest
Expand Down Expand Up @@ -538,6 +568,10 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
}
}

return getOMResponse(reply);
}

private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException {
try {
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
} catch (IOException ex) {
Expand All @@ -547,9 +581,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
throw new ServiceException(ex);
}
}

// TODO: Still need to handle RaftRetry failure exception and
// NotReplicated exception.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ public OMResponse processRequest(OMRequest request) throws ServiceException {
return response;
}

private OMResponse internalProcessRequest(OMRequest request) throws
ServiceException {
OMClientRequest omClientRequest = null;
private OMResponse internalProcessRequest(OMRequest request) throws ServiceException {
boolean s3Auth = false;

try {
Expand Down Expand Up @@ -207,14 +205,24 @@ private OMResponse internalProcessRequest(OMRequest request) throws
if (!s3Auth) {
OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager);
}
OMRequest requestToSubmit;

// check retry cache
final OMResponse cached = omRatisServer.checkRetryCache();
if (cached != null) {
return cached;
}

// process new request
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = createClientRequest(request, ozoneManager);
// TODO: Note: Due to HDDS-6055, createClientRequest() could now
// return null, which triggered the findbugs warning.
// Added the assertion.
assert (omClientRequest != null);
OMClientRequest finalOmClientRequest = omClientRequest;

requestToSubmit = preExecute(finalOmClientRequest);
this.lastRequestToSubmit = requestToSubmit;
} catch (IOException ex) {
Expand All @@ -225,7 +233,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws
return createErrorResponse(request, ex);
}

OMResponse response = submitRequestToRatis(requestToSubmit);
final OMResponse response = omRatisServer.submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
Expand All @@ -246,14 +254,6 @@ public OMRequest getLastRequestToSubmit() {
return lastRequestToSubmit;
}

/**
 * Hands the given request to this OM's Ratis server and returns the
 * response it produces.
 *
 * @param request the OM request to submit
 * @return the response returned by the Ratis server
 * @throws ServiceException if the Ratis server fails to process the request
 */
private OMResponse submitRequestToRatis(OMRequest request)
    throws ServiceException {
  final OMResponse ratisResponse = omRatisServer.submitRequest(request);
  return ratisResponse;
}

private OMResponse submitReadRequestToOM(OMRequest request)
throws ServiceException {
// Check if this OM is the leader.
Expand Down

0 comments on commit bec5a69

Please sign in to comment.