Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala committed Aug 31, 2023
1 parent 75ad56c commit b921169
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ public class SegmentReplicationPressureService implements Closeable {
Setting.Property.NodeScope
);

// Time limit on max allowed replica staleness after which backpressure kicks in on primary.
public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting(
"segrep.pressure.time.limit",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Time limit on max allowed replica staleness after which we start failing the replica shard.
// Defaults to 0(disabled)
public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting(
"segrep.replication.time.limit",
TimeValue.timeValueMinutes(15),
TimeValue.timeValueMinutes(0),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ public class SegmentReplicationShardStats implements Writeable, ToXContentFragme
private final String allocationId;
private final long checkpointsBehindCount;
private final long bytesBehindCount;
private final long currentReplicationTimeMillis;
// Total Replication lag observed.
private final long currentReplicationLagMillis;
// Total time taken for replicas to catch up. Similar to replication lag except this
// doesn't include time taken by primary to upload data to remote store.
private final long currentReplicationTimeMillis;
private final long lastCompletedReplicationTimeMillis;

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1235,10 +1235,7 @@ private void createReplicationLagTimers() {
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) {
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> {
final SegmentReplicationLagTimer replicationTimer = new SegmentReplicationLagTimer();
return replicationTimer;
});
cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> new SegmentReplicationLagTimer());
logger.trace(
() -> new ParameterizedMessage(
"updated last published checkpoint for {} at visible cp {} to {} - timers [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
* 1. Replication Lag: Total time taken by replica to sync after primary refreshed.
* 2. Replication event time: Total time taken by replica to sync after primary published the checkpoint
* (excludes the time spent by primary for uploading the segments to remote store).
*
* @opensearch.internal
*/
public class SegmentReplicationLagTimer extends ReplicationTimer {
private long creationTime;
Expand Down

0 comments on commit b921169

Please sign in to comment.