Fix Segment Replication stats bytes behind metric (#9686)
* Fix Segment Replication stats bytes behind metric.

This metric previously estimated the bytes behind from the difference in total size of the segments
referenced by the active readers on each shard. That difference is a poor indicator of how many bytes
actually need to be fetched, and it becomes inaccurate after deletes and merges. Fixed by sending file
metadata with each checkpoint and computing a diff between checkpoints when SegmentReplicationShardStats
is built (a short sketch of the computation follows the changed-files summary below).

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Skip SegRep bwc test until this is backported to 2.x.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add changelog entry.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 authored Sep 1, 2023
1 parent dbb868a commit f9b6694
Showing 8 changed files with 154 additions and 32 deletions.
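
Before the file-by-file diffs, here is a minimal sketch of the new bytes-behind computation. It is illustrative only: it uses a simplified FileMeta record in place of OpenSearch's StoreFileMetadata, and it ignores same-name files whose contents differ, which the real Store.segmentReplicationDiff also detects. Assumes Java 17 for the record syntax.

import java.util.Map;

// Simplified stand-in for StoreFileMetadata: a file name and its length in bytes.
record FileMeta(String name, long length) {}

final class BytesBehindSketch {

    // Bytes behind = total size of the files the primary's latest checkpoint
    // references but the replica's last-visible checkpoint does not. Files removed
    // by deletes or merges drop out of the primary's map, so they no longer inflate
    // the metric the way the old difference-of-total-lengths estimate did.
    static long bytesBehind(Map<String, FileMeta> primaryFiles, Map<String, FileMeta> replicaFiles) {
        return primaryFiles.values()
            .stream()
            .filter(file -> replicaFiles.containsKey(file.name()) == false)
            .mapToLong(FileMeta::length)
            .sum();
    }

    public static void main(String[] args) {
        // Sizes mirror the ReplicationTrackerTests changes below.
        Map<String, FileMeta> primary = Map.of(
            "segment_1", new FileMeta("segment_1", 1L),
            "segment_2", new FileMeta("segment_2", 50L),
            "segment_3", new FileMeta("segment_3", 100L)
        );
        Map<String, FileMeta> replica = Map.of("segment_1", new FileMeta("segment_1", 1L));
        System.out.println(bytesBehind(primary, replica)); // prints 150
    }
}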
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -188,6 +188,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437))
- Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019))
- [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495))
- [Segment Replication] Fixed bug where bytes behind metric is not accurate ([#9686](https://github.com/opensearch-project/OpenSearch/pull/9686))

### Security

@@ -114,6 +114,7 @@ private void printClusterRouting() throws IOException, ParseException {
* This test verifies that segment replication does not break when primary shards are on a lower OpenSearch version. It does this
* by verifying that replica shards contain the same number of documents as the primary.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9685")
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
@@ -57,6 +57,8 @@
import org.opensearch.index.shard.AbstractIndexShardComponent;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;

@@ -1290,27 +1292,25 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
)
.map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
.map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet());
}
return Collections.emptySet();
}

private SegmentReplicationShardStats buildShardStats(
final long latestCheckpointLength,
final String allocationId,
final CheckpointState checkpointState
) {
final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
private SegmentReplicationShardStats buildShardStats(final String allocationId, final CheckpointState cps) {
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(
latestReplicationCheckpoint.getMetadataMap(),
cps.visibleReplicationCheckpoint != null ? cps.visibleReplicationCheckpoint.getMetadataMap() : Collections.emptyMap()
);
final long bytesBehind = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
return new SegmentReplicationShardStats(
allocationId,
checkpointTimers.size(),
checkpointState.visibleReplicationCheckpoint == null
? latestCheckpointLength
: Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
checkpointState.lastCompletedReplicationLag
cps.checkpointTimers.size(),
bytesBehind,
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
cps.lastCompletedReplicationLag
);
}
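
The rewritten buildShardStats above replaces the old length subtraction, which had to be clamped at zero because a merge can shrink the primary's total segment size below the replica's last-seen size, with a sum over diff.missing: the exact set of files the replica still has to fetch from the primary's latest checkpoint.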

@@ -1610,15 +1610,17 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName()
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
)
);
}
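
The checkpoint built here now carries the full per-file metadata map, and the existing length field is computed from that same map, so the reported total and the per-file view cannot drift apart.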
@@ -10,13 +10,16 @@

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Version;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
@@ -110,11 +113,13 @@ public void write(IndexOutput out) throws IOException {

public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
Map<String, String> metadata = indexInput.readMapOfStrings();
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput);
final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
.fromMapOfStrings(metadata);
ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
int byteArraySize = (int) indexInput.readLong();
byte[] segmentInfosBytes = new byte[byteArraySize];
indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
return new RemoteSegmentMetadata(uploadedSegmentMetadataMap, segmentInfosBytes, replicationCheckpoint);
}

public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
@@ -131,14 +136,30 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
out.writeString(replicationCheckpoint.getCodec());
}

private static ReplicationCheckpoint readCheckpointFromIndexInput(IndexInput in) throws IOException {
private static ReplicationCheckpoint readCheckpointFromIndexInput(
IndexInput in,
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
) throws IOException {
return new ReplicationCheckpoint(
new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
in.readLong(),
in.readLong(),
in.readLong(),
in.readLong(),
in.readString()
in.readString(),
toStoreFileMetadata(uploadedSegmentMetadataMap)
);
}

private static Map<String, StoreFileMetadata> toStoreFileMetadata(
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata
) {
return metadata.entrySet()
.stream()
// TODO: Version here should be read from UploadedSegmentMetadata.
.map(
entry -> new StoreFileMetadata(entry.getKey(), entry.getValue().getLength(), entry.getValue().getChecksum(), Version.LATEST)
)
.collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));
}
}
@@ -15,8 +15,11 @@
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.StoreFileMetadata;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -32,6 +35,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long segmentInfosVersion;
private final long length;
private final String codec;
private final Map<String, StoreFileMetadata> metadataMap;

public static ReplicationCheckpoint empty(ShardId shardId) {
return empty(shardId, "");
@@ -48,19 +52,29 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
length = 0L;
this.codec = codec;
this.metadataMap = Collections.emptyMap();
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap());
}

public ReplicationCheckpoint(
ShardId shardId,
long primaryTerm,
long segmentsGen,
long segmentInfosVersion,
long length,
String codec,
Map<String, StoreFileMetadata> metadataMap
) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.segmentInfosVersion = segmentInfosVersion;
this.length = length;
this.codec = codec;
this.metadataMap = metadataMap;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
@@ -75,6 +89,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
length = 0L;
codec = null;
}
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
} else {
this.metadataMap = Collections.emptyMap();
}
}

/**
@@ -135,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(length);
out.writeString(codec);
}
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
}
}

@Override
@@ -169,6 +191,10 @@ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
|| (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion());
}

public Map<String, StoreFileMetadata> getMetadataMap() {
return metadataMap;
}

@Override
public String toString() {
return "ReplicationCheckpoint{"
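Note the wire-format compatibility handling above: both the StreamInput constructor and writeTo gate the new metadataMap on Version.V_2_10_0, so nodes on earlier versions neither send nor expect the map. This is also why the SegRep rolling-upgrade test earlier in the commit is annotated @AwaitsFix until the change is backported to 2.x.
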
@@ -33,6 +33,7 @@
package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.util.Version;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.AllocationId;
@@ -50,6 +51,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1826,29 +1828,35 @@ public void testSegmentReplicationCheckpointTracking() {

initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));

final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 1L, "abcd", Version.LATEST);
final StoreFileMetadata segment_2 = new StoreFileMetadata("segment_2", 50L, "abcd", Version.LATEST);
final StoreFileMetadata segment_3 = new StoreFileMetadata("segment_3", 100L, "abcd", Version.LATEST);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
1L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Map.of("segment_1", segment_1)
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
51L,
Codec.getDefault().getName(),
Map.of("segment_1", segment_1, "segment_2", segment_2)
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
151L,
Codec.getDefault().getName(),
Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3)
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
@@ -1864,7 +1872,7 @@
assertEquals(expectedIds.size(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(3, shardStat.getCheckpointsBehindCount());
assertEquals(100L, shardStat.getBytesBehindCount());
assertEquals(151L, shardStat.getBytesBehindCount());
assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
}
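
The updated expectations follow from per-file sums rather than total-length deltas: a replica still at the empty checkpoint is missing segment_1 + segment_2 + segment_3 = 1 + 50 + 100 = 151 bytes, and once replicas advance to the initial checkpoint (which already contains segment_1) the next hunk expects 50 + 100 = 150 bytes.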

@@ -1881,7 +1889,7 @@
assertEquals(expectedIds.size(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(2, shardStat.getCheckpointsBehindCount());
assertEquals(99L, shardStat.getBytesBehindCount());
assertEquals(150L, shardStat.getBytesBehindCount());
}

for (String id : expectedIds) {
@@ -1938,7 +1946,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
1,
1,
1L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Collections.emptyMap()
);
tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.startReplicationLagTimers(initialCheckpoint);