Compute Segment Replication stats for backpressure.
This PR introduces new mechanisms to track the current replicas within a replication group, intended to be used for applying backpressure. The new stats are also added to NodeStats.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Mar 2, 2023
1 parent ee305d0 commit 4d062a2
Showing 26 changed files with 754 additions and 20 deletions.
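The new section is exposed through the existing nodes stats APIs, both over REST (GET /_nodes/stats/segment_replication, exercised by the smoke test below) and on the transport response. A minimal sketch of reading it programmatically; only the getSegmentReplicationStats() accessor and its nullability come from this commit, the surrounding helper class is illustrative:

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.index.SegmentReplicationStats;

class SegmentReplicationNodeStatsSketch {
    // `nodeStats` is assumed to come from a NodesStatsResponse; the field is @Nullable
    // and only populated for nodes on Version.V_2_7_0 or later, so callers must
    // tolerate a missing section.
    static SegmentReplicationStats read(NodeStats nodeStats) {
        return nodeStats.getSegmentReplicationStats();
    }
}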
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http;

import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.client.Node;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.XContentTestUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.opensearch.rest.RestStatus.CREATED;
import static org.opensearch.rest.RestStatus.OK;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2,
numClientNodes = 0)
public class SegmentReplicationRestIT extends HttpSmokeTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@SuppressWarnings("unchecked")
public void testSegmentReplicationStats() throws Exception {
// create index.
Request createRequest = new Request("PUT", "/test_index");
createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " +
"\"replication\": {\"type\": \"SEGMENT\"}}}}");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertEquals(indexCreatedResponse.getStatusLine().getStatusCode(), OK.getStatus());
ensureGreen("test_index");

//index a doc
Request successfulIndexingRequest = new Request("POST", "/test_index/_doc/");
successfulIndexingRequest.setJsonEntity("{\"foo\": \"bar\"}");
final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
assertEquals(indexSuccessFul.getStatusLine().getStatusCode(), CREATED.getStatus());

assertBusy(() -> {
// wait for SR to run.
for (Node node : getRestClient().getNodes()) {
assertHitCount(client(node.getName()).prepareSearch("test_index").setSize(0).setPreference("_only_local").get(), 1);
}
});
Request statsRequest = new Request("GET", "/_nodes/stats/segment_replication?pretty");
final Response response = getRestClient().performRequest(statsRequest);
logger.info("Node stats response\n{}", EntityUtils.toString(response.getEntity()));
Map<String, Object> statsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(),
true);
List<Object> nodes = new ArrayList<>(((Map<Object, Object>) statsMap.get("nodes")).values());
assertEquals(2, nodes.size());
XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) nodes.get(0));
final Map<Object, Object> node1_map = node1.get("segment_replication");
Map<Object, Object> primaryNode_map = node1_map;
if (node1_map.isEmpty()) {
XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) nodes.get(1));
primaryNode_map = node2.get("segment_replication");
}
List<Object> primary_values = new ArrayList<>(primaryNode_map.values());
assertEquals(1, primary_values.size());
XContentTestUtils.JsonMapView shard1 = new XContentTestUtils.JsonMapView((Map<String, Object>) primary_values.get(0));
Integer node1TotalLimitsRejections = shard1.get("rejected_requests");
assertNotNull(node1TotalLimitsRejections);
String average_replication_lag = shard1.get("average_replication_lag");
assertNotNull(average_replication_lag);
List<Object> shard1_replicas = new ArrayList<>(((Map<Object, Object>) shard1.get("replicas")).values());
assertEquals(1, shard1_replicas.size());
XContentTestUtils.JsonMapView replica = new XContentTestUtils.JsonMapView((Map<String, Object>) shard1_replicas.get(0));
Integer checkpoints_behind = replica.get("checkpoints_behind");
assertEquals(0, checkpoints_behind.intValue());
assertNotNull(replica.get("bytes_behind"));
assertNotNull(replica.get("average_replication_lag"));
assertNotNull(replica.get("average_checkpoints_per_sync"));
}
}
@@ -46,6 +46,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.SegmentReplicationStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
@@ -123,6 +124,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

@Nullable
private SegmentReplicationStats segmentReplicationStats;

@Nullable
private SearchBackpressureStats searchBackpressureStats;

@@ -160,6 +164,12 @@ public NodeStats(StreamInput in) throws IOException {
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);

if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
segmentReplicationStats = in.readOptionalWriteable(SegmentReplicationStats::new);
} else {
segmentReplicationStats = null;
}

if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
} else {
@@ -202,6 +212,7 @@ public NodeStats(
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SegmentReplicationStats segmentReplicationStats,
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@@ -225,6 +236,7 @@ public NodeStats(
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.segmentReplicationStats = segmentReplicationStats;
this.searchBackpressureStats = searchBackpressureStats;
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
@@ -338,6 +350,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

@Nullable
public SegmentReplicationStats getSegmentReplicationStats() {
return segmentReplicationStats;
}

@Nullable
public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
@@ -381,6 +398,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(indexingPressureStats);
out.writeOptionalWriteable(shardIndexingPressureStats);

if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalWriteable(segmentReplicationStats);
}

if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
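The new field follows the same wire-compatibility pattern as the other optional stats above: it is read and written only when the stream version is on or after 2.7.0. A small sketch of the write-side behaviour (not from the diff; Version.V_2_6_0 is used purely as an example of a pre-2.7 target):

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;

class NodeStatsWireCompatSketch {
    static void writeForOlderNode(NodeStats nodeStats) throws IOException {
        BytesStreamOutput out = new BytesStreamOutput();
        out.setVersion(Version.V_2_6_0); // pre-2.7 wire format
        // segmentReplicationStats is skipped for this version; a pre-2.7 reader would
        // likewise leave the field null when deserializing.
        nodeStats.writeTo(out);
    }
}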
@@ -465,6 +486,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
if (getSegmentReplicationStats() != null) {
getSegmentReplicationStats().toXContent(builder, params);
}
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
@@ -207,6 +207,7 @@ public enum Metric {
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEGMENT_REPLICATION("segment_replication"),
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
@@ -119,6 +119,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
@@ -166,6 +166,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* Return Segment Replication stats for a Replication Group.
*
* @opensearch.internal
*/
public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment {

private final Set<SegmentReplicationShardStats> replicaStats;
private final long rejectedRequestCount;
private final TimeValue averageReplicationLag;

public SegmentReplicationPerGroupStats(
Set<SegmentReplicationShardStats> replicaStats,
long rejectedRequestCount,
TimeValue averageReplicationLag
) {
this.replicaStats = replicaStats;
this.rejectedRequestCount = rejectedRequestCount;
this.averageReplicationLag = averageReplicationLag;
}

public SegmentReplicationPerGroupStats(StreamInput in) throws IOException {
this.replicaStats = in.readSet(SegmentReplicationShardStats::new);
this.rejectedRequestCount = in.readVLong();
this.averageReplicationLag = new TimeValue(in.readVLong(), TimeUnit.MILLISECONDS);
}

public Set<SegmentReplicationShardStats> getReplicaStats() {
return replicaStats;
}

public long getRejectedRequestCount() {
return rejectedRequestCount;
}

public TimeValue getAverageReplicationLag() {
return averageReplicationLag;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("rejected_requests", rejectedRequestCount);
builder.field("average_replication_lag", averageReplicationLag);
builder.startObject("replicas");
for (SegmentReplicationShardStats stats : replicaStats) {
stats.toXContent(builder, params);
}
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(replicaStats);
out.writeVLong(rejectedRequestCount);
out.writeVLong(averageReplicationLag.millis());
}
}
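A minimal round-trip sketch (not part of the commit) of the Writeable contract implemented above; it uses an empty replica set so it does not depend on the SegmentReplicationShardStats constructor, which is not shown in this view:

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPerGroupStats;

import java.io.IOException;
import java.util.Collections;

class PerGroupStatsRoundTripSketch {
    static void roundTrip() throws IOException {
        SegmentReplicationPerGroupStats original = new SegmentReplicationPerGroupStats(
            Collections.emptySet(), 3L, TimeValue.timeValueMillis(250));
        BytesStreamOutput out = new BytesStreamOutput();
        original.writeTo(out); // replica set, rejected count, then lag in millis
        SegmentReplicationPerGroupStats copy =
            new SegmentReplicationPerGroupStats(out.bytes().streamInput());
        assert copy.getRejectedRequestCount() == 3L;
        assert copy.getAverageReplicationLag().millis() == 250L;
    }
}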
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;

/**
* Service responsible for applying backpressure when replicas fall behind the primary while Segment Replication is enabled.
*
* @opensearch.internal
*/
public class SegmentReplicationPressureService {

private final IndicesService indicesService;
private final SegmentReplicationStatsTracker tracker;

public SegmentReplicationPressureService(IndicesService indexService) {
this.indicesService = indexService;
this.tracker = new SegmentReplicationStatsTracker(indicesService);
}

public void isSegrepLimitBreached(ShardId shardId) {
// TODO.
}

public SegmentReplicationStats nodeStats() {
return tracker.getStats();
}
}
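The service is still a stub at this point: isSegrepLimitBreached is a no-op TODO and only the stats path is wired up. A hedged illustration of the two intended call sites; both surrounding hooks are assumptions, since neither the NodeService stats wiring nor the write-path check is part of this commit:

import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStats;
import org.opensearch.index.shard.ShardId;

class SegmentReplicationPressureSketch {
    // Feeds the "segment_replication" section added to NodeStats in this commit.
    static SegmentReplicationStats collectNodeStats(SegmentReplicationPressureService service) {
        return service.nodeStats();
    }

    // Intended backpressure hook: currently a no-op, eventually meant to reject writes
    // when replicas fall too far behind (see rejected_requests in the group stats).
    static void beforeIndexing(SegmentReplicationPressureService service, ShardId shardId) {
        service.isSegrepLimitBreached(shardId);
    }
}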