Skip to content

Commit

Permalink
Add support for tracking failures at query group level (#15527)
Browse files Browse the repository at this point in the history
* add workload managementRequestFailureListener

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add unit tests

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add missing javadoc

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* refactor

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* rename listener instance

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 authored Aug 30, 2024
1 parent 1e9fdb4 commit 579f2aa
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
15 changes: 8 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1019,11 +1019,12 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener =
new QueryGroupRequestRejectionOperationListener(
new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService
threadPool
);
final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
);

// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
Expand All @@ -1033,7 +1034,7 @@ protected Node(
searchRequestStats,
searchRequestSlowLog,
searchTaskRequestOperationsListener,
queryGroupRequestRejectionListener
queryGroupRequestOperationListener
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
Expand Down
50 changes: 49 additions & 1 deletion server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,59 @@
package org.opensearch.wlm;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;

import java.util.HashMap;
import java.util.Map;

/**
* This is stub at this point in time and will be replace by an acutal one in couple of days
* As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes
*/
public class QueryGroupService {
// This map does not need to be concurrent since we will process the cluster state change serially and update
// this map with new additions and deletions of entries. QueryGroupState is thread safe
private final Map<String, QueryGroupState> queryGroupStateMap;

public QueryGroupService() {
this(new HashMap<>());
}

public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
this.queryGroupStateMap = queryGroupStateMap;
}

/**
* updates the failure stats for the query group
* @param queryGroupId query group identifier
*/
public void incrementFailuresFor(final String queryGroupId) {
QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId);
// This can happen if the request failed for a deleted query group
// or new queryGroup is being created and has not been acknowledged yet
if (queryGroupState == null) {
return;
}
queryGroupState.failures.inc();
}

/**
*
* @return node level query group stats
*/
public QueryGroupStats nodeStats() {
final Map<String, QueryGroupStatsHolder> statsHolderMap = new HashMap<>();
for (Map.Entry<String, QueryGroupState> queryGroupsState : queryGroupStateMap.entrySet()) {
final String queryGroupId = queryGroupsState.getKey();
final QueryGroupState currentState = queryGroupsState.getValue();

statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
}

return new QueryGroupStats(statsHolderMap);
}

