Decouple replication lag from logic to fail stale replicas (#9507)
* Decouple replication lag from replication timer logic used to fail stale replicas

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Added changelog entry

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments 2

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Retry gradle

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* fix UT

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Retry Gradle

Signed-off-by: Ankit Kala <ankikala@amazon.com>

---------

Signed-off-by: Ankit Kala <ankikala@amazon.com>
ankitkala committed Aug 31, 2023
1 parent ae42deb commit be800c8
Showing 13 changed files with 213 additions and 33 deletions.
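
Before this change a single setting, segrep.pressure.time.limit, drove both backpressure and the decision to fail a stale replica (failure fired at twice that limit). After this change the two concerns use separate time limits: segrep.pressure.time.limit still controls backpressure on the primary, while the new segrep.replication.time.limit controls failing stale replicas and is disabled when set to 0. Below is a minimal sketch of configuring both limits on a node, mirroring the integration test further down; the wrapper class and the concrete values are illustrative only, not part of this commit.

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPressureService;

public final class SegRepLimitsExample {
    static Settings decoupledLimits() {
        return Settings.builder()
            // Backpressure starts once a replica has been lagging for longer than this.
            .put(SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(30))
            // Stale replicas are failed only after this separate, longer limit; 0 leaves failing disabled.
            .put(SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMinutes(5))
            .build();
    }
}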
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- Add support for extensions to search responses using SearchExtBuilder ([#9379](https://github.com/opensearch-project/OpenSearch/pull/9379))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -39,7 +39,8 @@
import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -54,7 +55,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}
@@ -225,7 +226,10 @@ public void testBelowReplicaLimit() throws Exception {

public void testFailStaleReplica() throws Exception {

Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMillis(1000))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(INDEX_NAME);
@@ -264,7 +268,9 @@ public void testWithDocumentReplicationEnabledIndex() throws Exception {
"Can't create DocRep index with remote store enabled. Skipping.",
Objects.equals(featureFlagSettings().get(FeatureFlags.REMOTE_STORE, "false"), "false")
);
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(
@@ -646,7 +646,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchBackpressureSettings.SETTING_CANCELLATION_BURST, // deprecated
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
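
Registering both settings here makes them dynamic, node-scoped settings, so they can be changed on a running cluster. A rough sketch of raising the fail-replica limit through the cluster settings API follows; the helper class and the choice of persistent settings are assumptions for illustration, not part of this commit.

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

public final class SegRepSettingsUpdater {
    // Raises (or, with TimeValue.ZERO, disables) the stale-replica failure limit cluster-wide.
    static void setFailStaleReplicaLimit(Client client, TimeValue limit) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder().put("segrep.replication.time.limit", limit).build()
        );
        client.admin().cluster().updateSettings(request).actionGet();
    }
}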
@@ -42,7 +42,8 @@ public class SegmentReplicationPressureService implements Closeable {
private volatile boolean isSegmentReplicationBackpressureEnabled;
private volatile int maxCheckpointsBehind;
private volatile double maxAllowedStaleReplicas;
private volatile TimeValue maxReplicationTime;
private volatile TimeValue replicationTimeLimitBackpressure;
private volatile TimeValue replicationTimeLimitFailReplica;

private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class);

@@ -65,13 +66,23 @@ public class SegmentReplicationPressureService implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_REPLICATION_TIME_SETTING = Setting.positiveTimeSetting(
// Time limit on max allowed replica staleness after which backpressure kicks in on primary.
public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting(
"segrep.pressure.time.limit",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Time limit on max allowed replica staleness after which we start failing the replica shard.
// Defaults to 0 (disabled).
public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting(
"segrep.replication.time.limit",
TimeValue.timeValueMinutes(0),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Double> MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting(
"segrep.pressure.replica.stale.limit",
.5,
@@ -114,8 +125,11 @@ public SegmentReplicationPressureService(
this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind);

this.maxReplicationTime = MAX_REPLICATION_TIME_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_SETTING, this::setMaxReplicationTime);
this.replicationTimeLimitBackpressure = MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING, this::setReplicationTimeLimitBackpressure);

this.replicationTimeLimitFailReplica = MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, this::setReplicationTimeLimitFailReplica);

this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);
@@ -159,7 +173,7 @@ private void validateReplicationGroup(IndexShard shard) {
private Set<SegmentReplicationShardStats> getStaleReplicas(final Set<SegmentReplicationShardStats> replicas) {
return replicas.stream()
.filter(entry -> entry.getCheckpointsBehindCount() > maxCheckpointsBehind)
.filter(entry -> entry.getCurrentReplicationTimeMillis() > maxReplicationTime.millis())
.filter(entry -> entry.getCurrentReplicationTimeMillis() > replicationTimeLimitBackpressure.millis())
.collect(Collectors.toSet());
}

@@ -187,8 +201,12 @@ public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) {
this.maxAllowedStaleReplicas = maxAllowedStaleReplicas;
}

public void setMaxReplicationTime(TimeValue maxReplicationTime) {
this.maxReplicationTime = maxReplicationTime;
public void setReplicationTimeLimitFailReplica(TimeValue replicationTimeLimitFailReplica) {
this.replicationTimeLimitFailReplica = replicationTimeLimitFailReplica;
}

public void setReplicationTimeLimitBackpressure(TimeValue replicationTimeLimitBackpressure) {
this.replicationTimeLimitBackpressure = replicationTimeLimitBackpressure;
}

@Override
@@ -216,7 +234,8 @@ protected boolean mustReschedule() {

@Override
protected void runInternal() {
if (pressureService.isSegmentReplicationBackpressureEnabled) {
// Do not fail the replicas if time limit is set to 0 (i.e. disabled).
if (TimeValue.ZERO.equals(pressureService.replicationTimeLimitFailReplica) == false) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();

// Find the shardId in node which is having stale replicas with highest current replication time.
@@ -242,7 +261,7 @@ protected void runInternal() {
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
if (staleReplica.getCurrentReplicationTimeMillis() > pressureService.replicationTimeLimitFailReplica.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
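
Condensed, the replica-failure path above now depends on its own limit rather than on a multiple of the backpressure limit: the async task runs only when segrep.replication.time.limit is non-zero, and a stale replica is failed once its current replication time exceeds that limit. The standalone predicate below restates that check for illustration only; the real logic lives in runInternal() above.

import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationShardStats;

final class StaleReplicaFailurePolicy {
    // Mirrors the checks above: failing is disabled when the limit is 0, otherwise a stale
    // replica qualifies once its current replication time exceeds the dedicated limit.
    static boolean shouldFail(SegmentReplicationShardStats staleReplica, TimeValue replicationTimeLimitFailReplica) {
        return TimeValue.ZERO.equals(replicationTimeLimitFailReplica) == false
            && staleReplica.getCurrentReplicationTimeMillis() > replicationTimeLimitFailReplica.millis();
    }
}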
@@ -29,6 +29,10 @@ public class SegmentReplicationShardStats implements Writeable, ToXContentFragme
private final String allocationId;
private final long checkpointsBehindCount;
private final long bytesBehindCount;
// Total Replication lag observed.
private final long currentReplicationLagMillis;
// Total time taken for replicas to catch up. Similar to replication lag except this
// doesn't include time taken by primary to upload data to remote store.
private final long currentReplicationTimeMillis;
private final long lastCompletedReplicationTimeMillis;

@@ -40,12 +44,14 @@ public SegmentReplicationShardStats(
long checkpointsBehindCount,
long bytesBehindCount,
long currentReplicationTimeMillis,
long currentReplicationLagMillis,
long lastCompletedReplicationTime
) {
this.allocationId = allocationId;
this.checkpointsBehindCount = checkpointsBehindCount;
this.bytesBehindCount = bytesBehindCount;
this.currentReplicationTimeMillis = currentReplicationTimeMillis;
this.currentReplicationLagMillis = currentReplicationLagMillis;
this.lastCompletedReplicationTimeMillis = lastCompletedReplicationTime;
}

@@ -55,6 +61,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException {
this.bytesBehindCount = in.readVLong();
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
}

public String getAllocationId() {
@@ -73,6 +80,19 @@ public long getCurrentReplicationTimeMillis() {
return currentReplicationTimeMillis;
}

/**
* Total Replication lag observed.
* @return currentReplicationLagMillis
*/
public long getCurrentReplicationLagMillis() {
return currentReplicationLagMillis;
}

/**
* Total time taken for replicas to catch up. Similar to replication lag except this doesn't include time taken by
* primary to upload data to remote store.
* @return lastCompletedReplicationTimeMillis
*/
public long getLastCompletedReplicationTimeMillis() {
return lastCompletedReplicationTimeMillis;
}
@@ -93,6 +113,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("checkpoints_behind", checkpointsBehindCount);
builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString());
builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis));
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
@@ -110,6 +131,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(bytesBehindCount);
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
}

@Override
@@ -121,6 +143,8 @@ public String toString() {
+ checkpointsBehindCount
+ ", bytesBehindCount="
+ bytesBehindCount
+ ", currentReplicationLagMillis="
+ currentReplicationLagMillis
+ ", currentReplicationTimeMillis="
+ currentReplicationTimeMillis
+ ", lastCompletedReplicationTimeMillis="
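
The stats object now carries two related measurements: current_replication_lag is the total lag observed since a checkpoint was published, while current_replication_time excludes the time the primary spent uploading to the remote store, as the comments above describe. The small sketch below consumes both getters; the reporter class and the derived waiting-on-primary figure are illustrative, not part of this commit.

import org.opensearch.index.SegmentReplicationShardStats;

final class ReplicaLagReporter {
    // Splits total lag into time spent waiting on the primary's upload and time spent replicating,
    // assuming lag is at least as large as replication time per the field documentation above.
    static String describe(SegmentReplicationShardStats stats) {
        long lagMillis = stats.getCurrentReplicationLagMillis();
        long replicationMillis = stats.getCurrentReplicationTimeMillis();
        long waitingOnPrimaryMillis = Math.max(lagMillis - replicationMillis, 0);
        return stats.getAllocationId()
            + " lag=" + lagMillis + "ms"
            + " replicating=" + replicationMillis + "ms"
            + " waiting_on_primary=" + waitingOnPrimaryMillis + "ms";
    }
}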
@@ -58,7 +58,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;

import java.io.IOException;
import java.nio.file.Path;
@@ -714,7 +714,7 @@ public static class CheckpointState implements Writeable {
* Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when
* the replica is caught up.
*/
Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers;
Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers;

/**
* The time it took to complete the most recent replication event.
@@ -1188,9 +1188,9 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
cps.checkpointTimers.entrySet().removeIf((entry) -> {
boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false;
if (result) {
final ReplicationTimer timer = entry.getValue();
final SegmentReplicationLagTimer timer = entry.getValue();
timer.stop();
lastFinished.set(Math.max(lastFinished.get(), timer.time()));
lastFinished.set(Math.max(lastFinished.get(), timer.totalElapsedTime()));
}
return result;
});
@@ -1210,7 +1210,7 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
}

/**
* After a new checkpoint is published, start a timer for each replica to the checkpoint.
* After a new checkpoint is published, create a timer for each replica to the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
@@ -1219,15 +1219,15 @@ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint ch
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
startReplicationLagTimers();
createReplicationLagTimers();
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private void startReplicationLagTimers() {
private void createReplicationLagTimers() {
for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
final String allocationId = entry.getKey();
if (allocationId.equals(this.shardAllocationId) == false) {
@@ -1237,11 +1237,7 @@ private void startReplicationLagTimers() {
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final ReplicationTimer replicationTimer = new ReplicationTimer();
replicationTimer.start();
return replicationTimer;
});
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint for {} at visible cp {} to {} - timers [{}]",
@@ -1256,6 +1252,29 @@ private void startReplicationLagTimers() {
}
}

/**
* After a new checkpoint is published, start a timer per replica for the checkpoint.
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
if (primaryMode) {
checkpoints.entrySet().stream().filter(e -> !e.getKey().equals(this.shardAllocationId)).forEach(e -> {
String allocationId = e.getKey();
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
}
});
}
}

/**
* Fetch stats on segment replication.
* @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group,
@@ -1284,14 +1303,15 @@ private SegmentReplicationShardStats buildShardStats(
final String allocationId,
final CheckpointState checkpointState
) {
final Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers = checkpointState.checkpointTimers;
final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
return new SegmentReplicationShardStats(
allocationId,
checkpointTimers.size(),
checkpointState.visibleReplicationCheckpoint == null
? latestCheckpointLength
: Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
checkpointState.lastCompletedReplicationLag
);
}
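
The ReplicationTracker changes above replace ReplicationTimer with a new SegmentReplicationLagTimer whose source is not shown in this excerpt. From its call sites, timers are created when a checkpoint is published (createReplicationLagTimers), started only once the primary has finished its upload (startReplicationLagTimers), and expose both time() for replication time and totalElapsedTime() for total lag. A plausible minimal sketch under those assumptions follows; the committed class may differ, for example by freezing the value on stop().

import org.opensearch.indices.replication.common.ReplicationTimer;

// Hypothetical sketch only: remembers when the checkpoint was published (timer creation) so that
// totalElapsedTime() also covers the window before start(), which time() alone does not include.
public class SegmentReplicationLagTimer extends ReplicationTimer {
    private final long creationNanos = System.nanoTime();

    public long totalElapsedTime() {
        return (System.nanoTime() - creationNanos) / 1_000_000;
    }
}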