Skip to content

Commit

Permalink
adding perf stats to ranking
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Jun 27, 2023
1 parent e4be79d commit 2c98a0d
Show file tree
Hide file tree
Showing 31 changed files with 270 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,16 @@ protected void doRun() {
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
threadPool.getThreadContext().getResponseHeaders();
if(threadPool.getThreadContext().getTransient("PERF_STATS") != null) {
// Map<String, NodePerfStats> nodePerfStats = (Map<String, NodePerfStats>) threadPool.getThreadContext().getTransient("PERF_STATS");
// for (NodePerfStats perfStats : nodePerfStats.values()) {
// logger.info("Response " +
// "CPU : {} , MEM : {} , IO : {}", perfStats.cpuPercentAvg, perfStats.memoryPercentAvg, perfStats.ioPercentAvg);
// }
}
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
clusterService.state().getRoutingTable().shardRoutingTable(bulkShardResponse.getShardId());
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.action.update.UpdateHelper;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.admissioncontroller.NodePerfStats;
import org.opensearch.client.transport.NoNodeAvailableException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
Expand All @@ -77,6 +78,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -108,9 +110,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -428,7 +428,7 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool, executor(primary));
}), listener, threadPool, executor(primary), clusterService.localNode().getId());
}

@Override
Expand All @@ -453,7 +453,8 @@ public static void performOnPrimary(
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
ThreadPool threadPool,
String executorName
String executorName,
String nodeId
) {
new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {

Expand Down Expand Up @@ -516,6 +517,24 @@ public boolean isForceExecution() {
}

private void finishRequest() {
Map<String, NodePerfStats> nodePerfStatsMap = new HashMap();
NodePerfStats nodePerfStats = new NodePerfStats(0.95, 0.95,0.95);
nodePerfStatsMap.put(nodeId, nodePerfStats);
ThreadContext threadContext = threadPool.getThreadContext();
threadContext.addResponseHeader("PERF_STATS", String.valueOf(nodePerfStats.cpuPercentAvg) + "-"
+ String.valueOf(nodePerfStats.memoryPercentAvg) + "-" + String.valueOf(nodePerfStats.ioPercentAvg));
// Map<String, NodePerfStats> np = new HashMap<>();
// if(threadContext.getTransient("PERF_STATS") != null ) {
// np = threadContext.getTransient("PERF_STATS");
// }
// np.put(nodeId, nodePerfStats);
//ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS"));
//ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS"));
//threadContext.putTransient("PERF_STATS", nodePerfStats);
//threadContext.putHeader("PERF_STATS", nodePerfStatsMap);
// ThreadContext.StoredContext storedContext1 = threadContext.newStoredContext(true, Collections.singletonList("T_ID"));
// threadContext.putTransient("T_ID", "nodePerfStats");

ActionListener.completeWith(
listener,
() -> new WritePrimaryResult<>(
Expand All @@ -527,6 +546,8 @@ private void finishRequest() {
logger
)
);
//storedContext.close();
// storedContext1.close();
}
}.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
}

shardsIt = clusterService.operationRouting()
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null, null);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,8 @@ private void executeSearch(
routingMap,
searchRequest.preference(),
searchService.getResponseCollectorService(),
nodeSearchCounts
nodeSearchCounts,
searchService.getAdmissionControllerService()
);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,8 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
new ChannelActionListener<>(channel, transportPrimaryAction, request),
releasable::close
);

