Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11558. Make OM client retry idempotent #7329

Merged
merged 12 commits into from
Oct 29, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -33,17 +36,24 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.TimeDuration;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -52,7 +62,9 @@
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -61,6 +73,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOUT;
Expand All @@ -71,6 +84,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
Expand All @@ -80,7 +94,8 @@
@Flaky("HDDS-11352")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TestOzoneManagerHAWithStoppedNodes extends TestOzoneManagerHA {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
TestOzoneManagerHAWithStoppedNodes.class);
/**
* After restarting OMs we need to wait
* for a leader to be elected and ready.
Expand Down Expand Up @@ -594,6 +609,97 @@ void testListVolumes() throws Exception {
objectStore.listVolumesByUser(userName, prefix, ""));
}

@Test
void testRetryCacheWithDownedOM() throws Exception {
// Create a volume, a bucket and a key
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = UUID.randomUUID().toString();
String keyTo = UUID.randomUUID().toString();

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner(userName)
.setAdmin(adminName)
.build();
getObjectStore().createVolume(volumeName, createVolumeArgs);
OzoneVolume ozoneVolume = getObjectStore().getVolume(volumeName);
ozoneVolume.createBucket(bucketName);
OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
String keyFrom = createKey(ozoneBucket);

int callId = 10;
ClientId clientId = ClientId.randomId();
MiniOzoneHAClusterImpl cluster = getCluster();
OzoneManager omLeader = cluster.getOMLeader();

OzoneManagerProtocolProtos.KeyArgs keyArgs =
OzoneManagerProtocolProtos.KeyArgs.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyFrom)
.build();
OzoneManagerProtocolProtos.RenameKeyRequest renameKeyRequest
= OzoneManagerProtocolProtos.RenameKeyRequest.newBuilder()
.setKeyArgs(keyArgs)
.setToKeyName(keyTo)
.build();
OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.RenameKey)
.setRenameKeyRequest(renameKeyRequest)
.setClientId(clientId.toString())
.build();
// set up the current call so that OM Ratis Server doesn't complain.
Server.getCurCall().set(new Server.Call(callId, 0, null, null,
RPC.RpcKind.RPC_BUILTIN, clientId.toByteString().toByteArray()));
// Submit rename request to OM
OzoneManagerProtocolProtos.OMResponse omResponse =
omLeader.getOmServerProtocol().processRequest(omRequest);
assertTrue(omResponse.getSuccess());

// Make one of the follower OM the leader, and shutdown the current leader.
OzoneManager newLeader = cluster.getOzoneManagersList().stream().filter(
om -> !om.getOMNodeId().equals(omLeader.getOMNodeId())).findFirst().get();
transferLeader(omLeader, newLeader);
cluster.shutdownOzoneManager(omLeader);

// Once the rename completes, the source key should no longer exist
// and the destination key should exist.
OMException omException = assertThrows(OMException.class,
() -> ozoneBucket.getKey(keyFrom));
assertEquals(omException.getResult(), OMException.ResultCodes.KEY_NOT_FOUND);
assertTrue(ozoneBucket.getKey(keyTo).isFile());

// Submit rename request to OM again. The request is cached so it will succeed.
omResponse = newLeader.getOmServerProtocol().processRequest(omRequest);
assertTrue(omResponse.getSuccess());
}

private void transferLeader(OzoneManager omLeader, OzoneManager newLeader) throws IOException {
LOG.info("Transfer leadership from {}(raft id {}) to {}(raft id {})",
omLeader.getOMNodeId(), omLeader.getOmRatisServer().getRaftPeerId(),
newLeader.getOMNodeId(), newLeader.getOmRatisServer().getRaftPeerId());

final SupportedRpcType rpc = SupportedRpcType.GRPC;
final RaftProperties properties = RatisHelper.newRaftProperties(rpc);

// For now not making anything configurable, RaftClient is only used
// in SCM for DB updates of sub-ca certs go via Ratis.
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(omLeader.getOmRatisServer().getRaftGroup())
.setLeaderId(null)
.setProperties(properties)
.setRetryPolicy(
RetryPolicies.retryUpToMaximumCountWithFixedSleep(120,
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS)));
try (RaftClient raftClient = builder.build()) {
RaftClientReply reply = raftClient.admin().transferLeadership(newLeader.getOmRatisServer()
.getRaftPeerId(), 10 * 1000);
assertTrue(reply.isSuccess());
}
}

private void validateVolumesList(Set<String> expectedVolumes,
Iterator<? extends OzoneVolume> volumeIterator) {
int expectedCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
Expand All @@ -87,6 +88,7 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.LifeCycle;
Expand Down Expand Up @@ -460,23 +462,51 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) {
* ratis server.
*/
private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) {
if (!ozoneManager.isTestSecureOmFlag()) {
Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
}
return RaftClientRequest.newBuilder()
.setClientId(
ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId())))
.setClientId(getClientId())
.setServerId(server.getId())
.setGroupId(raftGroupId)
.setCallId(Server.getCallId())
.setCallId(getCallId())
.setMessage(
Message.valueOf(
OMRatisHelper.convertRequestToByteString(omRequest)))
.setType(RaftClientRequest.writeRequestType())
.build();
}

