Skip to content

Commit ead3aa0

Browse files
mch2 authored and Divyansh Sharma committed
Fix bugs in replication lag computation (opensearch-project#18602)
* Fix bug in replication lag computation. This change fixes a bug in the replication lag computation so that it correctly uses an epoch-based reference point via Instant.now() and DateUtils. This change also fixes the pruning logic to correctly remove the latest synced-to checkpoint from tracking. Previously we would only prune up to, but not including, the latest. This ensures that when a new checkpoint is eventually received we aren't incorrectly computing lag from the synced-to checkpoint. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * add changelog entry Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix flaky test to hardcode time lag between checkpoint arrival. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent acc4741 commit ead3aa0

File tree

4 files changed

+28
-12
lines changed

4 files changed

+28
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4444
- Cannot communicate with HTTP/2 when reactor-netty is enabled ([#18599](https://github.com/opensearch-project/OpenSearch/pull/18599))
4545
- Fix the visit of sub queries for HasParentQuery and HasChildQuery ([#18621](https://github.com/opensearch-project/OpenSearch/pull/18621))
4646
- Fix the backward compatibility regression with COMPLEMENT for Regexp queries introduced in OpenSearch 3.0 ([#18640](https://github.com/opensearch-project/OpenSearch/pull/18640))
47+
- Fix Replication lag computation ([#18602](https://github.com/opensearch-project/OpenSearch/pull/18602))
4748

4849
### Security
4950

server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.OpenSearchCorruptionException;
1616
import org.opensearch.common.Nullable;
1717
import org.opensearch.common.SetOnce;
18+
import org.opensearch.common.time.DateUtils;
1819
import org.opensearch.common.unit.TimeValue;
1920
import org.opensearch.common.util.concurrent.AbstractRunnable;
2021
import org.opensearch.common.util.concurrent.ConcurrentCollections;
@@ -31,12 +32,13 @@
3132
import org.opensearch.threadpool.ThreadPool;
3233

3334
import java.io.IOException;
35+
import java.time.Duration;
36+
import java.time.Instant;
3437
import java.util.List;
3538
import java.util.Map;
3639
import java.util.concurrent.ConcurrentMap;
3740
import java.util.concurrent.ConcurrentNavigableMap;
3841
import java.util.concurrent.ConcurrentSkipListMap;
39-
import java.util.concurrent.TimeUnit;
4042

4143
import reactor.util.annotation.NonNull;
4244

@@ -54,7 +56,7 @@ public class SegmentReplicator {
5456
private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
5557
private final ReplicationCollection<MergedSegmentReplicationTarget> onGoingMergedSegmentReplications;
5658
private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
57-
private final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
59+
protected final ConcurrentMap<ShardId, ConcurrentNavigableMap<Long, ReplicationCheckpointStats>> replicationCheckpointStats =
5860
ConcurrentCollections.newConcurrentMap();
5961
private final ConcurrentMap<ShardId, ReplicationCheckpoint> primaryCheckpoint = ConcurrentCollections.newConcurrentMap();
6062

@@ -167,9 +169,8 @@ public ReplicationStats getSegmentReplicationStats(final ShardId shardId) {
167169

168170
long bytesBehind = highestEntry.getValue().getBytesBehind();
169171
long replicationLag = bytesBehind > 0L
170-
? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lowestEntry.getValue().getTimestamp())
172+
? Duration.ofNanos(DateUtils.toLong(Instant.now()) - lowestEntry.getValue().getTimestamp()).toMillis()
171173
: 0;
172-
173174
return new ReplicationStats(bytesBehind, bytesBehind, replicationLag);
174175
}
175176

@@ -217,7 +218,7 @@ protected void pruneCheckpointsUpToLastSync(final IndexShard indexShard) {
217218
);
218219

219220
if (existingCheckpointStats != null && !existingCheckpointStats.isEmpty()) {
220-
existingCheckpointStats.keySet().removeIf(key -> key < segmentInfoVersion);
221+
existingCheckpointStats.keySet().removeIf(key -> key <= segmentInfoVersion);
221222
Map.Entry<Long, ReplicationCheckpointStats> lastEntry = existingCheckpointStats.lastEntry();
222223
if (lastEntry != null) {
223224
lastEntry.getValue().setBytesBehind(calculateBytesBehind(latestCheckpoint, indexReplicationCheckPoint));

server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.Version;
1212
import org.opensearch.common.Nullable;
1313
import org.opensearch.common.annotation.PublicApi;
14+
import org.opensearch.common.time.DateUtils;
1415
import org.opensearch.core.common.io.stream.StreamInput;
1516
import org.opensearch.core.common.io.stream.StreamOutput;
1617
import org.opensearch.core.common.io.stream.Writeable;
@@ -19,6 +20,7 @@
1920
import org.opensearch.index.store.StoreFileMetadata;
2021

2122
import java.io.IOException;
23+
import java.time.Instant;
2224
import java.util.Collections;
2325
import java.util.Map;
2426
import java.util.Objects;
@@ -56,11 +58,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
5658
length = 0L;
5759
this.codec = codec;
5860
this.metadataMap = Collections.emptyMap();
59-
this.createdTimeStamp = System.nanoTime();
61+
this.createdTimeStamp = DateUtils.toLong(Instant.now());
6062
}
6163

6264
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
63-
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.nanoTime());
65+
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), DateUtils.toLong(Instant.now()));
6466
}
6567

6668
public ReplicationCheckpoint(
@@ -79,7 +81,7 @@ public ReplicationCheckpoint(
7981
this.length = length;
8082
this.codec = codec;
8183
this.metadataMap = metadataMap;
82-
this.createdTimeStamp = System.nanoTime();
84+
this.createdTimeStamp = DateUtils.toLong(Instant.now());
8385
}
8486

8587
public ReplicationCheckpoint(

server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.cluster.service.ClusterService;
2222
import org.opensearch.common.lucene.Lucene;
2323
import org.opensearch.common.settings.Settings;
24+
import org.opensearch.common.time.DateUtils;
2425
import org.opensearch.core.action.ActionListener;
2526
import org.opensearch.core.index.shard.ShardId;
2627
import org.opensearch.index.ReplicationStats;
@@ -38,6 +39,7 @@
3839

3940
import java.io.IOException;
4041
import java.io.UncheckedIOException;
42+
import java.time.Instant;
4143
import java.util.ArrayList;
4244
import java.util.HashMap;
4345
import java.util.List;
@@ -215,10 +217,10 @@ public void testGetSegmentReplicationStats_WhenNoReplication() {
215217
assertEquals(0, replicationStats.maxBytesBehind);
216218
}
217219

218-
public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefreshedToNewCheckPoint() {
220+
public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefreshedToNewCheckPoint() throws InterruptedException {
219221
ShardId shardId = new ShardId("index", "uuid", 0);
220222
ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId);
221-
223+
long baseTime = DateUtils.toLong(Instant.now());
222224
StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, new BytesRef(500));
223225
StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, new BytesRef(500));
224226
Map<String, StoreFileMetadata> stringStoreFileMetadataMapOne = new HashMap<>();
@@ -232,7 +234,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
232234
1000,
233235
"",
234236
stringStoreFileMetadataMapOne,
235-
System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
237+
baseTime - 5_000_000
236238
);
237239

238240
IndexShard replicaShard = mock(IndexShard.class);
@@ -260,7 +262,7 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
260262
200,
261263
"",
262264
stringStoreFileMetadataMapTwo,
263-
System.nanoTime() - TimeUnit.MINUTES.toNanos(1)
265+
baseTime - 1_000_000
264266
);
265267

266268
segmentReplicator.updateReplicationCheckpointStats(thirdReplicationCheckpoint, replicaShard);
@@ -276,6 +278,16 @@ public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefr
276278
assertEquals(200, replicationStatsSecond.totalBytesBehind);
277279
assertEquals(200, replicationStatsSecond.maxBytesBehind);
278280
assertTrue(replicationStatsSecond.maxReplicationLag > 0);
281+
282+
// shard finished syncing to last checkpoint (sis 3)
283+
when(replicaShard.getLatestReplicationCheckpoint()).thenReturn(thirdReplicationCheckpoint);
284+
segmentReplicator.pruneCheckpointsUpToLastSync(replicaShard);
285+
ReplicationStats finalStats = segmentReplicator.getSegmentReplicationStats(shardId);
286+
assertEquals(0, finalStats.totalBytesBehind);
287+
assertEquals(0, finalStats.maxBytesBehind);
288+
assertEquals(0, finalStats.maxReplicationLag);
289+
// shard is up to date, should not have any tracked stats
290+
assertTrue(segmentReplicator.replicationCheckpointStats.get(shardId).isEmpty());
279291
}
280292

281293
public void testGetSegmentReplicationStats_WhenCheckPointReceivedOutOfOrder() {

0 commit comments

Comments
 (0)