From 6a4e75774e4db0b07626519617d3169b3b149d8e Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 11 Oct 2023 13:49:05 +0000 Subject: [PATCH 01/11] Initial WIP for adding segrep backpressure to node stats. Signed-off-by: Rishikesh1159 --- .../admin/cluster/node/stats/NodeStats.java | 25 ++++++++++++++++++- .../cluster/node/stats/NodesStatsRequest.java | 4 ++- .../node/stats/TransportNodesStatsAction.java | 3 ++- .../index/SegmentReplicationStats.java | 14 +++++++++-- .../index/SegmentReplicationStatsTracker.java | 5 +++- .../main/java/org/opensearch/node/Node.java | 7 +++++- .../java/org/opensearch/node/NodeService.java | 13 +++++++--- .../MockInternalClusterInfoService.java | 3 ++- 8 files changed, 63 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 69efea186d927..c033152dd053c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,6 +46,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.SegmentReplicationStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -128,6 +129,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private SearchBackpressureStats searchBackpressureStats; + @Nullable + private SegmentReplicationStats segmentReplicationStats; + @Nullable private ClusterManagerThrottlingStats clusterManagerThrottlingStats; @@ -207,6 +211,11 @@ public NodeStats(StreamInput in) throws IOException { } else { resourceUsageStats = null; } + if 
(in.getVersion().onOrAfter(Version.V_2_11_0)) { + segmentReplicationStats = in.readOptionalWriteable(SegmentReplicationStats::new); + } else { + segmentReplicationStats = null; + } } public NodeStats( @@ -234,7 +243,8 @@ public NodeStats( @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, - @Nullable SearchPipelineStats searchPipelineStats + @Nullable SearchPipelineStats searchPipelineStats, + @Nullable SegmentReplicationStats segmentReplicationStats ) { super(node); this.timestamp = timestamp; @@ -261,6 +271,7 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; + this.segmentReplicationStats = segmentReplicationStats; } public long getTimestamp() { @@ -403,6 +414,11 @@ public SearchPipelineStats getSearchPipelineStats() { return searchPipelineStats; } + @Nullable + public SegmentReplicationStats getSegmentReplicationStats() { + return segmentReplicationStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -449,6 +465,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport out.writeOptionalWriteable(resourceUsageStats); } + if (out.getVersion().onOrAfter(Version.V_2_11_0)) { + out.writeOptionalWriteable(segmentReplicationStats); + } } @Override @@ -542,6 +561,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getResourceUsageStats() != null) { getResourceUsageStats().toXContent(builder, params); } + if (getSegmentReplicationStats() != null) { + getSegmentReplicationStats().toXContent(builder, params); + } + return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java 
b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 99c9fb2d1e26a..34fdbbd27199c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -214,7 +214,9 @@ public enum Metric { FILE_CACHE_STATS("file_cache"), TASK_CANCELLATION("task_cancellation"), SEARCH_PIPELINE("search_pipeline"), - RESOURCE_USAGE_STATS("resource_usage_stats"); + + RESOURCE_USAGE_STATS("resource_usage_stats"), + SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 204157236a282..838bd0c34dce1 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -125,7 +125,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), - NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics) + NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), + NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index cdf22b05d5861..0567657637984 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ 
b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -28,8 +28,11 @@ public class SegmentReplicationStats implements Writeable, ToXContentFragment { private final Map shardStats; - public SegmentReplicationStats(final Map shardStats) { + private final long totalRejectionCount; + + public SegmentReplicationStats(final Map shardStats, final long totalRejectionCount) { this.shardStats = shardStats; + this.totalRejectionCount = totalRejectionCount; } public SegmentReplicationStats(StreamInput in) throws IOException { @@ -40,12 +43,17 @@ public SegmentReplicationStats(StreamInput in) throws IOException { SegmentReplicationPerGroupStats groupStats = new SegmentReplicationPerGroupStats(in); shardStats.put(shardId, groupStats); } + this.totalRejectionCount = in.readVLong(); } public Map getShardStats() { return shardStats; } + public long getTotalRejectionCount() { + return totalRejectionCount; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("segment_replication"); @@ -54,6 +62,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws entry.getValue().toXContent(builder, params); builder.endObject(); } + builder.field("total_rejected_requests", totalRejectionCount); return builder.endObject(); } @@ -64,10 +73,11 @@ public void writeTo(StreamOutput out) throws IOException { entry.getKey().writeTo(out); entry.getValue().writeTo(out); } + out.writeVLong(totalRejectionCount); } @Override public String toString() { - return "SegmentReplicationStats{" + "shardStats=" + shardStats + '}'; + return "SegmentReplicationStats{" + "shardStats=" + shardStats + ", totalRejectedRequestCount=" + totalRejectionCount + '}'; } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 6d5c00c08caff..2a7aca0a1c7ed 100644 --- 
a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -42,7 +42,10 @@ public SegmentReplicationStats getStats() { } } } - return new SegmentReplicationStats(stats); + return new SegmentReplicationStats( + stats, + stats.values().stream().mapToLong(shardGroup -> shardGroup.getRejectedRequestCount()).sum() + ); } public void incrementRejectionCount(ShardId shardId) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c456f01135dee..d93df68f54373 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -62,6 +62,7 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -136,6 +137,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.recovery.RemoteStoreRestoreService; @@ -964,6 +966,8 @@ protected Node( transportService.getTaskManager() ); + ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, clusterModule.getAllocationService(), rerouteService, threadPool); + final SegmentReplicationPressureService segmentReplicationPressureService =new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, 
threadPool); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), @@ -1102,7 +1106,8 @@ protected Node( searchPipelineService, fileCache, taskCancellationMonitoringService, - resourceUsageCollectorService + resourceUsageCollectorService, + segmentReplicationPressureService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 9bb07080fa717..5fc8461cf8844 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -48,6 +48,7 @@ import org.opensearch.discovery.Discovery; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; @@ -94,6 +95,8 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; + private final SegmentReplicationPressureService segmentReplicationPressureService; + NodeService( Settings settings, ThreadPool threadPool, @@ -116,7 +119,8 @@ public class NodeService implements Closeable { SearchPipelineService searchPipelineService, FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, - ResourceUsageCollectorService resourceUsageCollectorService + ResourceUsageCollectorService resourceUsageCollectorService, + SegmentReplicationPressureService segmentReplicationPressureService ) { this.settings = settings; this.threadPool = threadPool; @@ -142,6 +146,7 @@ public class NodeService implements Closeable { this.resourceUsageCollectorService = 
resourceUsageCollectorService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); + this.segmentReplicationPressureService = segmentReplicationPressureService; } public NodeInfo info( @@ -221,7 +226,8 @@ public NodeStats stats( boolean fileCacheStats, boolean taskCancellation, boolean searchPipelineStats, - boolean resourceUsageStats + boolean resourceUsageStats, + boolean segmentReplicationBackPressureStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -250,7 +256,8 @@ public NodeStats stats( weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, - searchPipelineStats ? this.searchPipelineService.stats() : null + searchPipelineStats ? this.searchPipelineService.stats() : null, + segmentReplicationBackPressureStats ? 
this.segmentReplicationPressureService.nodeStats() : null ); } diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index a520b6278ea47..55b3ae9fbdd4f 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -121,7 +121,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getWeightedRoutingStats(), nodeStats.getFileCacheStats(), nodeStats.getTaskCancellationStats(), - nodeStats.getSearchPipelineStats() + nodeStats.getSearchPipelineStats(), + nodeStats.getSegmentReplicationStats() ); }).collect(Collectors.toList()); } From 7da3cfb8bdc79c5d76e49727739ceab342a9bf80 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 12 Oct 2023 01:58:17 +0000 Subject: [PATCH 02/11] Bind SegmentReplicationStatsTracker in Node.java Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationPressureService.java | 3 ++- .../index/SegmentReplicationStatsTracker.java | 2 ++ server/src/main/java/org/opensearch/node/Node.java | 8 +++----- .../main/java/org/opensearch/node/NodeService.java | 12 ++++++------ .../SegmentReplicationPressureServiceTests.java | 9 ++++++++- .../snapshots/SnapshotResiliencyTests.java | 2 ++ 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..d9d480e7b2b27 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -106,10 +106,11 @@ public SegmentReplicationPressureService( ClusterService clusterService, IndicesService indicesService, ShardStateAction shardStateAction, 
+ SegmentReplicationStatsTracker tracker, ThreadPool threadPool ) { this.indicesService = indicesService; - this.tracker = new SegmentReplicationStatsTracker(this.indicesService); + this.tracker = tracker; this.shardStateAction = shardStateAction; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 2a7aca0a1c7ed..f018aa5a800de 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -8,6 +8,7 @@ package org.opensearch.index; +import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; @@ -28,6 +29,7 @@ public class SegmentReplicationStatsTracker { private final IndicesService indicesService; private final Map rejectionCount; + @Inject public SegmentReplicationStatsTracker(IndicesService indicesService) { this.indicesService = indicesService; rejectionCount = ConcurrentCollections.newConcurrentMap(); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d93df68f54373..82e29c92fcd86 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -62,7 +62,6 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; -import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -137,7 +136,7 @@ import 
org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.recovery.RemoteStoreRestoreService; @@ -966,8 +965,7 @@ protected Node( transportService.getTaskManager() ); - ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, clusterModule.getAllocationService(), rerouteService, threadPool); - final SegmentReplicationPressureService segmentReplicationPressureService =new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, threadPool); + final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), @@ -1107,7 +1105,7 @@ protected Node( fileCache, taskCancellationMonitoringService, resourceUsageCollectorService, - segmentReplicationPressureService + segmentReplicationStatsTracker ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 5fc8461cf8844..12c869e77f18c 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -48,7 +48,7 @@ import org.opensearch.discovery.Discovery; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; -import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.store.remote.filecache.FileCache; 
import org.opensearch.indices.IndicesService; import org.opensearch.ingest.IngestService; @@ -95,7 +95,7 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; - private final SegmentReplicationPressureService segmentReplicationPressureService; + private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; NodeService( Settings settings, @@ -120,7 +120,7 @@ public class NodeService implements Closeable { FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, ResourceUsageCollectorService resourceUsageCollectorService, - SegmentReplicationPressureService segmentReplicationPressureService + SegmentReplicationStatsTracker segmentReplicationStatsTracker ) { this.settings = settings; this.threadPool = threadPool; @@ -146,7 +146,7 @@ public class NodeService implements Closeable { this.resourceUsageCollectorService = resourceUsageCollectorService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); - this.segmentReplicationPressureService = segmentReplicationPressureService; + this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; } public NodeInfo info( @@ -227,7 +227,7 @@ public NodeStats stats( boolean taskCancellation, boolean searchPipelineStats, boolean resourceUsageStats, - boolean segmentReplicationBackPressureStats + boolean segmentReplicationTrackerStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -257,7 +257,7 @@ public NodeStats stats( fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, - segmentReplicationBackPressureStats ? 
this.segmentReplicationPressureService.nodeStats() : null + segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getStats() : null ); } diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 34fa13f0ba62c..478fdcb24f76a 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class)); + return new SegmentReplicationPressureService( + settings, + clusterService, + indicesService, + shardStateAction, + new SegmentReplicationStatsTracker(indicesService), + mock(ThreadPool.class) + ); } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..8458b887a8c3c 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -178,6 +178,7 @@ import org.opensearch.gateway.TransportNodesListGatewayStartedShards; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.remote.RemoteStorePressureService; import 
org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; @@ -2186,6 +2187,7 @@ public void onFailure(final Exception e) { clusterService, mock(IndicesService.class), mock(ShardStateAction.class), + mock(SegmentReplicationStatsTracker.class), mock(ThreadPool.class) ), mock(RemoteStorePressureService.class), From d9ad4ce28e61c7109ef8fa6601f925200f4e60a9 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Mon, 16 Oct 2023 23:25:12 +0000 Subject: [PATCH 03/11] remove additional segrep backpressure info from node stats Signed-off-by: Rishikesh1159 --- .../admin/cluster/node/stats/NodeStats.java | 8 +++++-- .../SegmentReplicationPerGroupStats.java | 2 ++ .../index/SegmentReplicationStats.java | 21 ++++++++++++------- .../cluster/node/stats/NodeStatsTests.java | 1 + 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index c033152dd053c..498b04ab0195d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -206,12 +206,15 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport resourceUsageStats = in.readOptionalWriteable(NodesResourceUsageStats::new); } else { resourceUsageStats = null; } - if (in.getVersion().onOrAfter(Version.V_2_11_0)) { + + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { segmentReplicationStats = in.readOptionalWriteable(SegmentReplicationStats::new); } else { segmentReplicationStats = null; @@ -465,7 +468,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport 
out.writeOptionalWriteable(resourceUsageStats); } - if (out.getVersion().onOrAfter(Version.V_2_11_0)) { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeOptionalWriteable(segmentReplicationStats); } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index c3b4f8217c961..28ab704093927 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -55,12 +55,14 @@ public ShardId getShardId() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(shardId.toString()); builder.field("rejected_requests", rejectedRequestCount); builder.startArray("replicas"); for (SegmentReplicationShardStats stats : replicaStats) { stats.toXContent(builder, params); } builder.endArray(); + builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index 0567657637984..c4e8d5b83e612 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -8,6 +8,7 @@ package org.opensearch.index; +import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -28,7 +29,10 @@ public class SegmentReplicationStats implements Writeable, ToXContentFragment { private final Map shardStats; - private final long totalRejectionCount; + /** + * Total rejections due to segment replication backpressure + */ + private long totalRejectionCount; public 
SegmentReplicationStats(final Map shardStats, final long totalRejectionCount) { this.shardStats = shardStats; @@ -43,7 +47,10 @@ public SegmentReplicationStats(StreamInput in) throws IOException { SegmentReplicationPerGroupStats groupStats = new SegmentReplicationPerGroupStats(in); shardStats.put(shardId, groupStats); } - this.totalRejectionCount = in.readVLong(); + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.totalRejectionCount = in.readVLong(); + } } public Map getShardStats() { @@ -57,11 +64,6 @@ public long getTotalRejectionCount() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("segment_replication"); - for (Map.Entry entry : shardStats.entrySet()) { - builder.startObject(entry.getKey().toString()); - entry.getValue().toXContent(builder, params); - builder.endObject(); - } builder.field("total_rejected_requests", totalRejectionCount); return builder.endObject(); } @@ -73,7 +75,10 @@ public void writeTo(StreamOutput out) throws IOException { entry.getKey().writeTo(out); entry.getValue().writeTo(out); } - out.writeVLong(totalRejectionCount); + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(totalRejectionCount); + } } @Override diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 7a1b6f113d0e8..37c1e4ea146ce 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -840,6 +840,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { weightedRoutingStats, null, null, + null, null ); } From 00cfe84a9ca5704f672b5ede90fcdf3402f0f6fa Mon Sep 17 
00:00:00 2001 From: Rishikesh1159 Date: Tue, 17 Oct 2023 00:14:07 +0000 Subject: [PATCH 04/11] fix metric name in node stats Signed-off-by: Rishikesh1159 --- .../action/admin/cluster/stats/TransportClusterStatsAction.java | 1 + .../main/java/org/opensearch/index/SegmentReplicationStats.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index d8323e209be23..f51fabbfb2388 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -169,6 +169,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index c4e8d5b83e612..1c883a54af5cc 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -63,7 +63,7 @@ public long getTotalRejectionCount() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("segment_replication"); + builder.startObject("segment_replication_backpressure"); builder.field("total_rejected_requests", totalRejectionCount); return builder.endObject(); } From bb65d40e9c5b792f5dfe834ccc64c4331021a881 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 17 Oct 2023 00:28:56 +0000 Subject: [PATCH 05/11] Fix compile error. 
Signed-off-by: Rishikesh1159 --- .../src/main/java/org/opensearch/test/InternalTestCluster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 3c7423f73685f..965d7af260f50 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2719,6 +2719,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat( From 68869e3fd98a2ee3f1109db9e380977afa576f32 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 17 Oct 2023 03:50:39 +0000 Subject: [PATCH 06/11] Fix compile errors. Signed-off-by: Rishikesh1159 --- .../test/java/org/opensearch/cluster/DiskUsageTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 64949cf861f70..6f03e87bf5824 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -191,6 +191,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -218,6 +219,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -245,6 +247,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -303,6 +306,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -330,6 +334,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -357,6 +362,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); From b09bd3716599a8ae163423223ecf7077f9ffe13d Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 17 Oct 2023 
16:11:19 +0000 Subject: [PATCH 07/11] Address comments on PR. Signed-off-by: Rishikesh1159 --- .../admin/cluster/node/stats/NodeStats.java | 22 +++---- .../SegmentReplicationPerGroupStats.java | 2 - .../SegmentReplicationRejectionStats.java | 62 +++++++++++++++++++ .../index/SegmentReplicationStats.java | 31 +++------- .../index/SegmentReplicationStatsTracker.java | 17 +++-- .../java/org/opensearch/node/NodeService.java | 2 +- .../cluster/node/stats/NodeStatsTests.java | 19 +++++- .../MockInternalClusterInfoService.java | 2 +- 8 files changed, 114 insertions(+), 43 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 498b04ab0195d..1ecd4100cc9a4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,7 +46,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; -import org.opensearch.index.SegmentReplicationStats; +import org.opensearch.index.SegmentReplicationRejectionStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -130,7 +130,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { private SearchBackpressureStats searchBackpressureStats; @Nullable - private SegmentReplicationStats segmentReplicationStats; + private SegmentReplicationRejectionStats segmentReplicationRejectionStats; @Nullable private ClusterManagerThrottlingStats clusterManagerThrottlingStats; @@ -215,9 +215,9 @@ public NodeStats(StreamInput in) throws IOException 
{ // TODO: change to V_2_12_0 on main after backport to 2.x if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - segmentReplicationStats = in.readOptionalWriteable(SegmentReplicationStats::new); + segmentReplicationRejectionStats = in.readOptionalWriteable(SegmentReplicationRejectionStats::new); } else { - segmentReplicationStats = null; + segmentReplicationRejectionStats = null; } } @@ -247,7 +247,7 @@ public NodeStats( @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, @Nullable SearchPipelineStats searchPipelineStats, - @Nullable SegmentReplicationStats segmentReplicationStats + @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats ) { super(node); this.timestamp = timestamp; @@ -274,7 +274,7 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; - this.segmentReplicationStats = segmentReplicationStats; + this.segmentReplicationRejectionStats = segmentReplicationRejectionStats; } public long getTimestamp() { @@ -418,8 +418,8 @@ public SearchPipelineStats getSearchPipelineStats() { } @Nullable - public SegmentReplicationStats getSegmentReplicationStats() { - return segmentReplicationStats; + public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() { + return segmentReplicationRejectionStats; } @Override @@ -470,7 +470,7 @@ public void writeTo(StreamOutput out) throws IOException { } // TODO: change to V_2_12_0 on main after backport to 2.x if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeOptionalWriteable(segmentReplicationStats); + out.writeOptionalWriteable(segmentReplicationRejectionStats); } } @@ -565,8 +565,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getResourceUsageStats() != null) { getResourceUsageStats().toXContent(builder, params); } - if (getSegmentReplicationStats() != null) { - 
getSegmentReplicationStats().toXContent(builder, params); + if (getSegmentReplicationRejectionStats() != null) { + getSegmentReplicationRejectionStats().toXContent(builder, params); } return builder; diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java index 28ab704093927..c3b4f8217c961 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -55,14 +55,12 @@ public ShardId getShardId() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(shardId.toString()); builder.field("rejected_requests", rejectedRequestCount); builder.startArray("replicas"); for (SegmentReplicationShardStats stats : replicaStats) { stats.toXContent(builder, params); } builder.endArray(); - builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java new file mode 100644 index 0000000000000..59a1d3edce47f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index; + +import org.opensearch.Version; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +public class SegmentReplicationRejectionStats implements Writeable, ToXContentFragment { + + /** + * Total rejections due to segment replication backpressure + */ + private long totalRejectionCount; + + public SegmentReplicationRejectionStats(final long totalRejectionCount) { + this.totalRejectionCount = totalRejectionCount; + } + + public SegmentReplicationRejectionStats(StreamInput in) throws IOException { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.totalRejectionCount = in.readVLong(); + } + } + + public long getTotalRejectionCount() { + return totalRejectionCount; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("segment_replication_backpressure"); + builder.field("total_rejected_requests", totalRejectionCount); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(totalRejectionCount); + } + } + + @Override + public String toString() { + return "SegmentReplicationRejectionStats{ totalRejectedRequestCount=" + totalRejectionCount + '}'; + } + +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java index 1c883a54af5cc..cdf22b05d5861 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java +++ 
b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -8,7 +8,6 @@ package org.opensearch.index; -import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -29,14 +28,8 @@ public class SegmentReplicationStats implements Writeable, ToXContentFragment { private final Map shardStats; - /** - * Total rejections due to segment replication backpressure - */ - private long totalRejectionCount; - - public SegmentReplicationStats(final Map shardStats, final long totalRejectionCount) { + public SegmentReplicationStats(final Map shardStats) { this.shardStats = shardStats; - this.totalRejectionCount = totalRejectionCount; } public SegmentReplicationStats(StreamInput in) throws IOException { @@ -47,24 +40,20 @@ public SegmentReplicationStats(StreamInput in) throws IOException { SegmentReplicationPerGroupStats groupStats = new SegmentReplicationPerGroupStats(in); shardStats.put(shardId, groupStats); } - // TODO: change to V_2_12_0 on main after backport to 2.x - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.totalRejectionCount = in.readVLong(); - } } public Map getShardStats() { return shardStats; } - public long getTotalRejectionCount() { - return totalRejectionCount; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("segment_replication_backpressure"); - builder.field("total_rejected_requests", totalRejectionCount); + builder.startObject("segment_replication"); + for (Map.Entry entry : shardStats.entrySet()) { + builder.startObject(entry.getKey().toString()); + entry.getValue().toXContent(builder, params); + builder.endObject(); + } return builder.endObject(); } @@ -75,14 +64,10 @@ public void writeTo(StreamOutput out) throws IOException { entry.getKey().writeTo(out); entry.getValue().writeTo(out); } - // TODO: change 
to V_2_12_0 on main after backport to 2.x - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeVLong(totalRejectionCount); - } } @Override public String toString() { - return "SegmentReplicationStats{" + "shardStats=" + shardStats + ", totalRejectedRequestCount=" + totalRejectionCount + '}'; + return "SegmentReplicationStats{" + "shardStats=" + shardStats + '}'; } } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index f018aa5a800de..cc1068f4e33d1 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -35,6 +35,18 @@ public SegmentReplicationStatsTracker(IndicesService indicesService) { rejectionCount = ConcurrentCollections.newConcurrentMap(); } + public SegmentReplicationRejectionStats getRejectionStats() { + long rejectionCount = 0; + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { + rejectionCount += getStatsForShard(indexShard).getRejectedRequestCount(); + } + } + } + return new SegmentReplicationRejectionStats(rejectionCount); + } + public SegmentReplicationStats getStats() { Map stats = new HashMap<>(); for (IndexService indexService : indicesService) { @@ -44,10 +56,7 @@ public SegmentReplicationStats getStats() { } } } - return new SegmentReplicationStats( - stats, - stats.values().stream().mapToLong(shardGroup -> shardGroup.getRejectedRequestCount()).sum() - ); + return new SegmentReplicationStats(stats); } public void incrementRejectionCount(ShardId shardId) { diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 12c869e77f18c..d414a6ef77abc 100644 --- 
a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -257,7 +257,7 @@ public NodeStats stats( fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, - segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getStats() : null + segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getRejectionStats() : null ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 37c1e4ea146ce..aa3831d391580 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -49,6 +49,7 @@ import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; import org.opensearch.index.ReplicationStats; +import org.opensearch.index.SegmentReplicationRejectionStats; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.RemoteTranslogStats; @@ -413,6 +414,17 @@ public void testSerialization() throws IOException { assertEquals(aResourceUsageStats.getTimestamp(), bResourceUsageStats.getTimestamp()); }); } + SegmentReplicationRejectionStats segmentReplicationRejectionStats = nodeStats.getSegmentReplicationRejectionStats(); + SegmentReplicationRejectionStats deserializedSegmentReplicationRejectionStats = deserializedNodeStats + .getSegmentReplicationRejectionStats(); + if (segmentReplicationRejectionStats == null) { + assertNull(deserializedSegmentReplicationRejectionStats); + } else { + assertEquals( + 
segmentReplicationRejectionStats.getTotalRejectionCount(), + deserializedSegmentReplicationRejectionStats.getTotalRejectionCount() + ); + } ScriptCacheStats scriptCacheStats = nodeStats.getScriptCacheStats(); ScriptCacheStats deserializedScriptCacheStats = deserializedNodeStats.getScriptCacheStats(); if (scriptCacheStats == null) { @@ -800,6 +812,11 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { } nodesResourceUsageStats = new NodesResourceUsageStats(resourceUsageStatsMap); } + SegmentReplicationRejectionStats segmentReplicationRejectionStats = null; + if (frequently()) { + segmentReplicationRejectionStats = new SegmentReplicationRejectionStats(randomNonNegativeLong()); + } + ClusterManagerThrottlingStats clusterManagerThrottlingStats = null; if (frequently()) { clusterManagerThrottlingStats = new ClusterManagerThrottlingStats(); @@ -841,7 +858,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { null, null, null, - null + segmentReplicationRejectionStats ); } diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 55b3ae9fbdd4f..d7cd73f398754 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -122,7 +122,7 @@ List adjustNodesStats(List nodesStats) { nodeStats.getFileCacheStats(), nodeStats.getTaskCancellationStats(), nodeStats.getSearchPipelineStats(), - nodeStats.getSegmentReplicationStats() + nodeStats.getSegmentReplicationRejectionStats() ); }).collect(Collectors.toList()); } From 317ec74cc8e0b34fab03048f33690956627dcde5 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 17 Oct 2023 17:53:53 +0000 Subject: [PATCH 08/11] Update java docs. 
Signed-off-by: Rishikesh1159 --- .../opensearch/index/SegmentReplicationRejectionStats.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java index 59a1d3edce47f..3b8c0e8e073ff 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java @@ -17,6 +17,11 @@ import java.io.IOException; +/** + * Segment Replication Rejection Stats. + * + * @opensearch.internal + */ public class SegmentReplicationRejectionStats implements Writeable, ToXContentFragment { /** From 5449d364e1bdf683de598d492f3da3f964c026f6 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 18 Oct 2023 22:13:33 +0000 Subject: [PATCH 09/11] Address comments on PR and fix compile errors. Signed-off-by: Rishikesh1159 --- .../action/admin/cluster/node/stats/NodeStats.java | 1 + .../cluster/stats/TransportClusterStatsAction.java | 1 + .../index/SegmentReplicationStatsTracker.java | 10 +++++----- server/src/main/java/org/opensearch/node/Node.java | 1 + .../java/org/opensearch/cluster/DiskUsageTests.java | 6 ++++++ .../java/org/opensearch/test/InternalTestCluster.java | 1 + 6 files changed, 15 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 7277b503df69f..63d274f7c5d1a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -429,6 +429,7 @@ public SearchPipelineStats getSearchPipelineStats() { @Nullable public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() { return segmentReplicationRejectionStats; + } public 
RepositoriesStats getRepositoriesStats() { return repositoriesStats; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index f51fabbfb2388..5efec8b876435 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -170,6 +170,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index cc1068f4e33d1..7f115a9676c1c 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -8,7 +8,6 @@ package org.opensearch.index; -import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; @@ -29,7 +28,6 @@ public class SegmentReplicationStatsTracker { private final IndicesService indicesService; private final Map rejectionCount; - @Inject public SegmentReplicationStatsTracker(IndicesService indicesService) { this.indicesService = indicesService; rejectionCount = ConcurrentCollections.newConcurrentMap(); @@ -38,9 +36,11 @@ public SegmentReplicationStatsTracker(IndicesService indicesService) { public SegmentReplicationRejectionStats getRejectionStats() { long rejectionCount = 0; for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (indexShard.indexSettings().isSegRepEnabled() && 
indexShard.routingEntry().primary()) { - rejectionCount += getStatsForShard(indexShard).getRejectedRequestCount(); + if (indexService.getIndexSettings().isSegRepEnabled()) { + for (IndexShard indexShard : indexService) { + if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { + rejectionCount += getStatsForShard(indexShard).getRejectedRequestCount(); + } } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 2c123ff5b2659..711a90d424ac3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1249,6 +1249,7 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); + b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index 6f03e87bf5824..f037b75dc16a3 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -192,6 +192,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -220,6 +221,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -248,6 +250,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -307,6 +310,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -335,6 +339,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -363,6 +368,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, 
null, null, + null, null ) ); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 898e125b94954..63d8f069bebea 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2722,6 +2722,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat( From 5765b25beed5e04d3dc66a05ffd2512a5fcc8f29 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 20 Oct 2023 00:21:06 +0000 Subject: [PATCH 10/11] Address comments on PR. Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationStatsTracker.java | 18 ++++-------- .../java/org/opensearch/node/NodeService.java | 2 +- .../SegmentReplicationStatsTrackerTests.java | 28 +++++++++++++++++++ 3 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 7f115a9676c1c..f5fc8aa1c1eea 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -33,18 +33,12 @@ public SegmentReplicationStatsTracker(IndicesService indicesService) { rejectionCount = ConcurrentCollections.newConcurrentMap(); } - public SegmentReplicationRejectionStats getRejectionStats() { - long rejectionCount = 0; - for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled()) { - for (IndexShard indexShard : indexService) { - if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { - rejectionCount += 
getStatsForShard(indexShard).getRejectedRequestCount(); - } - } - } - } - return new SegmentReplicationRejectionStats(rejectionCount); + public SegmentReplicationRejectionStats getTotalRejectionStats() { + return new SegmentReplicationRejectionStats(this.rejectionCount.values().stream().mapToInt(AtomicInteger::get).sum()); + } + + protected Map getRejectionCount() { + return rejectionCount; } public SegmentReplicationStats getStats() { diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index d81c360abd927..49dde0b81cac7 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -262,7 +262,7 @@ public NodeStats stats( fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, - segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getRejectionStats() : null, + segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null ); } diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java new file mode 100644 index 0000000000000..d2a8e28ff0221 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index; + +import org.opensearch.indices.IndicesService; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; + +public class SegmentReplicationStatsTrackerTests extends OpenSearchTestCase { + + private IndicesService indicesService = mock(IndicesService.class); + + public void testRejectedCountWhenEmpty() { + SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); + + // Verify that total rejection count is 0 on an empty rejectionCount map in statsTracker. + assertTrue(segmentReplicationStatsTracker.getRejectionCount().isEmpty()); + assertEquals(segmentReplicationStatsTracker.getTotalRejectionStats().getTotalRejectionCount(), 0L); + } + +} From b96a875a03f40409f3588453815f95dd94a042cc Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Fri, 20 Oct 2023 15:38:36 +0000 Subject: [PATCH 11/11] Update unit test. Signed-off-by: Rishikesh1159 --- .../index/SegmentReplicationRejectionStats.java | 2 +- .../index/SegmentReplicationStatsTrackerTests.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java index 3b8c0e8e073ff..9f9f150ebe2d7 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationRejectionStats.java @@ -18,7 +18,7 @@ import java.io.IOException; /** - * Segment Replication Rejection Stats. + * Segment replication rejection stats. 
* * @opensearch.internal */ diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java index d2a8e28ff0221..04423d583e8f9 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationStatsTrackerTests.java @@ -8,21 +8,28 @@ package org.opensearch.index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.IndicesService; import org.opensearch.test.OpenSearchTestCase; +import org.mockito.Mockito; + import static org.mockito.Mockito.mock; public class SegmentReplicationStatsTrackerTests extends OpenSearchTestCase { private IndicesService indicesService = mock(IndicesService.class); - public void testRejectedCountWhenEmpty() { + public void testRejectedCount() { SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); // Verify that total rejection count is 0 on an empty rejectionCount map in statsTracker. assertTrue(segmentReplicationStatsTracker.getRejectionCount().isEmpty()); assertEquals(segmentReplicationStatsTracker.getTotalRejectionStats().getTotalRejectionCount(), 0L); + + // Verify that total rejection count is 1 after incrementing rejectionCount. + segmentReplicationStatsTracker.incrementRejectionCount(Mockito.mock(ShardId.class)); + assertEquals(segmentReplicationStatsTracker.getTotalRejectionStats().getTotalRejectionCount(), 1L); } }