Skip to content

Commit

Permalink
metrics, configuration and fix for performance test
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl committed Oct 22, 2024
1 parent fae0f2e commit 2ce5b04
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableRate;

/**
* This class is for maintaining Ozone Manager statistics.
Expand Down Expand Up @@ -235,7 +236,16 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong ecKeyCreateFailsTotal;
private @Metric MutableCounterLong ecBucketCreateTotal;
private @Metric MutableCounterLong ecBucketCreateFailsTotal;

@Metric(about = "request commit request get key")
private MutableRate keyCommitGetKeyRate;
public MutableRate getKeyCommitGetKeyRate() {
return keyCommitGetKeyRate;
}
@Metric(about = "request commit request get open key")
private MutableRate keyCommitGetOpenKeyRate;
public MutableRate getKeyCommitGetOpenKeyRate() {
return keyCommitGetOpenKeyRate;
}
private final DBCheckpointMetrics dbCheckpointMetrics;

public OMMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final BucketUtilizationMetrics bucketUtilizationMetrics;

private boolean fsSnapshotEnabled;
private final boolean isLeaderExecutorFlag;

/**
* OM Startup mode.
Expand Down Expand Up @@ -582,6 +583,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
this.isStrictS3 = conf.getBoolean(
OZONE_OM_NAMESPACE_STRICT_S3,
OZONE_OM_NAMESPACE_STRICT_S3_DEFAULT);
this.isLeaderExecutorFlag = configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);

// TODO: This is a temporary check. Once fully implemented, all OM state
// change should go through Ratis - be it standalone (for non-HA) or
Expand Down Expand Up @@ -5053,7 +5056,6 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
}

public boolean isLeaderExecutorEnabled() {
return configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE,
OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT);
return isLeaderExecutorFlag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@
*/
public class KeyLocking {
private static final Logger LOG = LoggerFactory.getLogger(KeyLocking.class);
private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 10240;
private static final int DEFAULT_FILE_LOCK_STRIPED_SIZE = 102400;
private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
private Striped<ReadWriteLock> fileStripedLock = Striped.readWriteLock(DEFAULT_FILE_LOCK_STRIPED_SIZE);
private final Striped<ReadWriteLock> fileStripedLock;
private AtomicLong writeLockCount = new AtomicLong();
private AtomicLong readLockCount = new AtomicLong();
private AtomicLong failedLockCount = new AtomicLong();
private AtomicLong failedUnlockCount = new AtomicLong();

public KeyLocking(int stripLockSize) {
fileStripedLock = Striped.readWriteLock(stripLockSize);
}

public void lock(List<String> keyList) throws IOException {
for (String key : keyList) {
lock(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public OmLockOpr(LockType type, String volume, String bucket, List<String> keyNa
Collections.sort(this.keyNameList);
}
public static void init(String threadNamePrefix) {
keyLocking = new KeyLocking();
volumeLocking = new KeyLocking();
snapshotLocking = new KeyLocking();
bucketLocking = new KeyLocking();
keyLocking = new KeyLocking(102400);
volumeLocking = new KeyLocking(1024);
snapshotLocking = new KeyLocking(1024);
bucketLocking = new KeyLocking(1024);
prefixLocking = new FSOPrefixLocking(threadNamePrefix);
lockedObjMap = new ConcurrentHashMap<>();
// init scheduler to check and monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ private void runExecuteCommand(
}

private void mergeSubmit(RequestContext ctx) throws InterruptedException {
if (mergeTaskPoolSize == 0) {
requestMergeCommand(Collections.singletonList(ctx), this::ratisSubmit);
return;
}
int nxtIndex = Math.abs(mergeCurrentPool.getAndIncrement() % mergeTaskPoolSize);
requestMerger.submit(nxtIndex, ctx);
}
Expand Down Expand Up @@ -221,6 +225,8 @@ private void requestMergeCommandInternal(
OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
= OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
long size = 0;
omGatewayMetrics.incRequestMergeCombineCount(ctxs.size());
omGatewayMetrics.incRequestMergeCallCount();
for (RequestContext ctx : ctxs) {
DbChangesRecorder recorder = ctx.getRequestBase().changeRecorder();
int tmpSize = 0;
Expand All @@ -235,6 +241,7 @@ private void requestMergeCommandInternal(
// send current batched request
appendBucketQuotaChanges(reqBuilder, bucketChangeMap);
prepareAndSendRequest(sendList, reqBuilder, nxtPool);
omGatewayMetrics.incRequestMergeOverflowCount();

// reinit and continue
reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
Expand Down Expand Up @@ -272,6 +279,10 @@ private void requestMergeCommandInternal(

private void ratisSubmit(RatisContext ctx) throws InterruptedException {
// follow simple strategy to submit to ratis for next set of merge request
if (ratisTaskPoolSize == 0) {
ratisCommand(Collections.singletonList(ctx), null);
return;
}
int nxtIndex = Math.abs(ratisCurrentPool.getAndIncrement() % ratisTaskPoolSize);
ratisSubmitter.submit(nxtIndex, ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ public class OMGateway {
private AtomicLong uniqueIndex = new AtomicLong();
private OMGatewayMetrics omGatewayMetrics;
private final ScheduledExecutorService executorService;
private boolean isAuthorize;

public OMGateway(OzoneManager om) throws IOException {
this.om = om;
omGatewayMetrics = OMGatewayMetrics.create();
OmLockOpr.init(om.getThreadNamePrefix());
OmRequestLockUtils.init();
isAuthorize = om.getConfiguration().getBoolean("ozone.om.leader.request.is.authorize", true);
this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex, omGatewayMetrics);
this.leaderCompatibleExecutor = new LeaderCompatibleRequestExecutor(om, uniqueIndex);
this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex);
Expand Down Expand Up @@ -138,27 +140,31 @@ public OMResponse submitInternal(OMRequest omRequest) throws ServiceException {
executorType = captureLatencyNs(omGatewayMetrics.getGatewayPreExecute(), () -> {
OmRequestBase requestBase = OzoneManagerRatisUtils.createLeaderClientRequest(omRequest, om);
if (null != requestBase) {
requestBase.preProcess(om);
OMRequest request = requestBase.preProcess(om);
requestContext.setRequest(request);
requestContext.setRequestBase(requestBase);
return ExecutorType.LEADER_OPTIMIZED;
} else {
return ExecutorType.LEADER_COMPATIBLE;
}
});
if (executorType == ExecutorType.LEADER_OPTIMIZED) {
if (executorType == ExecutorType.LEADER_OPTIMIZED && isAuthorize) {
captureLatencyNs(omGatewayMetrics.getGatewayAuthorize(), () -> requestContext.getRequestBase().authorize(om));
}
}
if (executorType == ExecutorType.LEADER_COMPATIBLE) {
OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, om);
OMRequest request = omClientRequest.preExecute(om);
omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, om);
requestContext.setClientRequest(omClientRequest);
requestContext.setRequest(request);
}
lockOperation = OmRequestLockUtils.getLockOperation(om, omRequest);
lockOperation = OmRequestLockUtils.getLockOperation(om, requestContext.getRequest());
final OmLockOpr tmpLockOpr = lockOperation;
captureLatencyNs(omGatewayMetrics.getGatewayLock(), () -> tmpLockOpr.lock(om));

validate(omRequest);
ensurePreviousRequestCompletionForPrepare(omRequest);
validate(requestContext.getRequest());
ensurePreviousRequestCompletionForPrepare(requestContext.getRequest());

// submit request
if (executorType == ExecutorType.LEADER_COMPATIBLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public class OMGatewayMetrics {
@Metric(about = "request gateway request at any time captured per sec")
private MutableRate gatewayRequestInProgress;
private @Metric MutableCounterLong requestCount;
@Metric(about = "request gateway merge call count")
private MutableCounterLong gatewayMergeCombineCount;
@Metric(about = "request gateway merge combine count")
private MutableCounterLong gatewayMergeCallCount;
@Metric(about = "request gateway merge overflow count")
private MutableCounterLong gatewayMergeOverflowCount;

public OMGatewayMetrics() {
}
Expand Down Expand Up @@ -113,4 +119,16 @@ public MutableRate getGatewayRequestInProgress() {
public MutableRate getGatewayMergeWait() {
return gatewayMergeWait;
}

public void incRequestMergeCombineCount(int size) {
gatewayMergeCombineCount.incr(size);
}

public void incRequestMergeCallCount() {
gatewayMergeCallCount.incr();
}

public void incRequestMergeOverflowCount() {
gatewayMergeOverflowCount.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

/**
* Handles CommitKey request.
Expand Down Expand Up @@ -183,7 +184,12 @@ public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex)
// creation and key commit, old versions will be just overwritten and
// not kept. Bucket versioning will be effective from the first key
// creation after the knob turned on.
OmKeyInfo keyToDelete = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
OmKeyInfo keyToDelete = null;
if (ozoneManager.getConfiguration().getBoolean("ozone.om.leader.commit.request.old.key.get", true)) {
keyToDelete = captureLatencyNs(omMetrics.getKeyCommitGetKeyRate(),
() -> omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey));
}
// OmKeyInfo keyToDelete = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
long writerClientId = commitKeyRequest.getClientID();
boolean isSameHsyncKey = false;
boolean isOverwrittenHsyncKey = false;
Expand All @@ -205,7 +211,8 @@ public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex)
writerClientId = Long.parseLong(clientId);
}
String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName, keyName, writerClientId);
omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey);
omKeyInfo = captureLatencyNs(omMetrics.getKeyCommitGetOpenKeyRate(),
() -> omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey));
if (omKeyInfo == null) {
String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.UniqueId;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.audit.OMAction;
Expand Down Expand Up @@ -60,11 +66,14 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
import static org.apache.hadoop.ozone.om.request.OMClientRequest.validateAndNormalizeKey;

/**
Expand Down Expand Up @@ -132,8 +141,8 @@ public OMClientResponse process(OzoneManager ozoneManager, TermIndex termIndex)
validateAtomicRewrite(dbKeyInfo, keyArgs);*/

// authorize
OmKeyUtils.checkKeyAcls(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(),
IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY, getOmRequest());
//OmKeyUtils.checkKeyAcls(ozoneManager, keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(),
// IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY, getOmRequest());

// prepare
FileEncryptionInfo encInfo;
Expand Down Expand Up @@ -201,21 +210,46 @@ public void addBlockInfo(OzoneManager ozoneManager, KeyArgs keyArgs,
// block which is the current size, if read by the client.
final long requestedSize = keyArgs.getDataSize() > 0 ? keyArgs.getDataSize() : scmBlockSize;

UserInfo userInfo = getOmRequest().getUserInfo();
List<OmKeyLocationInfo> omKeyLocationInfoList = OmKeyUtils.allocateBlock(ozoneManager.getScmClient(),
ozoneManager.getBlockTokenSecretManager(), repConfig,
new ExcludeList(), requestedSize, scmBlockSize,
ozoneManager.getPreallocateBlocksMax(), ozoneManager.isGrpcBlockTokenEnabled(),
ozoneManager.getOMServiceId(), ozoneManager.getMetrics(),
keyArgs.getSortDatanodes(), userInfo);
// convert to proto and convert back as to filter out in existing logic
List<OmKeyLocationInfo> newLocationList = omKeyLocationInfoList.stream()
.map(info -> info.getProtobuf(false, getOmRequest().getVersion())).map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());
List<OmKeyLocationInfo> newLocationList = null;
if (!ozoneManager.getConfiguration().getBoolean("ozone.om.leader.request.dummy.block", true)) {
UserInfo userInfo = getOmRequest().getUserInfo();
List<OmKeyLocationInfo> omKeyLocationInfoList = OmKeyUtils.allocateBlock(ozoneManager.getScmClient(),
ozoneManager.getBlockTokenSecretManager(), repConfig,
new ExcludeList(), requestedSize, scmBlockSize,
ozoneManager.getPreallocateBlocksMax(), ozoneManager.isGrpcBlockTokenEnabled(),
ozoneManager.getOMServiceId(), ozoneManager.getMetrics(),
keyArgs.getSortDatanodes(), userInfo);
// convert to proto and convert back as to filter out in existing logic
newLocationList = omKeyLocationInfoList.stream()
.map(info -> info.getProtobuf(false, getOmRequest().getVersion())).map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());
} else {
newLocationList = allocateDummyBlocks(scmBlockSize, ozoneManager);
}
omKeyInfo.appendNewBlocks(newLocationList, false);
omKeyInfo.setDataSize(requestedSize + omKeyInfo.getDataSize());
}

private List<OmKeyLocationInfo> allocateDummyBlocks(long scmBlockSize, OzoneManager ozoneManager) throws IOException {
BlockID blockID = new BlockID(1L, 1L);
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(blockID)
.setLength(scmBlockSize)
.setOffset(0)
.setPipeline(Pipeline.newBuilder().setId(PipelineID.randomId())
.setReplicationConfig(ozoneManager.getDefaultReplicationConfig()).setState(Pipeline.PipelineState.OPEN)
.setNodes(Collections.emptyList()).build());
if (ozoneManager.isGrpcBlockTokenEnabled()) {
UserGroupInformation ugi = Server.getRemoteUser();
builder.setToken(ozoneManager.getBlockTokenSecretManager().generateToken(
((ugi != null) ? ugi : UserGroupInformation.getCurrentUser()).getShortUserName(), blockID,
EnumSet.of(READ, WRITE), scmBlockSize));
}
List<OmKeyLocationInfo> locationInfos = new ArrayList<>();
locationInfos.add(builder.build());
return locationInfos;
}

protected void logResult(CreateKeyRequest createKeyRequest,
OMMetrics omMetrics, Exception exception, OMClientRequest.Result result,
int numMissingParents) {
Expand Down

0 comments on commit 2ce5b04

Please sign in to comment.