Skip to content

Commit

Permalink
Adds latency metrics for ClusterState Appliers and Listeners (#12333)
Browse files Browse the repository at this point in the history
* Adds latency metrics for ClusterState Appliers and Listeners

Signed-off-by: Harsh Garg <gkharsh@amazon.com>
(cherry picked from commit a254aa9)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed May 20, 2024
1 parent cf20a61 commit 6d23c1d
Show file tree
Hide file tree
Showing 48 changed files with 495 additions and 101 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.RestHandler;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
Expand Down Expand Up @@ -50,8 +51,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);

clusterService = new ClusterService(settings, clusterSettings, threadPool);

clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
}

public void testGetSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
private final ClusterService clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
private final TransportService transportService = mock(TransportService.class);
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
private final ActionFilters actionFilters = mock(ActionFilters.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
*
* @opensearch.internal
*/
public final class ClusterManagerMetrics {

private static final String LATENCY_METRIC_UNIT_MS = "ms";

public final Histogram clusterStateAppliersHistogram;
public final Histogram clusterStateListenersHistogram;
public final Histogram rerouteHistogram;
public final Histogram clusterStateComputeHistogram;
public final Histogram clusterStatePublishHistogram;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
LATENCY_METRIC_UNIT_MS
);
clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
LATENCY_METRIC_UNIT_MS
);
rerouteHistogram = metricsRegistry.createHistogram(
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
LATENCY_METRIC_UNIT_MS
);
clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
LATENCY_METRIC_UNIT_MS
);
clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
"Histogram for recording time taken to publish a new cluster state",
LATENCY_METRIC_UNIT_MS
);
}

public void recordLatency(Histogram histogram, Double value) {
histogram.record(value);
}

public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
histogram.record(value);
return;
}
histogram.record(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public ClusterModule(
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
ThreadContext threadContext,
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -158,7 +159,8 @@ public ClusterModule(
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
settings,
clusterManagerMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand All @@ -56,10 +57,12 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -96,6 +99,7 @@ public class AllocationService {
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final ClusterManagerMetrics clusterManagerMetrics;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -105,32 +109,40 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
ClusterManagerMetrics clusterManagerMetrics
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY, clusterManagerMetrics);
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings

Settings settings,
ClusterManagerMetrics clusterManagerMetrics
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
this.clusterManagerMetrics = clusterManagerMetrics;
}

/**
Expand Down Expand Up @@ -550,11 +562,15 @@ private void reroute(RoutingAllocation allocation) {
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

long rerouteStartTimeNS = System.nanoTime();
removeDelayMarkers(allocation);

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
shardsAllocator.allocate(allocation);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.rerouteHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS))
);
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateListener;
Expand All @@ -61,13 +62,15 @@
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -120,8 +123,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final String nodeName;

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
private final ClusterManagerMetrics clusterManagerMetrics;

public ClusterApplierService(
String nodeName,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
Expand All @@ -132,6 +142,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold
);
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -597,15 +608,21 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

private static void callClusterStateAppliers(
private void callClusterStateAppliers(
ClusterChangedEvent clusterChangedEvent,
StopWatch stopWatch,
Collection<ClusterStateApplier> clusterStateAppliers
) {
for (ClusterStateApplier applier : clusterStateAppliers) {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("running applier [" + applier + "]")) {
long applierStartTimeNS = System.nanoTime();
applier.applyClusterState(clusterChangedEvent);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateAppliersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - applierStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", applier.getClass().getSimpleName()))
);
}
}
}
Expand All @@ -624,7 +641,13 @@ private void callClusterStateListener(
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
long listenerStartTimeNS = System.nanoTime();
listener.clusterChanged(clusterChangedEvent);
clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateListenersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName()))
);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -20,7 +21,12 @@
*/
@PublicApi(since = "2.2.0")
public class ClusterManagerService extends MasterService {
public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
super(settings, clusterSettings, threadPool, clusterManagerMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -91,12 +92,17 @@ public class ClusterService extends AbstractLifecycleComponent {

private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
) {
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
);
}

Expand Down
Loading

0 comments on commit 6d23c1d

Please sign in to comment.