Skip to content

Commit 0e84bd5

Browse files
committed
Avoid primary shard failure caused by merge segment warmer exceptions
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
1 parent a19434c commit 0e84bd5

File tree

2 files changed

+60
-21
lines changed

2 files changed

+60
-21
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
1515
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1616
import org.opensearch.action.admin.indices.segments.ShardSegments;
17+
import org.opensearch.action.index.IndexRequestBuilder;
1718
import org.opensearch.action.support.WriteRequest;
1819
import org.opensearch.common.settings.Settings;
1920
import org.opensearch.common.unit.TimeValue;
@@ -38,6 +39,7 @@
3839
import java.util.stream.Collectors;
3940

4041
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
42+
import static org.hamcrest.Matchers.equalTo;
4143

4244
/**
4345
* This class runs Segment Replication Integ test suite with merged segment warmer enabled.
@@ -59,6 +61,33 @@ protected Settings featureFlagSettings() {
5961
return featureSettings.build();
6062
}
6163

64+
public void testPrimaryNodeRestart() throws Exception {
65+
logger.info("--> start nodes");
66+
internalCluster().startNode();
67+
68+
logger.info("--> creating test index: {}", INDEX_NAME);
69+
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("number_of_replicas", 0).build());
70+
71+
ensureGreen();
72+
73+
logger.info("--> indexing sample data");
74+
final int numDocs = 100;
75+
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
76+
77+
for (int i = 0; i < numDocs; i++) {
78+
docs[i] = client().prepareIndex(INDEX_NAME)
79+
.setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat());
80+
}
81+
82+
indexRandom(true, docs);
83+
flush();
84+
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), equalTo((long) numDocs));
85+
86+
logger.info("--> restarting cluster");
87+
internalCluster().fullRestart();
88+
ensureGreen();
89+
}
90+
6291
public void testMergeSegmentWarmer() throws Exception {
6392
final String primaryNode = internalCluster().startDataOnlyNode();
6493
final String replicaNode = internalCluster().startDataOnlyNode();

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,29 +51,39 @@ public MergedSegmentWarmer(
5151

5252
@Override
5353
public void warm(LeafReader leafReader) throws IOException {
54-
if (shouldWarm() == false) {
55-
return;
56-
}
57-
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
58-
assert leafReader instanceof SegmentReader;
59-
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
54+
try {
55+
if (shouldWarm() == false) {
56+
return;
57+
}
58+
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
59+
assert leafReader instanceof SegmentReader;
60+
assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();
6061

61-
long startTime = System.currentTimeMillis();
62-
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
63-
logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
64-
indexShard.publishMergedSegment(segmentCommitInfo);
65-
logger.trace(() -> {
66-
long segmentSize = -1;
67-
try {
68-
segmentSize = segmentCommitInfo.sizeInBytes();
69-
} catch (IOException ignored) {}
70-
return new ParameterizedMessage(
71-
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
72-
segmentCommitInfo.info.name,
73-
segmentSize,
74-
(System.currentTimeMillis() - startTime)
62+
long startTime = System.currentTimeMillis();
63+
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
64+
logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
65+
indexShard.publishMergedSegment(segmentCommitInfo);
66+
logger.trace(() -> {
67+
long segmentSize = -1;
68+
try {
69+
segmentSize = segmentCommitInfo.sizeInBytes();
70+
} catch (IOException ignored) {}
71+
return new ParameterizedMessage(
72+
"Completed segment warming for {}. Size: {}B, Timing: {}ms",
73+
segmentCommitInfo.info.name,
74+
segmentSize,
75+
(System.currentTimeMillis() - startTime)
76+
);
77+
});
78+
} catch (Exception e) {
79+
logger.warn(
80+
() -> new ParameterizedMessage(
81+
"throw exception during merged segment warmer, skip merged segment {} warmer",
82+
((SegmentReader) leafReader).getSegmentName()
83+
),
84+
e
7585
);
76-
});
86+
}
7787
}
7888

7989
// package-private for tests

0 commit comments

Comments
 (0)