Skip to content

Commit a4a867b

Browse files
guojialiang92jainankitk
authored andcommitted
[BUG FIX] Add reference count control in NRTReplicationEngine#acquireLastIndexCommit (opensearch-project#19214)
* NRTReplicationEngine#acquireLastIndexCommit add ref count control Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> * fix test Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> * the incRef in NRTReplicationEngine#commitSegmentInfos should also be managed by lock. Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> * update test Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> * add change log Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> --------- Signed-off-by: guojialiang <guojialiang.2012@bytedance.com> Signed-off-by: Ankit Jain <jainankitk@apache.org>
1 parent 81691f6 commit a4a867b

File tree

4 files changed

+175
-5
lines changed

4 files changed

+175
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3434
- Grant access to testclusters dir for tests ([#19085](https://github.com/opensearch-project/OpenSearch/issues/19085))
3535
- Fix assertion error when collapsing search results with concurrent segment search enabled ([#19053](https://github.com/opensearch-project/OpenSearch/pull/19053))
3636
- Fix skip_unavailable setting changing to default during node drop issue ([#18766](https://github.com/opensearch-project/OpenSearch/pull/18766))
37+
- Add reference count control in NRTReplicationEngine#acquireLastIndexCommit ([#19214](https://github.com/opensearch-project/OpenSearch/pull/19214))
3738
- Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212))
3839
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
3940
- Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188))

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
import org.apache.lucene.document.StringField;
1717
import org.apache.lucene.document.TextField;
1818
import org.apache.lucene.index.Fields;
19+
import org.apache.lucene.index.IndexCommit;
1920
import org.apache.lucene.index.IndexWriter;
2021
import org.apache.lucene.index.IndexWriterConfig;
2122
import org.apache.lucene.index.SegmentInfos;
2223
import org.apache.lucene.index.StandardDirectoryReader;
24+
import org.apache.lucene.store.Directory;
2325
import org.apache.lucene.tests.util.TestUtil;
2426
import org.apache.lucene.util.BytesRef;
2527
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
2628
import org.opensearch.action.admin.indices.alias.Alias;
2729
import org.opensearch.action.admin.indices.flush.FlushRequest;
30+
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
2831
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
2932
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
3033
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -54,6 +57,7 @@
5457
import org.opensearch.cluster.routing.ShardRoutingState;
5558
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
5659
import org.opensearch.common.action.ActionFuture;
60+
import org.opensearch.common.concurrent.GatedCloseable;
5761
import org.opensearch.common.lease.Releasable;
5862
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
5963
import org.opensearch.common.settings.Settings;
@@ -136,6 +140,109 @@ private static String indexOrAlias() {
136140
return randomBoolean() ? INDEX_NAME : "alias";
137141
}
138142

143+
public void testAcquireLastIndexCommit() throws Exception {
144+
final String primaryNode = internalCluster().startDataOnlyNode();
145+
createIndex(INDEX_NAME);
146+
ensureYellowAndNoInitializingShards(INDEX_NAME);
147+
final String replicaNode = internalCluster().startDataOnlyNode();
148+
ensureGreen(INDEX_NAME);
149+
150+
// generate _0.si
151+
client().prepareIndex(INDEX_NAME)
152+
.setId(String.valueOf(1))
153+
.setSource("foo", "bar")
154+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
155+
.get();
156+
157+
// generate _1.si
158+
client().prepareIndex(INDEX_NAME)
159+
.setId(String.valueOf(2))
160+
.setSource("foo2", "bar2")
161+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
162+
.get();
163+
164+
waitForSearchableDocs(2, primaryNode, replicaNode);
165+
166+
// primary and replica generate index commit
167+
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
168+
primaryShard.flush(new FlushRequest(INDEX_NAME));
169+
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
170+
replicaShard.flush(new FlushRequest(INDEX_NAME));
171+
172+
logger.info("primary {} acquire last IndexCommit", primaryShard.shardId());
173+
GatedCloseable<IndexCommit> primaryIndexCommit = primaryShard.acquireLastIndexCommit(false);
174+
175+
logger.info("replica {} acquire last IndexCommit", replicaShard.shardId());
176+
GatedCloseable<IndexCommit> replicaIndexCommit = replicaShard.acquireLastIndexCommit(false);
177+
178+
logger.info("Verify that before merge, primary and replica contain _0.si and _1.si");
179+
Directory primaryDirectory = primaryShard.store().directory();
180+
Set<String> primaryFilesBeforeMerge = Sets.newHashSet(primaryDirectory.listAll());
181+
logger.info("primaryFilesBeforeMerge {}: {}", primaryFilesBeforeMerge.size(), primaryFilesBeforeMerge);
182+
assertTrue(
183+
primaryFilesBeforeMerge.stream().anyMatch(s -> s.startsWith("_0"))
184+
&& primaryFilesBeforeMerge.stream().anyMatch(s -> s.startsWith("_1"))
185+
);
186+
187+
Directory replicaDirectory = replicaShard.store().directory();
188+
Set<String> replicaFilesBeforeMerge = Sets.newHashSet(replicaDirectory.listAll());
189+
logger.info("replicaFilesBeforeMerge {}: {}", replicaFilesBeforeMerge.size(), replicaFilesBeforeMerge);
190+
assertTrue(
191+
replicaFilesBeforeMerge.stream().anyMatch(s -> s.startsWith("_0"))
192+
&& replicaFilesBeforeMerge.stream().anyMatch(s -> s.startsWith("_1"))
193+
);
194+
195+
// generate _2.si
196+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1));
197+
waitForSegmentCount(INDEX_NAME, 1, logger);
198+
primaryShard.flush(new FlushRequest(INDEX_NAME));
199+
replicaShard.flush(new FlushRequest(INDEX_NAME));
200+
201+
logger.info("Verify that after merge, primary and replica contain _0.si, _1.si and _2.si");
202+
Set<String> primaryFilesAfterMerge = Sets.newHashSet(primaryDirectory.listAll());
203+
logger.info("primaryFilesAfterMerge {}: {}", primaryFilesAfterMerge.size(), primaryFilesAfterMerge);
204+
assertTrue(
205+
primaryFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_0"))
206+
&& primaryFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_1"))
207+
&& primaryFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_2"))
208+
);
209+
210+
Set<String> replicaFilesAfterMerge = Sets.newHashSet(replicaDirectory.listAll());
211+
logger.info("replicaFilesAfterMerge {}: {}", replicaFilesAfterMerge.size(), replicaFilesAfterMerge);
212+
assertTrue(
213+
replicaFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_0"))
214+
&& replicaFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_1"))
215+
&& replicaFilesAfterMerge.stream().anyMatch(s -> s.startsWith("_2"))
216+
);
217+
218+
logger.info("Verify that after close index commit, primary and replica only contain _2.si");
219+
primaryIndexCommit.close();
220+
Set<String> primaryFilesAfterIndexCommitClose = Sets.newHashSet(primaryDirectory.listAll());
221+
logger.info(
222+
"primaryFilesAfterIndexCommitClose {}: {}",
223+
primaryFilesAfterIndexCommitClose.size(),
224+
primaryFilesAfterIndexCommitClose
225+
);
226+
assertTrue(
227+
primaryFilesAfterIndexCommitClose.stream().noneMatch(s -> s.startsWith("_0"))
228+
&& primaryFilesAfterIndexCommitClose.stream().noneMatch(s -> s.startsWith("_1"))
229+
&& primaryFilesAfterIndexCommitClose.stream().anyMatch(s -> s.startsWith("_2"))
230+
);
231+
232+
replicaIndexCommit.close();
233+
Set<String> replicaFilesAfterIndexCommitClose = Sets.newHashSet(replicaDirectory.listAll());
234+
logger.info(
235+
"replicaFilesAfterIndexCommitClose {}: {}",
236+
replicaFilesAfterIndexCommitClose.size(),
237+
replicaFilesAfterIndexCommitClose
238+
);
239+
assertTrue(
240+
replicaFilesAfterIndexCommitClose.stream().noneMatch(s -> s.startsWith("_0"))
241+
&& replicaFilesAfterIndexCommitClose.stream().noneMatch(s -> s.startsWith("_1"))
242+
&& replicaFilesAfterIndexCommitClose.stream().anyMatch(s -> s.startsWith("_2"))
243+
);
244+
}
245+
139246
public void testRetryPublishCheckPoint() throws Exception {
140247
// Reproduce the case where the replica shard cannot synchronize data from the primary shard when there is a network exception.
141248
// Test update of configuration PublishCheckpointAction#PUBLISH_CHECK_POINT_RETRY_TIMEOUT.

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
public class NRTReplicationEngine extends Engine {
6060

6161
private volatile SegmentInfos lastCommittedSegmentInfos;
62+
private final Object lastCommittedSegmentInfosMutex = new Object();
6263
private final NRTReplicationReaderManager readerManager;
6364
private final CompletionStatsCache completionStatsCache;
6465
private final LocalCheckpointTracker localCheckpointTracker;
@@ -194,9 +195,11 @@ private void commitSegmentInfos(SegmentInfos infos) throws IOException {
194195
// get a reference to the previous commit files so they can be decref'd once a new commit is made.
195196
final Collection<String> previousCommitFiles = getLastCommittedSegmentInfos().files(true);
196197
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
197-
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
198-
// incref the latest on-disk commit.
199-
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
198+
synchronized (lastCommittedSegmentInfosMutex) {
199+
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
200+
// incref the latest on-disk commit.
201+
replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true));
202+
}
200203
// decref the prev commit.
201204
replicaFileTracker.decRef(previousCommitFiles);
202205
translogManager.syncTranslog();
@@ -413,8 +416,12 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
413416
flush(false, true);
414417
}
415418
try {
416-
final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
417-
return new GatedCloseable<>(indexCommit, () -> {});
419+
synchronized (lastCommittedSegmentInfosMutex) {
420+
final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
421+
final Collection<String> files = indexCommit.getFileNames();
422+
replicaFileTracker.incRef(files);
423+
return new GatedCloseable<>(indexCommit, () -> { replicaFileTracker.decRef(files); });
424+
}
418425
} catch (IOException e) {
419426
throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
420427
}

