Skip to content

Commit

Permalink
Adds the listener for resource utilization metrics (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#687)

* Adds the listener for resource utilization metrics

Signed-off-by: Gagan Juneja <gjjuneja@amazon.com>
Co-authored-by: Gagan Juneja <gjjuneja@amazon.com>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Jul 23, 2024
1 parent bf2b5ea commit c95feb6
Show file tree
Hide file tree
Showing 16 changed files with 1,120 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction;
import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction;
import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor;
import org.opensearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -302,7 +304,10 @@ public List<ActionFilter> getActionFilters() {
public void onIndexModule(IndexModule indexModule) {
PerformanceAnalyzerSearchListener performanceanalyzerSearchListener =
new PerformanceAnalyzerSearchListener(performanceAnalyzerController);
RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener =
new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController);
indexModule.addSearchOperationListener(performanceanalyzerSearchListener);
indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener);
}

// follower check, leader check
Expand Down Expand Up @@ -330,8 +335,9 @@ public void onDiscovery(Discovery discovery) {
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
return singletonList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
return Arrays.asList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController),
new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,13 @@ public boolean isCollectorDisabled(

return disabledCollectorsList.contains(collectorName);
}

/**
* Collectors Setting value.
*
* @return collectorsSettingValue
*/
public int getCollectorsRunModeValue() {
return collectorsSettingValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.shard.SearchOperationListener;
Expand All @@ -16,6 +17,7 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;

Expand All @@ -36,8 +38,16 @@ public String toString() {
return PerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER;
@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.RCA.getValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.listener;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* {@link SearchOperationListener} to capture the resource utilization of a shard search operation.
* This will be getting the resource tracking information from the {@link
* org.opensearch.tasks.TaskResourceTrackingService}.
*/
public class RTFPerformanceAnalyzerSearchListener
implements SearchOperationListener, SearchListener {

private static final Logger LOG =
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String SHARD_FETCH_PHASE = "shard_fetch";
private static final String SHARD_QUERY_PHASE = "shard_query";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
public static final String QUERY_TASK_ID = "query_task_id";
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();

private final PerformanceAnalyzerController controller;
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final int numProcessors;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.cpuUtilizationHistogram =
createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.heapUsedHistogram =
createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}

private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for a search phase",
RTFMetrics.MetricUnits.RATE.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(),
"Heap used per shard for a search phase",
RTFMetrics.MetricUnits.BYTE.toString());
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
}

@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
return OpenSearchResources.INSTANCE.getMetricsRegistry() != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.TELEMETRY.getValue());
}

@Override
public void onPreQueryPhase(SearchContext searchContext) {
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
try {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
try {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
try {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void preQueryPhase(SearchContext searchContext) {
threadLocal.get().put(QUERY_START_TIME, System.nanoTime());
threadLocal.get().put(QUERY_TASK_ID, searchContext.getTask().getId());
}

@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, true);
}

@Override
public void preFetchPhase(SearchContext searchContext) {
threadLocal.get().put(FETCH_START_TIME, System.nanoTime());
}

@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true);
}

private void addResourceTrackingCompletionListener(
SearchContext searchContext,
long startTime,
long queryTime,
String phase,
boolean isFailed) {
addCompletionListener(searchContext, startTime, queryTime, phase, isFailed);
}

private void addResourceTrackingCompletionListenerForFetchPhase(
SearchContext searchContext,
long fetchStartTime,
long fetchTime,
String phase,
boolean isFailed) {
long startTime = fetchStartTime;
long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, -1l);
/**
* There are scenarios where both query and fetch phases run in the same task for an
* optimization. Adding a special handling for that case to divide the CPU usage between
* these 2 operations by their runTime.
*/
if (queryTaskId == searchContext.getTask().getId()) {
startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
}
addCompletionListener(searchContext, startTime, fetchTime, phase, isFailed);
}

private void addCompletionListener(
SearchContext searchContext,
long startTime,
long phaseTookTime,
String phase,
boolean isFailed) {
searchContext
.getTask()
.addResourceTrackingCompletionListener(
createListener(searchContext, startTime, phaseTookTime, phase, isFailed));
}

@VisibleForTesting
NotifyOnceListener<Task> createListener(
SearchContext searchContext,
long startTime,
long phaseTookTime,
String phase,
boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
/**
* There are scenarios where cpuUsageTime consists of the total of CPU of multiple
* phases. In that case we are computing the cpuShareFactor by dividing the
* particular phaseTime and the total time till this calculation happen from the
* overall start time.
*/
long totalTime = System.nanoTime() - startTime;
double shareFactor = computeShareFactor(phaseTookTime, totalTime);
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos(),
shareFactor),
createTags());
heapUsedHistogram.record(
Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor),
createTags());
}

private Tags createTags() {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
}

@Override
protected void innerOnFailure(Exception e) {
LOG.error("Error is executing the the listener", e);
}
};
}

@VisibleForTesting
static double computeShareFactor(long phaseTookTime, long totalTime) {
return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime));
}
}
Loading

0 comments on commit c95feb6

Please sign in to comment.