From d7d566054b887c89707975725daa14da8e94ba2f Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Sat, 20 Jul 2024 17:35:19 +0530 Subject: [PATCH 1/8] Adds the listener for resource utilization metrics Signed-off-by: Gagan Juneja --- .../PerformanceAnalyzerPlugin.java | 10 +- .../config/PerformanceAnalyzerController.java | 9 + .../PerformanceAnalyzerSearchListener.java | 14 +- .../RTFPerformanceAnalyzerSearchListener.java | 255 ++++++++++++++++++ ...rmanceAnalyzerTransportRequestHandler.java | 10 +- ...TFPerformanceAnalyzerTransportChannel.java | 129 +++++++++ ...rformanceAnalyzerTransportInterceptor.java | 34 +++ ...rmanceAnalyzerTransportRequestHandler.java | 124 +++++++++ .../performanceanalyzer/util/Utils.java | 15 ++ .../PerformanceAnalyzerPluginTests.java | 10 +- ...erformanceAnalyzerSearchListenerTests.java | 16 ++ ...erformanceAnalyzerSearchListenerTests.java | 163 +++++++++++ ...eAnalyzerTransportRequestHandlerTests.java | 28 ++ ...formanceAnalyzerTransportChannelTests.java | 82 ++++++ ...eAnalyzerTransportRequestHandlerTests.java | 120 +++++++++ .../performanceanalyzer/util/UtilsTests.java | 27 ++ 16 files changed, 1033 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 143d1f1a..06f19fa2 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -86,7 +86,9 @@ import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction; import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction; import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener; +import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener; import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor; +import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor; import org.opensearch.plugins.ActionPlugin; @@ -302,7 +304,10 @@ public List getActionFilters() { public void onIndexModule(IndexModule indexModule) { PerformanceAnalyzerSearchListener performanceanalyzerSearchListener = new PerformanceAnalyzerSearchListener(performanceAnalyzerController); + RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener = + new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController); indexModule.addSearchOperationListener(performanceanalyzerSearchListener); + indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener); } // follower check, leader check @@ -330,8 +335,9 @@ public void onDiscovery(Discovery discovery) { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { - return singletonList( - new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); + return Arrays.asList( + new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController), + new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index faaebc7c..74cc3e26 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -383,4 +383,13 @@ public boolean isCollectorDisabled( return disabledCollectorsList.contains(collectorName); } + + /** + * Collectors Setting value. + * + * @return collectorsSettingValue + */ + public int getCollectorsSettingValue() { + return collectorsSettingValue; + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index 7719cdd6..f07415f8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -7,6 +7,7 @@ import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.index.shard.SearchOperationListener; @@ -16,6 +17,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.search.internal.SearchContext; @@ -36,8 +38,16 @@ public String toString() { return PerformanceAnalyzerSearchListener.class.getSimpleName(); } - private SearchListener getSearchListener() { - return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER; + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.RCA.getValue()); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java new file mode 100644 index 00000000..059b8750 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -0,0 +1,255 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.index.shard.SearchOperationListener; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * {@link SearchOperationListener} to capture the resource utilization of a shard search operation. + * This will be getting the resource tracking information from the {@link + * org.opensearch.tasks.TaskResourceTrackingService}. + */ +public class RTFPerformanceAnalyzerSearchListener + implements SearchOperationListener, SearchListener { + + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); + private static final String OPERATION_SHARD_FETCH = "shard_fetch"; + private static final String OPERATION_SHARD_QUERY = "shard_query"; + public static final String QUERY_START_TIME = "query_start_time"; + public static final String FETCH_START_TIME = "fetch_start_time"; + private final ThreadLocal> threadLocal; + private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); + + private final PerformanceAnalyzerController controller; + private final Histogram cpuUtilizationHistogram; + private final Histogram heapUsedHistogram; + private final int numProcessors; + + public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { + this.controller = controller; + this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + heapUsedHistogram = createHeapUsedHistogram(); + this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + } + + private Histogram createCPUUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for an operation", + RTFMetrics.MetricUnits.RATE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.HeapValue.HEAP_USED.toString(), + "Heap used per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + @Override + public String toString() { + return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); + } + + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + LOG.debug( + "Controller enable status {}, CollectorMode value {}", + controller.isPerformanceAnalyzerEnabled(), + controller.getCollectorsSettingValue()); + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + @Override + public void onPreQueryPhase(SearchContext searchContext) { + try { + getSearchListener().preQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onQueryPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().queryPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedQueryPhase(SearchContext searchContext) { + try { + getSearchListener().failedQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onPreFetchPhase(SearchContext searchContext) { + try { + getSearchListener().preFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFetchPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().fetchPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedFetchPhase(SearchContext searchContext) { + try { + getSearchListener().failedFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void preQueryPhase(SearchContext searchContext) { + threadLocal.get().put(QUERY_START_TIME, System.nanoTime()); + } + + @Override + public void queryPhase(SearchContext searchContext, long tookInNanos) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, OPERATION_SHARD_QUERY, false); + } + + @Override + public void failedQueryPhase(SearchContext searchContext) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, OPERATION_SHARD_QUERY, true); + } + + @Override + public void preFetchPhase(SearchContext searchContext) { + threadLocal.get().put(FETCH_START_TIME, System.nanoTime()); + } + + @Override + public void fetchPhase(SearchContext searchContext, long tookInNanos) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false); + } + + @Override + public void failedFetchPhase(SearchContext searchContext) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); + } + + private void addResourceTrackingCompletionListener( + SearchContext searchContext, long startTime, String operation, boolean isFailed) { + searchContext + .getTask() + .addResourceTrackingCompletionListener( + createListener( + searchContext, + (System.nanoTime() - startTime), + operation, + isFailed)); + } + + @VisibleForTesting + NotifyOnceListener createListener( + SearchContext searchContext, long totalTime, String operation, boolean isFailed) { + return new NotifyOnceListener() { + @Override + protected void innerOnResponse(Task task) { + LOG.debug("Updating the counter for task {}", task.getId()); + cpuUtilizationHistogram.record( + Utils.calculateCPUUtilization( + numProcessors, + totalTime, + task.getTotalResourceStats().getCpuTimeInNanos()), + createTags()); + heapUsedHistogram.record( + Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags()); + } + + private Tags createTags() { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + + @Override + protected void innerOnFailure(Exception e) { + LOG.error("Error is executing the the listener", e); + } + }; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index a4cd946e..ad517c74 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportChannel; @@ -45,7 +46,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro @VisibleForTesting TransportChannel getChannel(T request, TransportChannel channel, Task task) { - if (!controller.isPerformanceAnalyzerEnabled()) { + if (!isCollectorEnabled()) { return channel; } @@ -56,6 +57,13 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { } } + private boolean isCollectorEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.RCA.getValue()); + } + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { String className = request.getClass().getName(); boolean bPrimary = false; diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java new file mode 100644 index 00000000..40465497 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.management.ThreadMXBean; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +/** + * {@link TransportChannel} implementation to override the sendResponse behavior to have handle of + * the {@link org.opensearch.action.bulk.BulkShardRequest} completion. + */ +public final class RTFPerformanceAnalyzerTransportChannel implements TransportChannel { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportChannel.class); + + private static final ThreadMXBean threadMXBean = + (ThreadMXBean) ManagementFactory.getThreadMXBean(); + private static final String OPERATION_SHARD_BULK = "shardbulk"; + private static final String SHARD_ROLE_PRIMARY = "primary"; + private static final String SHARD_ROLE_REPLICA = "replica"; + + private long cpuStartTime; + private long operationStartTime; + + private Histogram cpuUtilizationHistogram; + + private TransportChannel original; + private String indexName; + private ShardId shardId; + private boolean primary; + + private long threadID; + private int numProcessors; + + void set( + TransportChannel original, + Histogram cpuUtilizationHistogram, + String indexName, + ShardId shardId, + boolean bPrimary) { + this.original = original; + this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexName = indexName; + this.shardId = shardId; + this.primary = bPrimary; + + this.operationStartTime = System.nanoTime(); + threadID = Thread.currentThread().getId(); + this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + LOG.debug("Thread Name {}", Thread.currentThread().getName()); + } + + @Override + public String getProfileName() { + return "RTFPerformanceAnalyzerTransportChannelProfile"; + } + + @Override + public String getChannelType() { + return "RTFPerformanceAnalyzerTransportChannelType"; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + emitMetrics(null); + original.sendResponse(response); + } + + @Override + public void sendResponse(Exception exception) throws IOException { + emitMetrics(exception); + original.sendResponse(exception); + } + + private void emitMetrics(Exception exception) { + double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); + recordCPUUtilizationMetric( + shardId, cpuUtilization, OPERATION_SHARD_BULK, exception != null); + } + + private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { + LOG.debug("Completion Thread Name {}", Thread.currentThread().getName()); + long totalCpuTime = + Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime)); + return Utils.calculateCPUUtilization( + numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime); + } + + @VisibleForTesting + void recordCPUUtilizationMetric( + ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { + cpuUtilizationHistogram.record( + cpuUtilization, + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + } + + // This function is called from the security plugin using reflection. Do not + // remove this function without changing the security plugin. + public TransportChannel getInnerChannel() { + return this.original; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java new file mode 100644 index 00000000..5d2de2b6 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * Transport Interceptor to intercept the Indexing requests and populate the resource utilization + * metrics. + */ +public final class RTFPerformanceAnalyzerTransportInterceptor implements TransportInterceptor { + + private final PerformanceAnalyzerController controller; + + public RTFPerformanceAnalyzerTransportInterceptor( + final PerformanceAnalyzerController controller) { + this.controller = controller; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler) { + return new RTFPerformanceAnalyzerTransportRequestHandler<>(actualHandler, controller); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java new file mode 100644 index 00000000..c1e5b55f --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * {@link TransportRequestHandler} implementation to intercept only the {@link BulkShardRequest} and + * skip other transport calls. + * + * @param {@link TransportRequest} + */ +public final class RTFPerformanceAnalyzerTransportRequestHandler + implements TransportRequestHandler { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportRequestHandler.class); + private final PerformanceAnalyzerController controller; + private TransportRequestHandler actualHandler; + boolean logOnce = false; + private final Histogram cpuUtilizationHistogram; + + RTFPerformanceAnalyzerTransportRequestHandler( + TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { + this.actualHandler = actualHandler; + this.controller = controller; + this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + } + + private Histogram createCPUUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for an operation", + "rate"); + } else { + return null; + } + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + actualHandler.messageReceived(request, getChannel(request, channel, task), task); + } + + @VisibleForTesting + TransportChannel getChannel(T request, TransportChannel channel, Task task) { + if (!isCollectorEnabled()) { + return channel; + } + + if (request instanceof ConcreteShardRequest) { + return getShardBulkChannel(request, channel, task); + } else { + return channel; + } + } + + private boolean isCollectorEnabled() { + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { + String className = request.getClass().getName(); + boolean bPrimary = false; + + if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteShardRequest")) { + bPrimary = true; + } else if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteReplicaRequest")) { + bPrimary = false; + } else { + return channel; + } + + TransportRequest transportRequest = ((ConcreteShardRequest) request).getRequest(); + + if (!(transportRequest instanceof BulkShardRequest)) { + return channel; + } + + BulkShardRequest bsr = (BulkShardRequest) transportRequest; + RTFPerformanceAnalyzerTransportChannel rtfPerformanceAnalyzerTransportChannel = + new RTFPerformanceAnalyzerTransportChannel(); + + try { + rtfPerformanceAnalyzerTransportChannel.set( + channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + } catch (Exception ex) { + if (!logOnce) { + LOG.error(ex); + logOnce = true; + } + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + + return rtfPerformanceAnalyzerTransportChannel; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index e4963ebc..aca1b295 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -8,6 +8,8 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; @@ -27,6 +29,7 @@ import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics; public class Utils { + private static final Logger LOG = LogManager.getLogger(Utils.class); public static void configureMetrics() { ServiceMetrics.initStatsReporter(); @@ -108,4 +111,16 @@ public static HashMap getShards() { IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + + public static double calculateCPUUtilization( + int numProcessors, long totalOperationTime, long cpuUsageTime) { + LOG.debug("totalCpuTime {}", cpuUsageTime); + LOG.debug("totalOperationTime {}", totalOperationTime); + LOG.debug("numProcessors {}", numProcessors); + if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { + return 0.0d; + } + double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); + return cpuUsageTime / totalAvailableCPUTime; + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java index fc5f95fa..554eca27 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java @@ -22,7 +22,6 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilter; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -90,12 +89,7 @@ public void setup() { } catch (Exception e) { e.printStackTrace(); } - clusterService = - new ClusterService( - settings, - clusterSettings, - threadPool, - new ClusterManagerMetrics(metricsRegistry)); + clusterService = new ClusterService(settings, clusterSettings, threadPool); identityService = new IdentityService(Settings.EMPTY, List.of()); restController = new RestController( @@ -131,7 +125,7 @@ public void testGetActions() { @Test public void testGetTransportInterceptors() { List list = plugin.getTransportInterceptors(null, null); - assertEquals(1, list.size()); + assertEquals(2, list.size()); assertEquals(PerformanceAnalyzerTransportInterceptor.class, list.get(0).getClass()); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index cfa8bf58..fdd3aa49 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ -25,6 +25,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.TestUtil; import org.opensearch.performanceanalyzer.util.Utils; @@ -69,6 +70,21 @@ public void init() { TestUtil.readEvents(); } + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + } + @Test public void testGetMetricsPath() { String expectedPath = diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java new file mode 100644 index 00000000..208b47c6 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFPerformanceAnalyzerSearchListenerTests { + + private RTFPerformanceAnalyzerSearchListener searchListener; + + @Mock private SearchContext searchContext; + @Mock private ShardSearchRequest shardSearchRequest; + @Mock private ShardId shardId; + @Mock private PerformanceAnalyzerController controller; + @Mock private SearchShardTask task; + @Mock private MetricsRegistry metricsRegistry; + @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram heapUsedHistogram; + @Mock private Index index; + + @Mock private TaskResourceUsage taskResourceUsage; + + @BeforeClass + public static void setup() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + } + + @Before + public void init() { + initMocks(this); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(cpuUtilizationHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("heap_used"), Mockito.anyString(), Mockito.eq("B"))) + .thenReturn(heapUsedHistogram); + searchListener = new RTFPerformanceAnalyzerSearchListener(controller); + assertEquals( + RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), + searchListener.toString()); + } + + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + } + + @Test + public void testQueryPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.queryPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testQueryPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.failedQueryPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.failedFetchPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testTaskCompletionListener() { + initializeValidSearchContext(true); + RTFPerformanceAnalyzerSearchListener rtfSearchListener = + new RTFPerformanceAnalyzerSearchListener(controller); + + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(task.getTotalResourceStats()).thenReturn(taskResourceUsage); + Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); + + NotifyOnceListener taskCompletionListener = + rtfSearchListener.createListener(searchContext, 0l, "test", false); + taskCompletionListener.onResponse(task); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } + + private void initializeValidSearchContext(boolean isValid) { + if (isValid) { + Mockito.when(searchContext.request()).thenReturn(shardSearchRequest); + Mockito.when(searchContext.getTask()).thenReturn(task); + Mockito.when(shardSearchRequest.shardId()).thenReturn(shardId); + } else { + Mockito.when(searchContext.request()).thenReturn(null); + } + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java index 56dc289a..e14b778d 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.transport; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; @@ -18,6 +19,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.tasks.Task; @@ -67,4 +69,30 @@ public void testGetChannel() { TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); } + + @Test + public void testGetChannelIfRCAModeIsDisabled() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelIfDualModeIsEnabled() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java new file mode 100644 index 00000000..aa4e425b --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.io.IOException; +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +public class RTFPerformanceAnalyzerTransportChannelTests { + private RTFPerformanceAnalyzerTransportChannel channel; + + @Mock private TransportChannel originalChannel; + @Mock private TransportResponse response; + @Mock private Histogram cpuUtilizationHistogram; + private ShardId shardId; + @Mock private ShardId mockedShardId; + @Mock private Index index; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + String indexName = "testIndex"; + shardId = new ShardId(new Index(indexName, "uuid"), 1); + channel = new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); + assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); + assertEquals(originalChannel, channel.getInnerChannel()); + } + + @Test + public void testResponse() throws IOException { + channel.sendResponse(response); + verify(originalChannel).sendResponse(response); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testResponseWithException() throws IOException { + Exception exception = new Exception("dummy exception"); + channel.sendResponse(exception); + verify(originalChannel).sendResponse(exception); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testRecordCPUUtilizationMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java new file mode 100644 index 00000000..7c4ed412 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.bulk.BulkItemRequest; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +@SuppressWarnings("unchecked") +public class RTFPerformanceAnalyzerTransportRequestHandlerTests { + private RTFPerformanceAnalyzerTransportRequestHandler handler; + private ConcreteShardRequest concreteShardRequest; + + @Mock private TransportRequestHandler transportRequestHandler; + @Mock private PerformanceAnalyzerController controller; + @Mock private TransportChannel channel; + @Mock private TransportRequest request; + @Mock private BulkShardRequest bulkShardRequest; + @Mock private Task task; + @Mock private ShardId shardId; + @Mock private MetricsRegistry metricsRegistry; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + handler = + new RTFPerformanceAnalyzerTransportRequestHandler( + transportRequestHandler, controller); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + } + + @Test + public void testMessageReceived() throws Exception { + handler.messageReceived(request, channel, task); + verify(transportRequestHandler).messageReceived(request, channel, task); + } + + @Test + public void testGetChannel() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelTelemetryIsDisabled() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelDualMode() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelMetricRegistryIsNull() { + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java new file mode 100644 index 00000000..2ee91987 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.util; + +import org.junit.Assert; +import org.junit.Test; + +public class UtilsTests { + + @Test + public void testCPUUtilization() { + Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0); + Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0); + Assert.assertEquals( + Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0); + } + + @Test + public void testCPUUtilizationZeroValue() { + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0); + } +} From aad2bfd3476253e4db85acd97a8142c68f1d1dcd Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 22 Jul 2024 19:46:21 +0530 Subject: [PATCH 2/8] Fixes the same task issue Signed-off-by: Gagan Juneja --- .../RTFPerformanceAnalyzerSearchListener.java | 81 ++++++++++++++++--- ...TFPerformanceAnalyzerTransportChannel.java | 2 +- .../performanceanalyzer/util/Utils.java | 7 +- ...erformanceAnalyzerSearchListenerTests.java | 14 +++- .../performanceanalyzer/util/UtilsTests.java | 17 ++-- 5 files changed, 99 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 059b8750..ea326b81 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -40,6 +40,8 @@ public class RTFPerformanceAnalyzerSearchListener private static final String OPERATION_SHARD_QUERY = "shard_query"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; + public static final String QUERY_TIME = "query_time"; + public static final String QUERY_TASK_ID = "query_task_id"; private final ThreadLocal> threadLocal; private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); @@ -73,7 +75,7 @@ private Histogram createHeapUsedHistogram() { MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); if (metricsRegistry != null) { return metricsRegistry.createHistogram( - RTFMetrics.HeapValue.HEAP_USED.toString(), + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), "Heap used per shard for an operation", RTFMetrics.MetricUnits.BYTE.toString()); } else { @@ -167,20 +169,24 @@ public void onFailedFetchPhase(SearchContext searchContext) { @Override public void preQueryPhase(SearchContext searchContext) { threadLocal.get().put(QUERY_START_TIME, System.nanoTime()); + threadLocal.get().put(QUERY_TASK_ID, searchContext.getTask().getId()); } @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryTime = (System.nanoTime() - queryStartTime); + threadLocal.get().put(QUERY_TIME, queryTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, OPERATION_SHARD_QUERY, false); + searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); } @Override public void failedQueryPhase(SearchContext searchContext) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, OPERATION_SHARD_QUERY, true); + searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); } @Override @@ -191,44 +197,92 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); - addResourceTrackingCompletionListener( + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false); } @Override public void failedFetchPhase(SearchContext searchContext) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); - addResourceTrackingCompletionListener( + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); } private void addResourceTrackingCompletionListener( - SearchContext searchContext, long startTime, String operation, boolean isFailed) { + SearchContext searchContext, + long startTime, + long queryTime, + String operation, + boolean isFailed) { + addCompletionListener(searchContext, startTime, queryTime, operation, isFailed); + } + + private void addResourceTrackingCompletionListenerForFetchPhase( + SearchContext searchContext, long fetchStartTime, String operation, boolean isFailed) { + long startTime = fetchStartTime; + long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); + /** + * There are scenarios where both query and fetch pahses run in the same task for an + * optimization. Adding a special handling for that case to divide the CPU usage between + * these 2 operations by their runTime. + */ + if (queryTaskId == searchContext.getTask().getId()) { + startTime = threadLocal.get().getOrDefault(QUERY_TIME, 0l); + } + long fetchTime = System.nanoTime() - fetchStartTime; + addCompletionListener(searchContext, startTime, fetchTime, operation, isFailed); + } + + private void addCompletionListener( + SearchContext searchContext, + long overallStartTime, + long operationTime, + String operation, + boolean isFailed) { searchContext .getTask() .addResourceTrackingCompletionListener( createListener( searchContext, - (System.nanoTime() - startTime), + overallStartTime, + operationTime, operation, isFailed)); } @VisibleForTesting NotifyOnceListener createListener( - SearchContext searchContext, long totalTime, String operation, boolean isFailed) { + SearchContext searchContext, + long overallStartTime, + long totalOperationTime, + String operation, + boolean isFailed) { return new NotifyOnceListener() { @Override protected void innerOnResponse(Task task) { LOG.debug("Updating the counter for task {}", task.getId()); + /** + * There are scenarios where cpuUsageTime consists of the total of CPU of multiple + * operations. In that case we are computing the cpuShareFactor by dividing the + * particular operationTime and the total time till this calculation happen from the + * overall start time. + */ + double operationShareFactor = + computeShareFactor( + totalOperationTime, System.nanoTime() - overallStartTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, - totalTime, - task.getTotalResourceStats().getCpuTimeInNanos()), + totalOperationTime, + task.getTotalResourceStats().getCpuTimeInNanos(), + operationShareFactor), createTags()); heapUsedHistogram.record( - Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags()); + Math.max( + 0, + task.getTotalResourceStats().getMemoryInBytes() + * operationShareFactor), + createTags()); } private Tags createTags() { @@ -252,4 +306,9 @@ protected void innerOnFailure(Exception e) { } }; } + + @VisibleForTesting + static double computeShareFactor(long totalOperationTime, long totalTime) { + return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime)); + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 40465497..3cc4b353 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -98,7 +98,7 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi long totalCpuTime = Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime)); return Utils.calculateCPUUtilization( - numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime); + numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } @VisibleForTesting diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index aca1b295..946a52bd 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -113,14 +113,15 @@ public static HashMap getShards() { IndexShardState.STARTED); public static double calculateCPUUtilization( - int numProcessors, long totalOperationTime, long cpuUsageTime) { + int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { + LOG.debug("numProcessors {}", numProcessors); + LOG.debug("cpuShareFactor {}", cpuShareFactor); LOG.debug("totalCpuTime {}", cpuUsageTime); LOG.debug("totalOperationTime {}", totalOperationTime); - LOG.debug("numProcessors {}", numProcessors); if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { return 0.0d; } double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); - return cpuUsageTime / totalAvailableCPUTime; + return cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 208b47c6..9629abb2 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -131,6 +131,18 @@ public void testFetchPhaseFailed() { Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); } + @Test + public void testOperationShareFactor() { + assertEquals( + Double.valueOf(10.0 / 15), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15), + 0); + assertEquals( + Double.valueOf(1), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10), + 0); + } + @Test public void testTaskCompletionListener() { initializeValidSearchContext(true); @@ -144,7 +156,7 @@ public void testTaskCompletionListener() { Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); NotifyOnceListener taskCompletionListener = - rtfSearchListener.createListener(searchContext, 0l, "test", false); + rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java index 2ee91987..b1e2490e 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java @@ -12,16 +12,21 @@ public class UtilsTests { @Test public void testCPUUtilization() { - Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0); - Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0); + Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5, 1.0), 0.0); + Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5, 1.0), 0.0); Assert.assertEquals( - Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0); + Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10, 1.0), 0.0); + Assert.assertEquals( + Double.valueOf(0.50 * (20 / 30.0)), + Utils.calculateCPUUtilization(3, 10, 20, 0.5d), + 0.0); } @Test public void testCPUUtilizationZeroValue() { - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0); - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0); - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 0.0), 0.0); } } From cf42943a33540456cfb39600da0aa92175f1943d Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 22 Jul 2024 19:57:48 +0530 Subject: [PATCH 3/8] Fixes the failing test Signed-off-by: Gagan Juneja --- .../listener/RTFPerformanceAnalyzerSearchListenerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 9629abb2..0c19492a 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -60,13 +60,13 @@ public void init() { Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); Mockito.when( metricsRegistry.createHistogram( - Mockito.eq("CPU_Utilization"), + Mockito.eq("cpu_utilization"), Mockito.anyString(), Mockito.eq("rate"))) .thenReturn(cpuUtilizationHistogram); Mockito.when( metricsRegistry.createHistogram( - Mockito.eq("heap_used"), Mockito.anyString(), Mockito.eq("B"))) + Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) .thenReturn(heapUsedHistogram); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( From 1cb5c43a5867f50f44e11bd61908aa90e380ced0 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 06:23:09 +0530 Subject: [PATCH 4/8] Fixes the failing test Signed-off-by: Gagan Juneja --- .../RTFPerformanceAnalyzerSearchListener.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index ea326b81..f88870ef 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -40,7 +40,6 @@ public class RTFPerformanceAnalyzerSearchListener private static final String OPERATION_SHARD_QUERY = "shard_query"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; - public static final String QUERY_TIME = "query_time"; public static final String QUERY_TASK_ID = "query_task_id"; private final ThreadLocal> threadLocal; private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); @@ -176,7 +175,6 @@ public void preQueryPhase(SearchContext searchContext) { public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); long queryTime = (System.nanoTime() - queryStartTime); - threadLocal.get().put(QUERY_TIME, queryTime); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); } @@ -197,15 +195,17 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false); + searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false); } @Override public void failedFetchPhase(SearchContext searchContext) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); + searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true); } private void addResourceTrackingCompletionListener( @@ -218,8 +218,8 @@ private void addResourceTrackingCompletionListener( } private void addResourceTrackingCompletionListenerForFetchPhase( - SearchContext searchContext, long fetchStartTime, String operation, boolean isFailed) { - long startTime = fetchStartTime; + SearchContext searchContext, long fetchStartTime, long fetchTime, String operation, boolean isFailed) { + long overallStartTime = fetchStartTime; long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); /** * There are scenarios where both query and fetch pahses run in the same task for an @@ -227,10 +227,9 @@ private void addResourceTrackingCompletionListenerForFetchPhase( * these 2 operations by their runTime. */ if (queryTaskId == searchContext.getTask().getId()) { - startTime = threadLocal.get().getOrDefault(QUERY_TIME, 0l); + overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); } - long fetchTime = System.nanoTime() - fetchStartTime; - addCompletionListener(searchContext, startTime, fetchTime, operation, isFailed); + addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed); } private void addCompletionListener( @@ -267,9 +266,10 @@ protected void innerOnResponse(Task task) { * particular operationTime and the total time till this calculation happen from the * overall start time. */ + long totalTime = System.nanoTime() - overallStartTime; double operationShareFactor = computeShareFactor( - totalOperationTime, System.nanoTime() - overallStartTime); + totalOperationTime, totalTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, From b137a47026792df7f79a0a33f405da82f8dfe916 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 06:25:42 +0530 Subject: [PATCH 5/8] Fixes the failing test Signed-off-by: Gagan Juneja --- .../opensearch/performanceanalyzer/util/Utils.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 946a52bd..97eeb460 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -114,14 +114,16 @@ public static HashMap getShards() { public static double calculateCPUUtilization( int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { - LOG.debug("numProcessors {}", numProcessors); - LOG.debug("cpuShareFactor {}", cpuShareFactor); - LOG.debug("totalCpuTime {}", cpuUsageTime); - LOG.debug("totalOperationTime {}", totalOperationTime); + LOG.debug("CPUUtilization calculation - numProcessors {}", numProcessors); + LOG.debug("CPUUtilization calculation - cpuShareFactor {}", cpuShareFactor); + LOG.debug("CPUUtilization calculation - totalCpuTime {}", cpuUsageTime); + LOG.debug("CPUUtilization calculation - totalOperationTime {}", totalOperationTime); if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { return 0.0d; } double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); - return cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); + double cpuUtil = cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); + LOG.debug("CPUUtilization calculation - cpuUtil {}", cpuUtil); + return cpuUtil; } } From 68996237967ea62b77270b268ab5b0c9a1f9fe5c Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 07:25:04 +0530 Subject: [PATCH 6/8] Fixes the failing test Signed-off-by: Gagan Juneja --- .../RTFPerformanceAnalyzerSearchListener.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index f88870ef..123d994c 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -218,7 +218,11 @@ private void addResourceTrackingCompletionListener( } private void addResourceTrackingCompletionListenerForFetchPhase( - SearchContext searchContext, long fetchStartTime, long fetchTime, String operation, boolean isFailed) { + SearchContext searchContext, + long fetchStartTime, + long fetchTime, + String operation, + boolean isFailed) { long overallStartTime = fetchStartTime; long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); /** @@ -267,13 +271,11 @@ protected void innerOnResponse(Task task) { * overall start time. */ long totalTime = System.nanoTime() - overallStartTime; - double operationShareFactor = - computeShareFactor( - totalOperationTime, totalTime); + double operationShareFactor = computeShareFactor(totalOperationTime, totalTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, - totalOperationTime, + totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), operationShareFactor), createTags()); From 34ebcc44fd24190e3822b325620e3ce4bba62ab8 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 19:41:33 +0530 Subject: [PATCH 7/8] Addresses the review comments Signed-off-by: Gagan Juneja --- .../config/PerformanceAnalyzerController.java | 2 +- .../PerformanceAnalyzerSearchListener.java | 4 +-- .../RTFPerformanceAnalyzerSearchListener.java | 32 ++++++++----------- ...rmanceAnalyzerTransportRequestHandler.java | 4 +-- ...rmanceAnalyzerTransportRequestHandler.java | 8 ++--- .../performanceanalyzer/util/Utils.java | 29 ++++++++++++++--- ...erformanceAnalyzerSearchListenerTests.java | 6 ++-- ...erformanceAnalyzerSearchListenerTests.java | 14 ++++---- ...eAnalyzerTransportRequestHandlerTests.java | 4 +-- ...eAnalyzerTransportRequestHandlerTests.java | 8 ++--- 10 files changed, 63 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index 74cc3e26..6915e0fe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -389,7 +389,7 @@ public boolean isCollectorDisabled( * * @return collectorsSettingValue */ - public int getCollectorsSettingValue() { + public int getCollectorsRunModeValue() { return collectorsSettingValue; } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index f07415f8..99d46e3d 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -45,8 +45,8 @@ SearchListener getSearchListener() { private boolean isSearchListenerEnabled() { return controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.RCA.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 123d994c..a636444b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -51,14 +51,15 @@ public class RTFPerformanceAnalyzerSearchListener public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { this.controller = controller; - this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); - heapUsedHistogram = createHeapUsedHistogram(); + this.cpuUtilizationHistogram = + createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.heapUsedHistogram = + createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); this.numProcessors = Runtime.getRuntime().availableProcessors(); } - private Histogram createCPUUtilizationHistogram() { - MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), @@ -70,8 +71,7 @@ private Histogram createCPUUtilizationHistogram() { } } - private Histogram createHeapUsedHistogram() { - MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), @@ -94,14 +94,10 @@ SearchListener getSearchListener() { } private boolean isSearchListenerEnabled() { - LOG.debug( - "Controller enable status {}, CollectorMode value {}", - controller.isPerformanceAnalyzerEnabled(), - controller.getCollectorsSettingValue()); return OpenSearchResources.INSTANCE.getMetricsRegistry() != null && controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.TELEMETRY.getValue()); } @@ -173,7 +169,7 @@ public void preQueryPhase(SearchContext searchContext) { @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { - long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); @@ -181,7 +177,7 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) { @Override public void failedQueryPhase(SearchContext searchContext) { - long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); @@ -194,7 +190,7 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { - long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false); @@ -202,7 +198,7 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { @Override public void failedFetchPhase(SearchContext searchContext) { - long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true); @@ -226,12 +222,12 @@ private void addResourceTrackingCompletionListenerForFetchPhase( long overallStartTime = fetchStartTime; long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); /** - * There are scenarios where both query and fetch pahses run in the same task for an + * There are scenarios where both query and fetch phases run in the same task for an * optimization. Adding a special handling for that case to divide the CPU usage between * these 2 operations by their runTime. */ if (queryTaskId == searchContext.getTask().getId()) { - overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); } addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index ad517c74..64052ab1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -59,8 +59,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { private boolean isCollectorEnabled() { return controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.RCA.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index c1e5b55f..82a0abe6 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -36,7 +36,7 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; - boolean logOnce = false; + private boolean logOnce = false; private final Histogram cpuUtilizationHistogram; RTFPerformanceAnalyzerTransportRequestHandler( @@ -52,7 +52,7 @@ private Histogram createCPUUtilizationHistogram() { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), "CPU Utilization per shard for an operation", - "rate"); + RTFMetrics.MetricUnits.RATE.toString()); } else { return null; } @@ -79,8 +79,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { private boolean isCollectorEnabled() { return OpenSearchResources.INSTANCE.getMetricsRegistry() != null && controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.TELEMETRY.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 97eeb460..ebf440eb 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -112,18 +112,37 @@ public static HashMap getShards() { IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + /** + * CPU Utilization is the time spend in CPU cycles divide by the total time cpu available time. + * Total cpu available time would be the multiplication of num of processors and the process + * time. It also takes into account the cpuShareFactor in case some adjustments are needed. + * + * @param numProcessors + * @param totalOperationTime + * @param cpuUsageTime + * @param cpuShareFactor + * @return + */ public static double calculateCPUUtilization( int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { - LOG.debug("CPUUtilization calculation - numProcessors {}", numProcessors); - LOG.debug("CPUUtilization calculation - cpuShareFactor {}", cpuShareFactor); - LOG.debug("CPUUtilization calculation - totalCpuTime {}", cpuUsageTime); - LOG.debug("CPUUtilization calculation - totalOperationTime {}", totalOperationTime); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with numProcessors: {}", + numProcessors); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with cpuShareFactor {}", + cpuShareFactor); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalCpuTime {}", + cpuUsageTime); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalOperationTime {}", + totalOperationTime); if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { return 0.0d; } double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); double cpuUtil = cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); - LOG.debug("CPUUtilization calculation - cpuUtil {}", cpuUtil); + LOG.debug("Performance Analyzer CPUUtilization calculation with cpuUtil {}", cpuUtil); return cpuUtil; } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index fdd3aa49..f7204783 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ -72,15 +72,15 @@ public void init() { @Test public void tesSearchListener() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 0c19492a..16aba4bc 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -76,16 +76,16 @@ public void init() { @Test public void tesSearchListener() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); assertTrue( searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); assertTrue( searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); @@ -94,7 +94,7 @@ public void tesSearchListener() { @Test public void testQueryPhase() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); @@ -104,7 +104,7 @@ public void testQueryPhase() { @Test public void testQueryPhaseFailed() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preQueryPhase(searchContext); searchListener.failedQueryPhase(searchContext); @@ -114,7 +114,7 @@ public void testQueryPhaseFailed() { @Test public void testFetchPhase() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); @@ -124,7 +124,7 @@ public void testFetchPhase() { @Test public void testFetchPhaseFailed() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preFetchPhase(searchContext); searchListener.failedFetchPhase(searchContext); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java index e14b778d..099390fa 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java @@ -72,7 +72,7 @@ public void testGetChannel() { @Test public void testGetChannelIfRCAModeIsDisabled() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -85,7 +85,7 @@ public void testGetChannelIfRCAModeIsDisabled() { @Test public void testGetChannelIfDualModeIsEnabled() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java index 7c4ed412..9a0f9d0e 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -65,7 +65,7 @@ public void testMessageReceived() throws Exception { @Test public void testGetChannel() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -79,7 +79,7 @@ public void testGetChannel() { @Test public void testGetChannelTelemetryIsDisabled() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -93,7 +93,7 @@ public void testGetChannelTelemetryIsDisabled() { @Test public void testGetChannelDualMode() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -107,7 +107,7 @@ public void testGetChannelDualMode() { @Test public void testGetChannelMetricRegistryIsNull() { OpenSearchResources.INSTANCE.setMetricsRegistry(null); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); From 590ba5d916de80b9698ef7374ea04a182c4c32f6 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 20:51:32 +0530 Subject: [PATCH 8/8] Addresses the review comments Signed-off-by: Gagan Juneja --- .../RTFPerformanceAnalyzerSearchListener.java | 70 ++++++++----------- ...TFPerformanceAnalyzerTransportChannel.java | 9 ++- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index a636444b..6b7921cc 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -36,8 +36,8 @@ public class RTFPerformanceAnalyzerSearchListener private static final Logger LOG = LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); - private static final String OPERATION_SHARD_FETCH = "shard_fetch"; - private static final String OPERATION_SHARD_QUERY = "shard_query"; + private static final String SHARD_FETCH_PHASE = "shard_fetch"; + private static final String SHARD_QUERY_PHASE = "shard_query"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; public static final String QUERY_TASK_ID = "query_task_id"; @@ -63,7 +63,7 @@ private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), - "CPU Utilization per shard for an operation", + "CPU Utilization per shard for a search phase", RTFMetrics.MetricUnits.RATE.toString()); } else { LOG.debug("MetricsRegistry is null"); @@ -75,7 +75,7 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), - "Heap used per shard for an operation", + "Heap used per shard for a search phase", RTFMetrics.MetricUnits.BYTE.toString()); } else { LOG.debug("MetricsRegistry is null"); @@ -172,7 +172,7 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); } @Override @@ -180,7 +180,7 @@ public void failedQueryPhase(SearchContext searchContext) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, true); } @Override @@ -193,7 +193,7 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false); + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); } @Override @@ -201,60 +201,55 @@ public void failedFetchPhase(SearchContext searchContext) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true); + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true); } private void addResourceTrackingCompletionListener( SearchContext searchContext, long startTime, long queryTime, - String operation, + String phase, boolean isFailed) { - addCompletionListener(searchContext, startTime, queryTime, operation, isFailed); + addCompletionListener(searchContext, startTime, queryTime, phase, isFailed); } private void addResourceTrackingCompletionListenerForFetchPhase( SearchContext searchContext, long fetchStartTime, long fetchTime, - String operation, + String phase, boolean isFailed) { - long overallStartTime = fetchStartTime; - long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); + long startTime = fetchStartTime; + long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, -1l); /** * There are scenarios where both query and fetch phases run in the same task for an * optimization. Adding a special handling for that case to divide the CPU usage between * these 2 operations by their runTime. */ if (queryTaskId == searchContext.getTask().getId()) { - overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); + startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); } - addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed); + addCompletionListener(searchContext, startTime, fetchTime, phase, isFailed); } private void addCompletionListener( SearchContext searchContext, - long overallStartTime, - long operationTime, - String operation, + long startTime, + long phaseTookTime, + String phase, boolean isFailed) { searchContext .getTask() .addResourceTrackingCompletionListener( - createListener( - searchContext, - overallStartTime, - operationTime, - operation, - isFailed)); + createListener(searchContext, startTime, phaseTookTime, phase, isFailed)); } @VisibleForTesting NotifyOnceListener createListener( SearchContext searchContext, - long overallStartTime, - long totalOperationTime, - String operation, + long startTime, + long phaseTookTime, + String phase, boolean isFailed) { return new NotifyOnceListener() { @Override @@ -262,24 +257,21 @@ protected void innerOnResponse(Task task) { LOG.debug("Updating the counter for task {}", task.getId()); /** * There are scenarios where cpuUsageTime consists of the total of CPU of multiple - * operations. In that case we are computing the cpuShareFactor by dividing the - * particular operationTime and the total time till this calculation happen from the + * phases. In that case we are computing the cpuShareFactor by dividing the + * particular phaseTime and the total time till this calculation happen from the * overall start time. */ - long totalTime = System.nanoTime() - overallStartTime; - double operationShareFactor = computeShareFactor(totalOperationTime, totalTime); + long totalTime = System.nanoTime() - startTime; + double shareFactor = computeShareFactor(phaseTookTime, totalTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), - operationShareFactor), + shareFactor), createTags()); heapUsedHistogram.record( - Math.max( - 0, - task.getTotalResourceStats().getMemoryInBytes() - * operationShareFactor), + Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), createTags()); } @@ -294,7 +286,7 @@ private Tags createTags() { .addTag( RTFMetrics.CommonDimension.SHARD_ID.toString(), searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); } @@ -306,7 +298,7 @@ protected void innerOnFailure(Exception e) { } @VisibleForTesting - static double computeShareFactor(long totalOperationTime, long totalTime) { - return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime)); + static double computeShareFactor(long phaseTookTime, long totalTime) { + return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 3cc4b353..6eb64185 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -77,20 +77,19 @@ public String getChannelType() { @Override public void sendResponse(TransportResponse response) throws IOException { - emitMetrics(null); + emitMetrics(false); original.sendResponse(response); } @Override public void sendResponse(Exception exception) throws IOException { - emitMetrics(exception); + emitMetrics(true); original.sendResponse(exception); } - private void emitMetrics(Exception exception) { + private void emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); - recordCPUUtilizationMetric( - shardId, cpuUtilization, OPERATION_SHARD_BULK, exception != null); + recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) {