server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,61 @@ public class NRTReplicationEngineTests extends EngineTestCase {
5353
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
5454
);
5555

56+
public void testAcquireLastIndexCommit() throws Exception {
57+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
58+
59+
try (
60+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
61+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS)
62+
) {
63+
// only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2.
64+
final int docCount = 2;
65+
List<Engine.Operation> operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean());
66+
for (Engine.Operation op : operations) {
67+
applyOperation(engine, op);
68+
applyOperation(nrtEngine, op);
69+
// refresh to create a lot of segments.
70+
engine.refresh("test");
71+
}
72+
assertEquals(2, engine.segmentsStats(false, false).getCount());
73+
// wipe the nrt directory initially so we can sync with primary.
74+
Lucene.cleanLuceneIndex(nrtEngineStore.directory());
75+
for (String file : engine.getLatestSegmentInfos().files(true)) {
76+
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
77+
}
78+
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
79+
nrtEngine.flush(false, true);
80+
81+
// acquire latest commit
82+
GatedCloseable<IndexCommit> indexCommitGatedCloseable = nrtEngine.acquireLastIndexCommit(false);
83+
List<String> replicaFiles = List.of(nrtEngine.store.directory().listAll());
84+
85+
// merge primary down to 1 segment
86+
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
87+
88+
final Collection<String> latestPrimaryFiles = engine.getLatestSegmentInfos().files(false);
89+
// copy new segments in and load reader.
90+
for (String file : latestPrimaryFiles) {
91+
if (replicaFiles.contains(file) == false) {
92+
nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT);
93+
}
94+
}
95+
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
96+
nrtEngine.flush(false, true);
97+
98+
// Verify that the files contained in indexCommitGatedCloseable will not be deleted.
99+
replicaFiles = List.of(nrtEngine.store.directory().listAll());
100+
assertTrue(replicaFiles.containsAll(indexCommitGatedCloseable.get().getFileNames()));
101+
102+
// After closing, the files in indexCommitGatedCloseable will be deleted.
103+
indexCommitGatedCloseable.close();
104+
replicaFiles = List.of(nrtEngine.store.directory().listAll());
105+
for (String fileName : indexCommitGatedCloseable.get().getFileNames()) {
106+
assertFalse(replicaFiles.contains(fileName));
107+
}
108+
}
109+
}
110+
56111
public void testCreateEngine() throws IOException {
57112
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
58113
try (

0 commit comments

Comments
 (0)