try {
new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
// here
try {new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.ArrayList;
import java.util.List;

/**
* Plugin
*/
public class AdmissionControllerPlugin extends Plugin implements NetworkPlugin {

public AdmissionControllerService admissionControllerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;

/**
* Handler
* @param <T>
*/
public class AdmissionControllerRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
private final String action;
private final TransportRequestHandler<T> actualHandler;
Expand All @@ -37,6 +41,7 @@ protected ThreadContext getThreadContext() {
if(threadPool == null) {
return null;
}
threadPool.getThreadContext().getTransient("PERF_STATS");
return threadPool.getThreadContext();
}

Expand All @@ -58,7 +63,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
return;
}
}else {
log.info("Admission controller service responded with IO is in healthy state");
//log.info("Admission controller service responded with IO is in healthy state");
}
this.messageReceivedDecorate(request, actualHandler, channel, task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Service
*/
public class AdmissionControllerService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class);
Expand Down Expand Up @@ -90,6 +93,8 @@ public double getMemoryEWMA() {
return memoryExecutionEWMA.getAverage();
}

public double getIoEWMA() { return ioExecutionEWMA.getAverage(); }

@Override
protected void doClose() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;

/**
* Interceptor
*/
public class AdmissionControllerTransportInterceptor implements TransportInterceptor {

protected final Logger log = LogManager.getLogger(this.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

import java.io.IOException;

public class NodePerfStats {
/**
* Node perf stats
*/
public class NodePerfStats implements Writeable {
public double cpuPercentAvg;
public double memoryPercentAvg;
public double ioPercentAvg;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* package
*/
package org.opensearch.admissioncontroller;
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.admissioncontroller.AdmissionControllerService;
import org.opensearch.admissioncontroller.NodePerfStats;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -292,8 +294,9 @@ public ShardIterator activeInitializingShardsIt(int seed) {
*/
public ShardIterator activeInitializingShardsRankedIt(
@Nullable ResponseCollectorService collector,
@Nullable Map<String, Long> nodeSearchCounts
) {
@Nullable Map<String, Long> nodeSearchCounts,
@Nullable AdmissionControllerService admissionControllerService
) {
final int seed = shuffler.nextSeed();
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(
Expand Down Expand Up @@ -483,7 +486,12 @@ private static void adjustStats(
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
// revisit this - basically reset stats based on time
final double cpuPercentAvg = stats.nodePerfStats.cpuPercentAvg * 0.99;
final double memPercentAvg = stats.nodePerfStats.memoryPercentAvg * 0.99;
final double ioPercentAvg = stats.nodePerfStats.ioPercentAvg * 0.99;
NodePerfStats nodePerfStats = new NodePerfStats(cpuPercentAvg, memPercentAvg, ioPercentAvg);
collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService, nodePerfStats);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing;

import org.opensearch.admissioncontroller.AdmissionControllerService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
Expand Down Expand Up @@ -202,7 +203,8 @@ public ShardIterator getShards(
preference,
null,
null,
clusterState.getMetadata().weightedRoutingMetadata()
clusterState.getMetadata().weightedRoutingMetadata(),
null
);
}

Expand All @@ -215,7 +217,8 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shar
preference,
null,
null,
clusterState.metadata().weightedRoutingMetadata()
clusterState.metadata().weightedRoutingMetadata(),
null
);
}

Expand All @@ -225,7 +228,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
@Nullable Map<String, Set<String>> routing,
@Nullable String preference
) {
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
}

public GroupShardsIterator<ShardIterator> searchShards(
Expand All @@ -236,6 +239,17 @@ public GroupShardsIterator<ShardIterator> searchShards(
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {
return searchShards(clusterState, concreteIndices, routing, preference, collectorService, nodeCounts, null);
}
public GroupShardsIterator<ShardIterator> searchShards(
ClusterState clusterState,
String[] concreteIndices,
@Nullable Map<String, Set<String>> routing,
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts,
@Nullable AdmissionControllerService admissionControllerService
) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
Expand All @@ -253,7 +267,8 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference,
collectorService,
nodeCounts,
clusterState.metadata().weightedRoutingMetadata()
clusterState.metadata().weightedRoutingMetadata(),
admissionControllerService
);
if (iterator != null) {
set.add(iterator);
Expand All @@ -276,16 +291,21 @@ private Set<IndexShardRoutingTable> computeTargetedShards(
) {
routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map
final Set<IndexShardRoutingTable> set = new HashSet<>();
final Set<String> nodeIds = new HashSet<>();
// we use set here and not list since we might get duplicates
for (String index : concreteIndices) {
// this is where we calculate shard information which contains nodes associated with each shard
final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
final IndexMetadata indexMetadata = indexMetadata(clusterState, index);
final Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
final IndexShardRoutingTable indexShardRoutingTable = RoutingTable.shardRoutingTable(indexRouting,
calculateScaledShardId(indexMetadata, r, partitionOffset));
//nodeIds.add(indexShardRoutingTable.s)
set.add(indexShardRoutingTable);
}
}
} else {
Expand All @@ -304,10 +324,11 @@ private ShardIterator preferenceActiveShardIterator(
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts,
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
@Nullable WeightedRoutingMetadata weightedRoutingMetadata,
@Nullable AdmissionControllerService admissionControllerService
) {
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService);
}

if (preference.charAt(0) == '_') {
Expand Down Expand Up @@ -335,12 +356,13 @@ private ShardIterator preferenceActiveShardIterator(
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService);
} else {
// update the preference and continue
preference = preference.substring(index + 1);
}
}
// there are preference based routings as well
preferenceType = Preference.parse(preference);
checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata);
switch (preferenceType) {
Expand Down Expand Up @@ -400,7 +422,8 @@ private ShardIterator shardRoutings(
DiscoveryNodes nodes,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts,
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
@Nullable WeightedRoutingMetadata weightedRoutingMetadata,
@Nullable AdmissionControllerService admissionControllerService
) {
if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) {
return indexShard.activeInitializingShardsWeightedIt(
Expand All @@ -412,7 +435,7 @@ private ShardIterator shardRoutings(
);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts, admissionControllerService);
} else {
return indexShard.activeInitializingShardsRandomIt();
}
Expand Down
Loading

0 comments on commit 2c98a0d

Please sign in to comment.