/**
*
* @param queryGroupId query group identifier
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/wlm/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;

/**
Expand All @@ -30,6 +31,8 @@ public enum ResourceType {
private final Function<Task, Long> getResourceUsage;
private final boolean statsEnabled;

private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);

ResourceType(String name, Function<Task, Long> getResourceUsage, boolean statsEnabled) {
this.name = name;
this.getResourceUsage = getResourceUsage;
Expand Down Expand Up @@ -71,4 +74,8 @@ public long getResourceUsage(Task task) {
public boolean hasStatsEnabled() {
return statsEnabled;
}

public static List<ResourceType> getSortedValues() {
return sortedValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@

package org.opensearch.wlm.listeners;

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupTask;

/**
* This listener is used to perform the rejections for incoming requests into a queryGroup
* This listener is used to listen for request lifecycle events for a queryGroup
*/
public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener {
public class QueryGroupRequestOperationListener extends SearchRequestOperationsListener {

private final QueryGroupService queryGroupService;
private final ThreadPool threadPool;

public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
public QueryGroupRequestOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
this.queryGroupService = queryGroupService;
this.threadPool = threadPool;
}
Expand All @@ -36,4 +37,10 @@ protected void onRequestStart(SearchRequestContext searchRequestContext) {
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
queryGroupService.rejectIfNeeded(queryGroupId);
}

@Override
protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
queryGroupService.incrementFailuresFor(queryGroupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class QueryGroupState {
/**
* this will track the cumulative failures in a query group
*/
final CounterMetric failures = new CounterMetric();
public final CounterMetric failures = new CounterMetric();

/**
* This will track total number of cancellations in the query group due to all resource type breaches
Expand Down Expand Up @@ -95,9 +95,18 @@ public static class ResourceTypeState {
final ResourceType resourceType;
final CounterMetric cancellations = new CounterMetric();
final CounterMetric rejections = new CounterMetric();
private double lastRecordedUsage = 0;

public ResourceTypeState(ResourceType resourceType) {
this.resourceType = resourceType;
}

public void setLastRecordedUsage(double recordedUsage) {
lastRecordedUsage = recordedUsage;
}

public double getLastRecordedUsage() {
return lastRecordedUsage;
}
}
}
65 changes: 56 additions & 9 deletions server/src/main/java/org/opensearch/wlm/stats/QueryGroupStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.stats.QueryGroupState.ResourceTypeState;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -52,7 +56,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("query_groups");
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : stats.entrySet()) {
// to keep the toXContent consistent
List<Map.Entry<String, QueryGroupStatsHolder>> entryList = new ArrayList<>(stats.entrySet());
entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey()));

for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : entryList) {
builder.startObject(queryGroupStats.getKey());
queryGroupStats.getValue().toXContent(builder, params);
builder.endObject();
Expand Down Expand Up @@ -83,11 +91,14 @@ public static class QueryGroupStatsHolder implements ToXContentObject, Writeable
public static final String REJECTIONS = "rejections";
public static final String TOTAL_CANCELLATIONS = "total_cancellations";
public static final String FAILURES = "failures";
private final long completions;
private final long rejections;
private final long failures;
private final long totalCancellations;
private final Map<ResourceType, ResourceStats> resourceStats;
private long completions;
private long rejections;
private long failures;
private long totalCancellations;
private Map<ResourceType, ResourceStats> resourceStats;

// this is needed to support the factory method
public QueryGroupStatsHolder() {}

public QueryGroupStatsHolder(
long completions,
Expand All @@ -111,6 +122,28 @@ public QueryGroupStatsHolder(StreamInput in) throws IOException {
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
}

/**
* static factory method to convert {@link QueryGroupState} into {@link QueryGroupStatsHolder}
* @param queryGroupState which needs to be converted
* @return QueryGroupStatsHolder object
*/
public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) {
final QueryGroupStatsHolder statsHolder = new QueryGroupStatsHolder();

Map<ResourceType, ResourceStats> resourceStatsMap = new HashMap<>();

for (Map.Entry<ResourceType, ResourceTypeState> resourceTypeStateEntry : queryGroupState.getResourceState().entrySet()) {
resourceStatsMap.put(resourceTypeStateEntry.getKey(), ResourceStats.from(resourceTypeStateEntry.getValue()));
}

statsHolder.completions = queryGroupState.getCompletions();
statsHolder.rejections = queryGroupState.getTotalRejections();
statsHolder.failures = queryGroupState.getFailures();
statsHolder.totalCancellations = queryGroupState.getTotalCancellations();
statsHolder.resourceStats = resourceStatsMap;
return statsHolder;
}

/**
* Writes the {@param statsHolder} to {@param out}
* @param out StreamOutput
Expand All @@ -136,9 +169,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(REJECTIONS, rejections);
builder.field(FAILURES, failures);
builder.field(TOTAL_CANCELLATIONS, totalCancellations);
for (Map.Entry<ResourceType, ResourceStats> resourceStat : resourceStats.entrySet()) {
ResourceType resourceType = resourceStat.getKey();
ResourceStats resourceStats1 = resourceStat.getValue();

for (ResourceType resourceType : ResourceType.getSortedValues()) {
ResourceStats resourceStats1 = resourceStats.get(resourceType);
if (resourceStats1 == null) continue;
builder.startObject(resourceType.getName());
resourceStats1.toXContent(builder, params);
builder.endObject();
Expand Down Expand Up @@ -187,6 +221,19 @@ public ResourceStats(StreamInput in) throws IOException {
this.rejections = in.readVLong();
}

/**
* static factory method to convert {@link ResourceTypeState} into {@link ResourceStats}
* @param resourceTypeState which needs to be converted
* @return QueryGroupStatsHolder object
*/
public static ResourceStats from(ResourceTypeState resourceTypeState) {
return new ResourceStats(
resourceTypeState.getLastRecordedUsage(),
resourceTypeState.cancellations.count(),
resourceTypeState.rejections.count()
);
}

/**
* Writes the {@param stats} to {@param out}
* @param out StreamOutput
Expand Down
Loading

0 comments on commit 579f2aa

Please sign in to comment.