From 764f7982490e6bc0f65e896d8a6df855779ba475 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 23 Jul 2024 22:41:12 +0530 Subject: [PATCH 1/2] [Feature] adds index_uuid as a tag in node stats all shard metrics (#680) (#689) * Adds index_uuid as a dimension for node stats col Signed-off-by: Atharva Sharma * updates gauge usage and added index_uuid as tag Signed-off-by: Atharva Sharma * removed incomplete collector Signed-off-by: Atharva Sharma * fixed UTs after gauge changes Signed-off-by: Atharva Sharma * addressed comments Signed-off-by: Atharva Sharma * reverted gauge related changes Signed-off-by: Atharva Sharma --------- Signed-off-by: Atharva Sharma (cherry picked from commit 948b54a7f11820f2e5753036545a190d4b4a554b) Co-authored-by: Atharva Sharma <60044988+atharvasharma61@users.noreply.github.com> --- ...RTFNodeStatsAllShardsMetricsCollector.java | 30 +++++++++++-------- ...deStatsAllShardsMetricsCollectorTests.java | 4 +-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java index 63033dba..6b32b3c5 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -143,8 +143,7 @@ configOverridesWrapper, getCollectorName())) { // Populating value for the first run. 
recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId.getIndexName(), - String.valueOf(shardId.id())); + shardId); continue; } ShardStats prevShardStats = prevPerShardStats.get(shardId); @@ -153,15 +152,14 @@ configOverridesWrapper, getCollectorName())) { // run. recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId.getIndexName(), - String.valueOf(shardId.id())); + shardId); continue; } NodeStatsMetricsAllShardsPerCollectionStatus prevValue = new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); NodeStatsMetricsAllShardsPerCollectionStatus currValue = new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); - populateDiffMetricValue(prevValue, currValue, shardId.getIndexName(), shardId.id()); + populateDiffMetricValue(prevValue, currValue, shardId); } } @@ -243,13 +241,20 @@ public void populatePerShardStats(IndicesService indicesService) { } private void recordMetrics( - NodeStatsMetricsAllShardsPerCollectionStatus metrics, - String indexName, - String shardId) { + NodeStatsMetricsAllShardsPerCollectionStatus metrics, ShardId shardId) { Tags nodeStatsMetricsTag = Tags.create() - .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), indexName) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId); + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndexName()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + String.valueOf(shardId.getId())); + + if (shardId.getIndex() != null) { + nodeStatsMetricsTag.addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), shardId.getIndex().getUUID()); + } cacheQueryMissMetrics.add(metrics.getQueryCacheMissCount(), nodeStatsMetricsTag); cacheQuerySizeMetrics.add(metrics.getQueryCacheInBytes(), nodeStatsMetricsTag); @@ -267,8 +272,7 @@ private void recordMetrics( public void populateDiffMetricValue( NodeStatsMetricsAllShardsPerCollectionStatus prevValue, 
NodeStatsMetricsAllShardsPerCollectionStatus currValue, - String indexName, - int shardId) { + ShardId shardId) { NodeStatsMetricsAllShardsPerCollectionStatus metrics = new NodeStatsMetricsAllShardsPerCollectionStatus( @@ -289,7 +293,7 @@ public void populateDiffMetricValue( 0), currValue.requestCacheInBytes); - recordMetrics(metrics, indexName, String.valueOf(shardId)); + recordMetrics(metrics, shardId); } public static class NodeStatsMetricsAllShardsPerCollectionStatus extends MetricStatus { diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java index 089a685b..918c766a 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java @@ -120,11 +120,11 @@ public void testCollectMetrics() throws IOException { createIndex(TEST_INDEX); rtfNodeStatsAllShardsMetricsCollector.collectMetrics(startTimeInMills); verify(rtfNodeStatsAllShardsMetricsCollector, never()) - .populateDiffMetricValue(any(), any(), anyString(), anyInt()); + .populateDiffMetricValue(any(), any(), any()); startTimeInMills += 500; rtfNodeStatsAllShardsMetricsCollector.collectMetrics(startTimeInMills); verify(rtfNodeStatsAllShardsMetricsCollector, times(1)) - .populateDiffMetricValue(any(), any(), anyString(), anyInt()); + .populateDiffMetricValue(any(), any(), any()); verify(cacheFieldDataEvictionCounter, atLeastOnce()).add(anyDouble(), any()); verify(cacheFieldDataSizeCounter, atLeastOnce()).add(anyDouble(), any()); verify(cacheQueryMissCounter, atLeastOnce()).add(anyDouble(), any()); From 7f11e21191463c9a9de7a8c85fac0f909bdc67b8 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" 
<98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 23 Jul 2024 22:43:05 +0530 Subject: [PATCH 2/2] Adds the listener for resource utilization metrics (#687) (#688) * Adds the listener for resource utilization metrics Signed-off-by: Gagan Juneja Co-authored-by: Gagan Juneja (cherry picked from commit c95feb6377205e7f4ec64c3402f7c07a3f3edd02) Co-authored-by: Gagan Juneja --- .../PerformanceAnalyzerPlugin.java | 10 +- .../config/PerformanceAnalyzerController.java | 9 + .../PerformanceAnalyzerSearchListener.java | 14 +- .../RTFPerformanceAnalyzerSearchListener.java | 304 ++++++++++++++++++ ...rmanceAnalyzerTransportRequestHandler.java | 10 +- ...TFPerformanceAnalyzerTransportChannel.java | 128 ++++++++ ...rformanceAnalyzerTransportInterceptor.java | 34 ++ ...rmanceAnalyzerTransportRequestHandler.java | 124 +++++++ .../performanceanalyzer/util/Utils.java | 37 +++ .../PerformanceAnalyzerPluginTests.java | 10 +- ...erformanceAnalyzerSearchListenerTests.java | 16 + ...erformanceAnalyzerSearchListenerTests.java | 175 ++++++++++ ...eAnalyzerTransportRequestHandlerTests.java | 28 ++ ...formanceAnalyzerTransportChannelTests.java | 82 +++++ ...eAnalyzerTransportRequestHandlerTests.java | 120 +++++++ .../performanceanalyzer/util/UtilsTests.java | 32 ++ 16 files changed, 1120 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java create mode 100644 
src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 28421766..9669a96b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -85,7 +85,9 @@ import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction; import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction; import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener; +import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener; import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor; +import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor; import org.opensearch.plugins.ActionPlugin; @@ -298,7 +300,10 @@ public List getActionFilters() { public void onIndexModule(IndexModule indexModule) { PerformanceAnalyzerSearchListener performanceanalyzerSearchListener = new PerformanceAnalyzerSearchListener(performanceAnalyzerController); + RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener = + new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController); indexModule.addSearchOperationListener(performanceanalyzerSearchListener); + indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener); } // follower check, 
leader check @@ -326,8 +331,9 @@ public void onDiscovery(Discovery discovery) { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { - return singletonList( - new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); + return Arrays.asList( + new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController), + new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index faaebc7c..6915e0fe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -383,4 +383,13 @@ public boolean isCollectorDisabled( return disabledCollectorsList.contains(collectorName); } + + /** + * Collectors Setting value. 
+ * + * @return collectorsSettingValue + */ + public int getCollectorsRunModeValue() { + return collectorsSettingValue; + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index 7719cdd6..99d46e3d 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -7,6 +7,7 @@ import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.index.shard.SearchOperationListener; @@ -16,6 +17,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.search.internal.SearchContext; @@ -36,8 +38,16 @@ public String toString() { return PerformanceAnalyzerSearchListener.class.getSimpleName(); } - private SearchListener getSearchListener() { - return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER; + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? 
this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() + == Util.CollectorMode.RCA.getValue()); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java new file mode 100644 index 00000000..6b7921cc --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -0,0 +1,304 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.index.shard.SearchOperationListener; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * {@link SearchOperationListener} 
to capture the resource utilization of a shard search operation. + * This will be getting the resource tracking information from the {@link + * org.opensearch.tasks.TaskResourceTrackingService}. + */ +public class RTFPerformanceAnalyzerSearchListener + implements SearchOperationListener, SearchListener { + + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); + private static final String SHARD_FETCH_PHASE = "shard_fetch"; + private static final String SHARD_QUERY_PHASE = "shard_query"; + public static final String QUERY_START_TIME = "query_start_time"; + public static final String FETCH_START_TIME = "fetch_start_time"; + public static final String QUERY_TASK_ID = "query_task_id"; + private final ThreadLocal> threadLocal; + private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); + + private final PerformanceAnalyzerController controller; + private final Histogram cpuUtilizationHistogram; + private final Histogram heapUsedHistogram; + private final int numProcessors; + + public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { + this.controller = controller; + this.cpuUtilizationHistogram = + createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.heapUsedHistogram = + createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + } + + private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for a search phase", + RTFMetrics.MetricUnits.RATE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + private Histogram createHeapUsedHistogram(MetricsRegistry 
metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), + "Heap used per shard for a search phase", + RTFMetrics.MetricUnits.BYTE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + @Override + public String toString() { + return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); + } + + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + @Override + public void onPreQueryPhase(SearchContext searchContext) { + try { + getSearchListener().preQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onQueryPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().queryPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedQueryPhase(SearchContext searchContext) { + try { + getSearchListener().failedQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onPreFetchPhase(SearchContext searchContext) { + try { + getSearchListener().preFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + 
@Override + public void onFetchPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().fetchPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedFetchPhase(SearchContext searchContext) { + try { + getSearchListener().failedFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void preQueryPhase(SearchContext searchContext) { + threadLocal.get().put(QUERY_START_TIME, System.nanoTime()); + threadLocal.get().put(QUERY_TASK_ID, searchContext.getTask().getId()); + } + + @Override + public void queryPhase(SearchContext searchContext, long tookInNanos) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); + long queryTime = (System.nanoTime() - queryStartTime); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); + } + + @Override + public void failedQueryPhase(SearchContext searchContext) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); + long queryTime = (System.nanoTime() - queryStartTime); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, true); + } + + @Override + public void preFetchPhase(SearchContext searchContext) { + threadLocal.get().put(FETCH_START_TIME, System.nanoTime()); + } + + @Override + public void fetchPhase(SearchContext searchContext, long tookInNanos) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); + long fetchTime = (System.nanoTime() - fetchStartTime); + addResourceTrackingCompletionListenerForFetchPhase( + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); + } + + @Override + public 
void failedFetchPhase(SearchContext searchContext) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); + long fetchTime = (System.nanoTime() - fetchStartTime); + addResourceTrackingCompletionListenerForFetchPhase( + searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, true); + } + + private void addResourceTrackingCompletionListener( + SearchContext searchContext, + long startTime, + long queryTime, + String phase, + boolean isFailed) { + addCompletionListener(searchContext, startTime, queryTime, phase, isFailed); + } + + private void addResourceTrackingCompletionListenerForFetchPhase( + SearchContext searchContext, + long fetchStartTime, + long fetchTime, + String phase, + boolean isFailed) { + long startTime = fetchStartTime; + long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, -1l); + /** + * There are scenarios where both query and fetch phases run in the same task for an + * optimization. Adding a special handling for that case to divide the CPU usage between + * these 2 operations by their runTime. 
+ */ + if (queryTaskId == searchContext.getTask().getId()) { + startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); + } + addCompletionListener(searchContext, startTime, fetchTime, phase, isFailed); + } + + private void addCompletionListener( + SearchContext searchContext, + long startTime, + long phaseTookTime, + String phase, + boolean isFailed) { + searchContext + .getTask() + .addResourceTrackingCompletionListener( + createListener(searchContext, startTime, phaseTookTime, phase, isFailed)); + } + + @VisibleForTesting + NotifyOnceListener createListener( + SearchContext searchContext, + long startTime, + long phaseTookTime, + String phase, + boolean isFailed) { + return new NotifyOnceListener() { + @Override + protected void innerOnResponse(Task task) { + LOG.debug("Updating the counter for task {}", task.getId()); + /** + * There are scenarios where cpuUsageTime consists of the total of CPU of multiple + * phases. In that case we are computing the cpuShareFactor by dividing the + * particular phaseTime and the total time till this calculation happen from the + * overall start time. 
+ */ + long totalTime = System.nanoTime() - startTime; + double shareFactor = computeShareFactor(phaseTookTime, totalTime); + cpuUtilizationHistogram.record( + Utils.calculateCPUUtilization( + numProcessors, + totalTime, + task.getTotalResourceStats().getCpuTimeInNanos(), + shareFactor), + createTags()); + heapUsedHistogram.record( + Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), + createTags()); + } + + private Tags createTags() { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + + @Override + protected void innerOnFailure(Exception e) { + LOG.error("Error is executing the the listener", e); + } + }; + } + + @VisibleForTesting + static double computeShareFactor(long phaseTookTime, long totalTime) { + return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index a4cd946e..64052ab1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import 
org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportChannel; @@ -45,7 +46,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro @VisibleForTesting TransportChannel getChannel(T request, TransportChannel channel, Task task) { - if (!controller.isPerformanceAnalyzerEnabled()) { + if (!isCollectorEnabled()) { return channel; } @@ -56,6 +57,13 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { } } + private boolean isCollectorEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() + == Util.CollectorMode.RCA.getValue()); + } + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { String className = request.getClass().getName(); boolean bPrimary = false; diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java new file mode 100644 index 00000000..6eb64185 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.management.ThreadMXBean; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; 
+import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +/** + * {@link TransportChannel} implementation to override the sendResponse behavior to have handle of + * the {@link org.opensearch.action.bulk.BulkShardRequest} completion. + */ +public final class RTFPerformanceAnalyzerTransportChannel implements TransportChannel { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportChannel.class); + + private static final ThreadMXBean threadMXBean = + (ThreadMXBean) ManagementFactory.getThreadMXBean(); + private static final String OPERATION_SHARD_BULK = "shardbulk"; + private static final String SHARD_ROLE_PRIMARY = "primary"; + private static final String SHARD_ROLE_REPLICA = "replica"; + + private long cpuStartTime; + private long operationStartTime; + + private Histogram cpuUtilizationHistogram; + + private TransportChannel original; + private String indexName; + private ShardId shardId; + private boolean primary; + + private long threadID; + private int numProcessors; + + void set( + TransportChannel original, + Histogram cpuUtilizationHistogram, + String indexName, + ShardId shardId, + boolean bPrimary) { + this.original = original; + this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexName = indexName; + this.shardId = shardId; + this.primary = bPrimary; + + this.operationStartTime = System.nanoTime(); + threadID = Thread.currentThread().getId(); + this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + LOG.debug("Thread Name {}", Thread.currentThread().getName()); + } + + @Override + public String getProfileName() { + return 
"RTFPerformanceAnalyzerTransportChannelProfile"; + } + + @Override + public String getChannelType() { + return "RTFPerformanceAnalyzerTransportChannelType"; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + emitMetrics(false); + original.sendResponse(response); + } + + @Override + public void sendResponse(Exception exception) throws IOException { + emitMetrics(true); + original.sendResponse(exception); + } + + private void emitMetrics(boolean isFailed) { + double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); + recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); + } + + private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { + LOG.debug("Completion Thread Name {}", Thread.currentThread().getName()); + long totalCpuTime = + Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime)); + return Utils.calculateCPUUtilization( + numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); + } + + @VisibleForTesting + void recordCPUUtilizationMetric( + ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { + cpuUtilizationHistogram.record( + cpuUtilization, + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + } + + // This function is called from the security plugin using reflection. Do not + // remove this function without changing the security plugin. 
+ public TransportChannel getInnerChannel() { + return this.original; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java new file mode 100644 index 00000000..5d2de2b6 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * Transport Interceptor to intercept the Indexing requests and populate the resource utilization + * metrics. 
+ */ +public final class RTFPerformanceAnalyzerTransportInterceptor implements TransportInterceptor { + + private final PerformanceAnalyzerController controller; + + public RTFPerformanceAnalyzerTransportInterceptor( + final PerformanceAnalyzerController controller) { + this.controller = controller; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler) { + return new RTFPerformanceAnalyzerTransportRequestHandler<>(actualHandler, controller); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java new file mode 100644 index 00000000..82a0abe6 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.tasks.Task; +import 
org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * {@link TransportRequestHandler} implementation to intercept only the {@link BulkShardRequest} and + * skip other transport calls. + * + * @param {@link TransportRequest} + */ +public final class RTFPerformanceAnalyzerTransportRequestHandler + implements TransportRequestHandler { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportRequestHandler.class); + private final PerformanceAnalyzerController controller; + private TransportRequestHandler actualHandler; + private boolean logOnce = false; + private final Histogram cpuUtilizationHistogram; + + RTFPerformanceAnalyzerTransportRequestHandler( + TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { + this.actualHandler = actualHandler; + this.controller = controller; + this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + } + + private Histogram createCPUUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for an operation", + RTFMetrics.MetricUnits.RATE.toString()); + } else { + return null; + } + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + actualHandler.messageReceived(request, getChannel(request, channel, task), task); + } + + @VisibleForTesting + TransportChannel getChannel(T request, TransportChannel channel, Task task) { + if (!isCollectorEnabled()) { + return channel; + } + + if (request instanceof ConcreteShardRequest) { + return getShardBulkChannel(request, channel, task); + } else { 
+ return channel; + } + } + + private boolean isCollectorEnabled() { + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { + String className = request.getClass().getName(); + boolean bPrimary = false; + // Tell primary from replica requests by concrete class name; anything else passes through. + if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteShardRequest")) { + bPrimary = true; + } else if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteReplicaRequest")) { + bPrimary = false; + } else { + return channel; + } + // Only bulk shard requests are wrapped; every other payload keeps the original channel. + TransportRequest transportRequest = ((ConcreteShardRequest) request).getRequest(); + + if (!(transportRequest instanceof BulkShardRequest)) { + return channel; + } + + BulkShardRequest bsr = (BulkShardRequest) transportRequest; + RTFPerformanceAnalyzerTransportChannel rtfPerformanceAnalyzerTransportChannel = + new RTFPerformanceAnalyzerTransportChannel(); + + try { + rtfPerformanceAnalyzerTransportChannel.set( + channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + return rtfPerformanceAnalyzerTransportChannel; + } catch (Exception ex) { + if (!logOnce) { + LOG.error(ex); + logOnce = true; + } + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + return channel; // set() failed: fall back to the unwrapped channel + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 21f254ca..34d91b02 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -8,6 +8,8 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; 
+import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; @@ -27,6 +29,7 @@ import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics; public class Utils { + private static final Logger LOG = LogManager.getLogger(Utils.class); public static void configureMetrics() { ServiceMetrics.initStatsReporter(); @@ -107,4 +110,38 @@ public static HashMap getShards() { IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + + /** + * CPU utilization is the CPU time consumed divided by the total CPU time that was available. + * The total available CPU time is the number of processors multiplied by the operation time. + * The result is scaled by cpuShareFactor in case some adjustments are needed. + * + * @param numProcessors number of processors the operation could have used + * @param totalOperationTime total time taken by the operation + * @param cpuUsageTime CPU time consumed by the operation + * @param cpuShareFactor multiplier applied to the computed ratio + * @return the scaled utilization, or 0 if any of the first three arguments is not positive + */ + public static double calculateCPUUtilization( + int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { + LOG.debug( + "Performance Analyzer CPUUtilization calculation with numProcessors: {}", + numProcessors); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with cpuShareFactor {}", + cpuShareFactor); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalCpuTime {}", + cpuUsageTime); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalOperationTime {}", + totalOperationTime); + if (totalOperationTime <= 0L || cpuUsageTime <= 0L || numProcessors <= 0) { + return 0.0d; + } + double totalAvailableCPUTime = (double) totalOperationTime * numProcessors; + double cpuUtil = cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); + LOG.debug("Performance Analyzer CPUUtilization calculation with
cpuUtil {}", cpuUtil); + return cpuUtil; + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java index fc5f95fa..554eca27 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java @@ -22,7 +22,6 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilter; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -90,12 +89,7 @@ public void setup() { } catch (Exception e) { e.printStackTrace(); } - clusterService = - new ClusterService( - settings, - clusterSettings, - threadPool, - new ClusterManagerMetrics(metricsRegistry)); + clusterService = new ClusterService(settings, clusterSettings, threadPool); identityService = new IdentityService(Settings.EMPTY, List.of()); restController = new RestController( @@ -131,7 +125,7 @@ public void testGetActions() { @Test public void testGetTransportInterceptors() { List list = plugin.getTransportInterceptors(null, null); - assertEquals(1, list.size()); + assertEquals(2, list.size()); assertEquals(PerformanceAnalyzerTransportInterceptor.class, list.get(0).getClass()); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index cfa8bf58..f7204783 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ 
-25,6 +25,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.TestUtil; import org.opensearch.performanceanalyzer.util.Utils; @@ -69,6 +70,21 @@ public void init() { TestUtil.readEvents(); } + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + } + @Test public void testGetMetricsPath() { String expectedPath = diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java new file mode 100644 index 00000000..16aba4bc --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -0,0 +1,175 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.MockitoAnnotations.initMocks; + 
+import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFPerformanceAnalyzerSearchListenerTests { + + private RTFPerformanceAnalyzerSearchListener searchListener; + + @Mock private SearchContext searchContext; + @Mock private ShardSearchRequest shardSearchRequest; + @Mock private ShardId shardId; + @Mock private PerformanceAnalyzerController controller; + @Mock private SearchShardTask task; + @Mock private MetricsRegistry metricsRegistry; + @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram heapUsedHistogram; + @Mock private Index index; + + @Mock private TaskResourceUsage taskResourceUsage; + + @BeforeClass + public static void setup() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + } + + @Before + public void init() { + initMocks(this); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + Mockito.when( + 
metricsRegistry.createHistogram( + Mockito.eq("cpu_utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(cpuUtilizationHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) + .thenReturn(heapUsedHistogram); + searchListener = new RTFPerformanceAnalyzerSearchListener(controller); + assertEquals( + RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), + searchListener.toString()); + } + + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + } + + @Test + public void testQueryPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.queryPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testQueryPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.failedQueryPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhase() { + initializeValidSearchContext(true); + 
Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.failedFetchPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testOperationShareFactor() { + assertEquals( + Double.valueOf(10.0 / 15), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15), + 0); + assertEquals( + Double.valueOf(1), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10), + 0); + } + + @Test + public void testTaskCompletionListener() { + initializeValidSearchContext(true); + RTFPerformanceAnalyzerSearchListener rtfSearchListener = + new RTFPerformanceAnalyzerSearchListener(controller); + + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(task.getTotalResourceStats()).thenReturn(taskResourceUsage); + Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); + + NotifyOnceListener taskCompletionListener = + rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); + taskCompletionListener.onResponse(task); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } + + private void initializeValidSearchContext(boolean isValid) { + if (isValid) { + Mockito.when(searchContext.request()).thenReturn(shardSearchRequest); + 
Mockito.when(searchContext.getTask()).thenReturn(task); + Mockito.when(shardSearchRequest.shardId()).thenReturn(shardId); + } else { + Mockito.when(searchContext.request()).thenReturn(null); + } + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java index 56dc289a..099390fa 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.transport; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; @@ -18,6 +19,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.tasks.Task; @@ -67,4 +69,30 @@ public void testGetChannel() { TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); } + + @Test + public void testGetChannelIfRCAModeIsDisabled() { + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + 
Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelIfDualModeIsEnabled() { + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java new file mode 100644 index 00000000..aa4e425b --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.io.IOException; +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.core.index.Index; +import 
org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +public class RTFPerformanceAnalyzerTransportChannelTests { + private RTFPerformanceAnalyzerTransportChannel channel; + + @Mock private TransportChannel originalChannel; + @Mock private TransportResponse response; + @Mock private Histogram cpuUtilizationHistogram; + private ShardId shardId; + @Mock private ShardId mockedShardId; + @Mock private Index index; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + String indexName = "testIndex"; + shardId = new ShardId(new Index(indexName, "uuid"), 1); + channel = new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); + assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); + assertEquals(originalChannel, channel.getInnerChannel()); + } + + @Test + public void testResponse() throws IOException { + channel.sendResponse(response); + verify(originalChannel).sendResponse(response); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testResponseWithException() throws IOException { + Exception exception = new Exception("dummy exception"); + channel.sendResponse(exception); + verify(originalChannel).sendResponse(exception); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testRecordCPUUtilizationMetric() { 
+ RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java new file mode 100644 index 00000000..9a0f9d0e --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.bulk.BulkItemRequest; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; 
+import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +@SuppressWarnings("unchecked") +public class RTFPerformanceAnalyzerTransportRequestHandlerTests { + private RTFPerformanceAnalyzerTransportRequestHandler handler; + private ConcreteShardRequest concreteShardRequest; + + @Mock private TransportRequestHandler transportRequestHandler; + @Mock private PerformanceAnalyzerController controller; + @Mock private TransportChannel channel; + @Mock private TransportRequest request; + @Mock private BulkShardRequest bulkShardRequest; + @Mock private Task task; + @Mock private ShardId shardId; + @Mock private MetricsRegistry metricsRegistry; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + handler = + new RTFPerformanceAnalyzerTransportRequestHandler( + transportRequestHandler, controller); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + } + + @Test + public void testMessageReceived() throws Exception { + handler.messageReceived(request, channel, task); + verify(transportRequestHandler).messageReceived(request, channel, task); + } + + @Test + public void testGetChannel() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + 
TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelTelemetryIsDisabled() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelDualMode() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelMetricRegistryIsNull() { + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + Mockito.when(controller.getCollectorsRunModeValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new 
BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java new file mode 100644 index 00000000..b1e2490e --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.util; + +import org.junit.Assert; +import org.junit.Test; + +public class UtilsTests { + + @Test + public void testCPUUtilization() { + Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5, 1.0), 0.0); + Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5, 1.0), 0.0); + Assert.assertEquals( + Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10, 1.0), 0.0); + Assert.assertEquals( + Double.valueOf(0.50 * (20 / 30.0)), + Utils.calculateCPUUtilization(3, 10, 20, 0.5d), + 0.0); + } + + @Test + public void testCPUUtilizationZeroValue() { + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 0.0), 0.0); + } +}