diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/SegmentReplicationRestIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/SegmentReplicationRestIT.java new file mode 100644 index 0000000000000..a1b5d4c267aed --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/SegmentReplicationRestIT.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.opensearch.client.Node; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.XContentTestUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.opensearch.rest.RestStatus.CREATED; +import static org.opensearch.rest.RestStatus.OK; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, + numClientNodes = 0) +public class SegmentReplicationRestIT extends HttpSmokeTestCase { + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @SuppressWarnings("unchecked") + public void testSegmentReplicationStats() throws Exception { + // create index. 
+ Request createRequest = new Request("PUT", "/test_index"); + createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " + + "\"replication\": {\"type\": \"SEGMENT\"}}}}"); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertEquals(indexCreatedResponse.getStatusLine().getStatusCode(), OK.getStatus()); + ensureGreen("test_index"); + + //index a doc + Request successfulIndexingRequest = new Request("POST", "/test_index/_doc/"); + successfulIndexingRequest.setJsonEntity("{\"foo\": \"bar\"}"); + final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); + assertEquals(indexSuccessFul.getStatusLine().getStatusCode(), CREATED.getStatus()); + + assertBusy(() -> { + // wait for SR to run. + for (Node node : getRestClient().getNodes()) { + assertHitCount(client(node.getName()).prepareSearch("test_index").setSize(0).setPreference("_only_local").get(), 1); + } + }); + Request statsRequest = new Request("GET", "/_nodes/stats/segment_replication?pretty"); + final Response response = getRestClient().performRequest(statsRequest); + logger.info("Node stats response\n{}", EntityUtils.toString(response.getEntity())); + Map statsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), + true); + List nodes = new ArrayList<>(((Map) statsMap.get("nodes")).values()); + assertEquals(2, nodes.size()); + XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map) nodes.get(0)); + final Map node1_map = node1.get("segment_replication"); + Map primaryNode_map = node1_map; + if (node1_map.isEmpty()) { + XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map) nodes.get(1)); + primaryNode_map = node2.get("segment_replication"); + } + List primary_values = new ArrayList<>(primaryNode_map + .values()); + assertEquals(1, primary_values.size()); + XContentTestUtils.JsonMapView shard1 = new 
XContentTestUtils.JsonMapView((Map) primary_values.get(0)); + Integer node1TotalLimitsRejections = shard1.get("rejected_requests"); + assertNotNull(node1TotalLimitsRejections); + String average_replication_lag = shard1.get("average_replication_lag"); + assertNotNull(average_replication_lag); + List shard1_replicas = new ArrayList<>(((Map) shard1.get("replicas")).values()); + assertEquals(1, shard1_replicas.size()); + XContentTestUtils.JsonMapView replica = new XContentTestUtils.JsonMapView((Map) shard1_replicas.get(0)); + Integer checkpoints_behind = replica.get("checkpoints_behind"); + assertEquals(0, checkpoints_behind.intValue()); + assertNotNull(replica.get("bytes_behind")); + assertNotNull(replica.get("average_replication_lag")); + assertNotNull(replica.get("average_checkpoints_per_sync")); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 6d151843219bd..47025f1bce934 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,6 +46,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.SegmentReplicationStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -123,6 +124,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private ShardIndexingPressureStats shardIndexingPressureStats; + @Nullable + private SegmentReplicationStats segmentReplicationStats; + @Nullable private SearchBackpressureStats searchBackpressureStats; @@ -160,6 +164,12 @@ public NodeStats(StreamInput in) throws IOException { 
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new); shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new); + if (in.getVersion().onOrAfter(Version.V_2_7_0)) { + segmentReplicationStats = in.readOptionalWriteable(SegmentReplicationStats::new); + } else { + segmentReplicationStats = null; + } + if (in.getVersion().onOrAfter(Version.V_2_4_0)) { searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new); } else { @@ -202,6 +212,7 @@ public NodeStats( @Nullable ScriptCacheStats scriptCacheStats, @Nullable IndexingPressureStats indexingPressureStats, @Nullable ShardIndexingPressureStats shardIndexingPressureStats, + @Nullable SegmentReplicationStats segmentReplicationStats, @Nullable SearchBackpressureStats searchBackpressureStats, @Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats, @Nullable WeightedRoutingStats weightedRoutingStats, @@ -225,6 +236,7 @@ public NodeStats( this.scriptCacheStats = scriptCacheStats; this.indexingPressureStats = indexingPressureStats; this.shardIndexingPressureStats = shardIndexingPressureStats; + this.segmentReplicationStats = segmentReplicationStats; this.searchBackpressureStats = searchBackpressureStats; this.clusterManagerThrottlingStats = clusterManagerThrottlingStats; this.weightedRoutingStats = weightedRoutingStats; @@ -338,6 +350,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() { return shardIndexingPressureStats; } + @Nullable + public SegmentReplicationStats getSegmentReplicationStats() { + return segmentReplicationStats; + } + @Nullable public SearchBackpressureStats getSearchBackpressureStats() { return searchBackpressureStats; @@ -381,6 +398,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(indexingPressureStats); out.writeOptionalWriteable(shardIndexingPressureStats); + if (out.getVersion().onOrAfter(Version.V_2_7_0)) { + 
out.writeOptionalWriteable(segmentReplicationStats); + } + if (out.getVersion().onOrAfter(Version.V_2_4_0)) { out.writeOptionalWriteable(searchBackpressureStats); } @@ -465,6 +486,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getShardIndexingPressureStats() != null) { getShardIndexingPressureStats().toXContent(builder, params); } + if (getSegmentReplicationStats() != null) { + getSegmentReplicationStats().toXContent(builder, params); + } if (getSearchBackpressureStats() != null) { getSearchBackpressureStats().toXContent(builder, params); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index a9c58ac803590..3b2908f1582fb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -207,6 +207,7 @@ public enum Metric { SCRIPT_CACHE("script_cache"), INDEXING_PRESSURE("indexing_pressure"), SHARD_INDEXING_PRESSURE("shard_indexing_pressure"), + SEGMENT_REPLICATION("segment_replication"), SEARCH_BACKPRESSURE("search_backpressure"), CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"), WEIGHTED_ROUTING_STATS("weighted_routing"), diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 02b5ceef2c7e4..c8334e1522449 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -119,6 +119,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics), 
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics), NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics), + NodesStatsRequest.Metric.SEGMENT_REPLICATION.containedIn(metrics), NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics), NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics), NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics), diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 2fdaa46de01bc..b3d2615695e69 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -166,6 +166,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java new file mode 100644 index 0000000000000..6a08176e29786 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPerGroupStats.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Return Segment Replication stats for a Replication Group. + * + * @opensearch.internal + */ +public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment { + + private final Set replicaStats; + private final long rejectedRequestCount; + private final TimeValue averageReplicationLag; + + public SegmentReplicationPerGroupStats( + Set replicaStats, + long rejectedRequestCount, + TimeValue averageReplicationLag + ) { + this.replicaStats = replicaStats; + this.rejectedRequestCount = rejectedRequestCount; + this.averageReplicationLag = averageReplicationLag; + } + + public SegmentReplicationPerGroupStats(StreamInput in) throws IOException { + this.replicaStats = in.readSet(SegmentReplicationShardStats::new); + this.rejectedRequestCount = in.readVLong(); + this.averageReplicationLag = new TimeValue(in.readVLong(), TimeUnit.MILLISECONDS); + } + + public Set getReplicaStats() { + return replicaStats; + } + + public long getRejectedRequestCount() { + return rejectedRequestCount; + } + + public TimeValue getAverageReplicationLag() { + return averageReplicationLag; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("rejected_requests", rejectedRequestCount); + builder.field("average_replication_lag", averageReplicationLag); + builder.startObject("replicas"); + for (SegmentReplicationShardStats stats : replicaStats) { + stats.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + + @Override + public 
void writeTo(StreamOutput out) throws IOException { + out.writeCollection(replicaStats); + out.writeVLong(rejectedRequestCount); + out.writeVLong(averageReplicationLag.millis()); + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java new file mode 100644 index 0000000000000..25e8fa045e2f7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; + +/** + * Service responsible for applying backpressure for lagging behind replicas when Segment Replication is enabled. + * + * @opensearch.internal + */ +public class SegmentReplicationPressureService { + + private final IndicesService indicesService; + private final SegmentReplicationStatsTracker tracker; + + public SegmentReplicationPressureService(IndicesService indexService) { + this.indicesService = indexService; + this.tracker = new SegmentReplicationStatsTracker(indicesService); + } + + public void isSegrepLimitBreached(ShardId shardId) { + // TODO. 
+ } + + public SegmentReplicationStats nodeStats() { + return tracker.getStats(); + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java new file mode 100644 index 0000000000000..f81c8197e0f10 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationShardStats.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * SegRep stats for a single shard. 
+ * + * @opensearch.internal + */ +public class SegmentReplicationShardStats implements Writeable, ToXContentFragment { + private final String allocationId; + private final long checkpointsBehindCount; + private final long bytesBehindCount; + private final TimeValue averageReplicationLag; + private final double averageCheckpointsPerSync; + + public SegmentReplicationShardStats( + String allocationId, + long checkpointsBehindCount, + long bytesBehindCount, + TimeValue averageReplicationLag, + double averageCheckpointsPerSync + ) { + this.allocationId = allocationId; + this.checkpointsBehindCount = checkpointsBehindCount; + this.bytesBehindCount = bytesBehindCount; + this.averageReplicationLag = averageReplicationLag; + this.averageCheckpointsPerSync = averageCheckpointsPerSync; + } + + public SegmentReplicationShardStats(StreamInput in) throws IOException { + this.allocationId = in.readString(); + this.checkpointsBehindCount = in.readVLong(); + this.bytesBehindCount = in.readVLong(); + this.averageReplicationLag = new TimeValue(in.readVLong(), TimeUnit.MILLISECONDS); + this.averageCheckpointsPerSync = in.readDouble(); + } + + public String getAllocationId() { + return allocationId; + } + + public long getCheckpointsBehindCount() { + return checkpointsBehindCount; + } + + public long getBytesBehindCount() { + return bytesBehindCount; + } + + public TimeValue getAverageReplicationLag() { + return averageReplicationLag; + } + + public double getAverageCheckpointsPerSync() { + return averageCheckpointsPerSync; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(allocationId); + builder.field("checkpoints_behind", checkpointsBehindCount); + builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString()); + builder.field("average_replication_lag", averageReplicationLag); + builder.field("average_checkpoints_per_sync", averageCheckpointsPerSync); + builder.endObject(); + 
return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(allocationId); + out.writeVLong(checkpointsBehindCount); + out.writeVLong(bytesBehindCount); + out.writeVLong(averageReplicationLag.millis()); + out.writeDouble(averageCheckpointsPerSync); + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java new file mode 100644 index 0000000000000..f5db80eda8593 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStats.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Segment Replication Stats. 
+ * + * @opensearch.internal + */ +public class SegmentReplicationStats implements Writeable, ToXContentFragment { + + private final Map shardStats; + + public SegmentReplicationStats(Map shardStats) { + this.shardStats = shardStats; + } + + public SegmentReplicationStats(StreamInput in) throws IOException { + int shardEntries = in.readInt(); + shardStats = new HashMap<>(); + for (int i = 0; i < shardEntries; i++) { + ShardId shardId = new ShardId(in); + SegmentReplicationPerGroupStats groupStats = new SegmentReplicationPerGroupStats(in); + shardStats.put(shardId, groupStats); + } + } + + public Map getShardStats() { + return shardStats; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("segment_replication"); + for (Map.Entry entry : shardStats.entrySet()) { + builder.startObject(entry.getKey().toString()); + entry.getValue().toXContent(builder, params); + builder.endObject(); + } + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(shardStats.size()); + for (Map.Entry entry : shardStats.entrySet()) { + entry.getKey().writeTo(out); + entry.getValue().writeTo(out); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java new file mode 100644 index 0000000000000..7f18a6241c4dc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Tracker responsible for computing SegmentReplicationStats. + * + * @opensearch.internal + */ +public class SegmentReplicationStatsTracker { + + private final IndicesService indicesService; + + public SegmentReplicationStatsTracker(IndicesService indicesService) { + this.indicesService = indicesService; + } + + public SegmentReplicationStats getStats() { + Map stats = new HashMap<>(); + for (IndexService indexShards : indicesService) { + for (IndexShard indexShard : indexShards) { + if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { + final Tuple> replicationStats = indexShard.getReplicationStats(); + stats.putIfAbsent( + indexShard.shardId(), + new SegmentReplicationPerGroupStats( + replicationStats.v2(), + // TODO: Store rejected counts. 
+ 0L, + replicationStats.v1() + ) + ); + } + } + } + return new SegmentReplicationStats(stats); + } +} diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index cc94f199ebcd0..53961efb5557b 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -34,6 +34,7 @@ import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -48,14 +49,20 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.metrics.MeanMetric; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationTimer; import java.io.IOException; import java.nio.file.Path; @@ -68,6 +75,8 @@ import java.util.Objects; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import 
java.util.function.Consumer; import java.util.function.Function; @@ -243,6 +252,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final Consumer onReplicationGroupUpdated; + private volatile ReplicationCheckpoint lastPublishedReplicationCheckpoint; + + private MeanMetric meanReplicationTime; + /** * Get all retention leases tracked on this shard. * @@ -665,15 +678,52 @@ public synchronized void renewPeerRecoveryRetentionLeases() { } /** - * The state of the lucene checkpoint - * - * @opensearch.internal - */ + * Fetch stats on segment replication. + * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group, + * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. + */ + public synchronized Tuple> getSegmentReplicationStats() { + final ReplicationCheckpoint lastPublishedCheckpoint = this.lastPublishedReplicationCheckpoint; + if (lastPublishedCheckpoint != null) { + return new Tuple<>( + new TimeValue((long) meanReplicationTime.mean(), TimeUnit.MILLISECONDS), + this.checkpoints.entrySet() + .stream() + .filter(entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync) + .map(entry -> buildShardStats(lastPublishedCheckpoint.getLength(), entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()) + ); + } + return new Tuple<>(TimeValue.ZERO, Collections.emptySet()); + } + + private SegmentReplicationShardStats buildShardStats( + final long latestCheckpointLength, + final String allocationId, + final CheckpointState checkpointState + ) { + final Map checkpointTimers = checkpointState.checkpointTimers; + return new SegmentReplicationShardStats( + allocationId, + checkpointTimers.size(), + checkpointState.visibleReplicationCheckpoint == null + ? 
latestCheckpointLength + : latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), + new TimeValue((long) checkpointState.averageReplicationTime.mean(), TimeUnit.MILLISECONDS), + checkpointState.averageCheckpointsPerSync.mean() + ); + } + + /** + * The state of the lucene checkpoint + * + * @opensearch.internal + */ public static class CheckpointState implements Writeable { /** - * the last local checkpoint information that we have for this shard. All operations up to this point are properly fsynced to disk. - */ + * the last local checkpoint information that we have for this shard. All operations up to this point are properly fsynced to disk. + */ long localCheckpoint; /** @@ -699,12 +749,36 @@ public static class CheckpointState implements Writeable { */ boolean replicated; + /** + * The currently searchable replication checkpoint. + */ + ReplicationCheckpoint visibleReplicationCheckpoint; + + /** + * Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when + * the replica is caught up. + */ + Map checkpointTimers; + + /** + * The average amount of time it takes for a replica to sync to a published checkpoint. + */ + MeanMetric averageReplicationTime; + + /** + * The average amount of published checkpoints a replica jumps per replication event. 
+ */ + MeanMetric averageCheckpointsPerSync; + public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean replicated) { this.localCheckpoint = localCheckpoint; this.globalCheckpoint = globalCheckpoint; this.inSync = inSync; this.tracked = tracked; this.replicated = replicated; + this.checkpointTimers = ConcurrentCollections.newConcurrentMap(); + this.averageReplicationTime = new MeanMetric(); + this.averageCheckpointsPerSync = new MeanMetric(); } public CheckpointState(StreamInput in) throws IOException { @@ -1031,6 +1105,7 @@ public ReplicationTracker( this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; + this.meanReplicationTime = new MeanMetric(); assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -1135,6 +1210,74 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI assert invariant(); } + /** + * Update the local knowledge of the visible checkpoint for the specified allocation ID. + * + * @param allocationId the allocation ID to update the global checkpoint for + * @param visibleCheckpoint the visible checkpoint + */ + public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { + assert indexSettings.isSegRepEnabled(); + assert primaryMode; + assert handoffInProgress == false; + assert invariant(); + final CheckpointState cps = checkpoints.get(allocationId); + assert !this.shardAllocationId.equals(allocationId) && cps != null; + // stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers. 
+ final AtomicInteger removedCount = new AtomicInteger(0); + if (cps.checkpointTimers.isEmpty() == false) { + cps.checkpointTimers.entrySet().removeIf((entry) -> { + boolean result = visibleCheckpoint.equals(entry.getKey()) || visibleCheckpoint.isAheadOf(entry.getKey()); + if (result) { + final ReplicationTimer timer = entry.getValue(); + timer.stop(); + cps.averageReplicationTime.inc(timer.time()); + meanReplicationTime.inc(timer.time()); + removedCount.incrementAndGet(); + } + return result; + }); + cps.averageCheckpointsPerSync.inc(removedCount.get()); + } + logger.trace( + () -> new ParameterizedMessage( + "updated local knowledge for [{}] on the primary of the visible checkpoint from [{}] to [{}] removed {} checkpoints, active timers {}", + allocationId, + cps.visibleReplicationCheckpoint, + visibleCheckpoint, + removedCount.get(), + cps.checkpointTimers.size() + ) + ); + cps.visibleReplicationCheckpoint = visibleCheckpoint; + assert invariant(); + } + + /** + * After a new checkpoint is published, start a timer for each replica to the checkpoint. + * @param checkpoint {@link ReplicationCheckpoint} + */ + public synchronized void updatePrimaryReplicationCheckpoint(ReplicationCheckpoint checkpoint) { + assert indexSettings.isSegRepEnabled(); + assert primaryMode; + assert handoffInProgress == false; + if (checkpoint.equals(lastPublishedReplicationCheckpoint) == false) { + this.lastPublishedReplicationCheckpoint = checkpoint; + for (Map.Entry entry : checkpoints.entrySet()) { + if (entry.getKey().equals(this.shardAllocationId) == false) { + final CheckpointState cps = entry.getValue(); + if (cps.inSync) { + cps.checkpointTimers.computeIfAbsent(lastPublishedReplicationCheckpoint, ignored -> { + final ReplicationTimer replicationTimer = new ReplicationTimer(); + replicationTimer.start(); + return replicationTimer; + }); + } + } + } + } + } + /** * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. 
Called on primary activation or promotion. */ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index fb046e2310d93..dd9d967d74ad5 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -41,7 +41,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) { - publisher.publish(shard); + publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 71ec8a4decd38..72fa7b8765b5d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -106,6 +106,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.bitset.ShardBitsetFilterCache; @@ -1490,7 +1491,12 @@ public Tuple, ReplicationCheckpoint> getLatestSegme getOperationPrimaryTerm(), segmentInfos.getGeneration(), shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(), - segmentInfos.getVersion() + segmentInfos.getVersion(), + // TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with + // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. + shardRouting.primary() + ? 
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() + : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes() ) ); } catch (IOException e) { @@ -1737,6 +1743,10 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } + public void onCheckpointPublished(ReplicationCheckpoint checkpoint) { + replicationTracker.updatePrimaryReplicationCheckpoint(checkpoint); + } + /** * Wrapper for a non-closing reader * @@ -2702,6 +2712,28 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } + /** + * Update the local knowledge of the visible replication checkpoint for the specified allocation ID. + * + * @param allocationId the allocation ID to update the visible checkpoint for + * @param visibleCheckpoint the visible checkpoint + */ + public void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { + assert assertPrimaryMode(); + verifyNotClosed(); + replicationTracker.updateVisibleCheckpointForShard(allocationId, visibleCheckpoint); + } + + /** + * Fetch stats on segment replication. + * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group, + * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. + */ + public Tuple> getReplicationStats() { + assert assertPrimaryMode(); + return replicationTracker.getSegmentReplicationStats(); + } + /** * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, * then the listener will be notified immediately via an executor (so possibly not on the current thread).
If the specified timeout diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 750e7629783e7..f9720991338c2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -159,6 +159,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene sendFileStep.whenComplete(r -> { try { + shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint()); future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 0fd934c31ef7f..e3d19461f9e35 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -109,14 +109,13 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { /** * Publish checkpoint request to shard */ - final void publish(IndexShard indexShard) { + final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the sync is authorized threadContext.markAsSystemContext(); - PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); - final ReplicationCheckpoint 
checkpoint = request.getCheckpoint(); + PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); final List replicationTargets = indexShard.getReplicationGroup().getReplicationTargets(); for (ShardRouting replicationTarget : replicationTargets) { diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 6a4e5e449f178..ca7b29d8b090f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -30,6 +30,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable adjustNodesStats(List nodesStats) { nodeStats.getScriptCacheStats(), nodeStats.getIndexingPressureStats(), nodeStats.getShardIndexingPressureStats(), + nodeStats.getSegmentReplicationStats(), nodeStats.getSearchBackpressureStats(), nodeStats.getClusterManagerThrottlingStats(), nodeStats.getWeightedRoutingStats(), diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 0cbffdb38f05a..e2acd505ddd99 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2685,6 +2685,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(