private ClientId getClientId() {
final byte[] clientIdBytes = Server.getClientId();
if (!ozoneManager.isTestSecureOmFlag()) {
Preconditions.checkArgument(clientIdBytes != DUMMY_CLIENT_ID);
}
return ClientId.valueOf(UUID.nameUUIDFromBytes(clientIdBytes));
}

private long getCallId() {
final long callId = Server.getCallId();
if (!ozoneManager.isTestSecureOmFlag()) {
Preconditions.checkArgument(callId != INVALID_CALL_ID);
}
return callId;
}

public OMResponse checkRetryCache() throws ServiceException {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(getClientId(), getCallId());
final RetryCache.Entry cacheEntry = getServerDivision().getRetryCache().getIfPresent(invocationId);
if (cacheEntry == null) {
return null; //cache miss
}
//cache hit
try {
return getOMResponse(cacheEntry.getReplyFuture().get());
} catch (ExecutionException ex) {
throw new ServiceException(ex.getMessage(), ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new ServiceException(ex.getMessage(), ex);
}
}

/**
* Process the raftClientReply and return OMResponse.
* @param omRequest
Expand Down Expand Up @@ -538,6 +568,10 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
}
}

return getOMResponse(reply);
}

private OMResponse getOMResponse(RaftClientReply reply) throws ServiceException {
try {
return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
} catch (IOException ex) {
Expand All @@ -547,9 +581,6 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
throw new ServiceException(ex);
}
}

// TODO: Still need to handle RaftRetry failure exception and
// NotReplicated exception.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ public OMResponse processRequest(OMRequest request) throws ServiceException {
return response;
}

private OMResponse internalProcessRequest(OMRequest request) throws
ServiceException {
OMClientRequest omClientRequest = null;
private OMResponse internalProcessRequest(OMRequest request) throws ServiceException {
boolean s3Auth = false;

try {
Expand Down Expand Up @@ -207,14 +205,24 @@ private OMResponse internalProcessRequest(OMRequest request) throws
if (!s3Auth) {
OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager);
}
OMRequest requestToSubmit;

// check retry cache
final OMResponse cached = omRatisServer.checkRetryCache();
if (cached != null) {
return cached;
}

// process new request
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = createClientRequest(request, ozoneManager);
// TODO: Note: Due to HDDS-6055, createClientRequest() could now
// return null, which triggered the findbugs warning.
// Added the assertion.
assert (omClientRequest != null);
OMClientRequest finalOmClientRequest = omClientRequest;

requestToSubmit = preExecute(finalOmClientRequest);
this.lastRequestToSubmit = requestToSubmit;
} catch (IOException ex) {
Expand All @@ -225,7 +233,7 @@ private OMResponse internalProcessRequest(OMRequest request) throws
return createErrorResponse(request, ex);
}

OMResponse response = submitRequestToRatis(requestToSubmit);
final OMResponse response = omRatisServer.submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
Expand All @@ -246,14 +254,6 @@ public OMRequest getLastRequestToSubmit() {
return lastRequestToSubmit;
}

/**
* Submits request to OM's Ratis server.
*/
private OMResponse submitRequestToRatis(OMRequest request)
throws ServiceException {
return omRatisServer.submitRequest(request);
}

private OMResponse submitReadRequestToOM(OMRequest request)
throws ServiceException {
// Check if this OM is the leader.
Expand Down