Skip to content

Commit bbc7c24

Browse files
committed
Merge branch 'main' into tanik-derived-source-integration
Signed-off-by: tanik98 <72665765+tanik98@users.noreply.github.com>
2 parents 7df3db6 + 163c870 commit bbc7c24

File tree

35 files changed

+1859
-108
lines changed

35 files changed

+1859
-108
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4646
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
4747
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
48+
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
4849
- [Derived Source] Add integration of derived source feature across various paths like get/search/recovery ([#18565](https://github.com/opensearch-project/OpenSearch/pull/18565))
4950

5051
### Changed
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
12+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
13+
import org.opensearch.action.support.WriteRequest;
14+
import org.opensearch.action.support.replication.TransportReplicationAction;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.util.FeatureFlags;
17+
import org.opensearch.index.IndexSettings;
18+
import org.opensearch.index.TieredMergePolicyProvider;
19+
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
20+
import org.opensearch.test.OpenSearchIntegTestCase;
21+
import org.opensearch.test.transport.MockTransportService;
22+
import org.opensearch.test.transport.StubbableTransport;
23+
import org.opensearch.transport.TransportService;
24+
import org.junit.Before;
25+
26+
import java.nio.file.Path;
27+
import java.util.Set;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
33+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
34+
public class RemoteStoreMergedSegmentWarmerIT extends SegmentReplicationBaseIT {
35+
private Path absolutePath;
36+
37+
@Override
38+
protected Settings nodeSettings(int nodeOrdinal) {
39+
if (absolutePath == null) {
40+
absolutePath = randomRepoPath().toAbsolutePath();
41+
}
42+
return Settings.builder()
43+
.put(super.nodeSettings(nodeOrdinal))
44+
.put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
45+
.build();
46+
}
47+
48+
@Override
49+
protected Settings featureFlagSettings() {
50+
Settings.Builder featureSettings = Settings.builder();
51+
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
52+
return featureSettings.build();
53+
}
54+
55+
@Before
56+
public void setup() {
57+
internalCluster().startClusterManagerOnlyNode();
58+
}
59+
60+
public void testMergeSegmentWarmerRemote() throws Exception {
61+
final String node1 = internalCluster().startDataOnlyNode();
62+
final String node2 = internalCluster().startDataOnlyNode();
63+
createIndex(INDEX_NAME);
64+
ensureGreen(INDEX_NAME);
65+
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
66+
TransportService.class,
67+
node1
68+
);
69+
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
70+
TransportService.class,
71+
node2
72+
);
73+
final CountDownLatch latch = new CountDownLatch(1);
74+
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
75+
if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
76+
assertTrue(
77+
((TransportReplicationAction.ConcreteReplicaRequest) request)
78+
.getRequest() instanceof RemoteStorePublishMergedSegmentRequest
79+
);
80+
latch.countDown();
81+
}
82+
connection.sendRequest(requestId, action, request, options);
83+
};
84+
85+
for (int i = 0; i < 30; i++) {
86+
client().prepareIndex(INDEX_NAME)
87+
.setId(String.valueOf(i))
88+
.setSource("foo" + i, "bar" + i)
89+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
90+
.get();
91+
}
92+
93+
waitForSearchableDocs(30, node1, node2);
94+
95+
mockTransportServiceNode1.addSendBehavior(behavior);
96+
mockTransportServiceNode2.addSendBehavior(behavior);
97+
98+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
99+
waitForSegmentCount(INDEX_NAME, 2, logger);
100+
assertTrue(latch.await(10, TimeUnit.SECONDS));
101+
mockTransportServiceNode1.clearAllRules();
102+
mockTransportServiceNode2.clearAllRules();
103+
}
104+
105+
public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
106+
String node1 = internalCluster().startDataOnlyNode();
107+
String node2 = internalCluster().startDataOnlyNode();
108+
createIndex(
109+
INDEX_NAME,
110+
Settings.builder()
111+
.put(indexSettings())
112+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
113+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
114+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
115+
.build()
116+
);
117+
ensureGreen(INDEX_NAME);
118+
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
119+
TransportService.class,
120+
node1
121+
);
122+
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
123+
TransportService.class,
124+
node2
125+
);
126+
CountDownLatch latch = new CountDownLatch(2);
127+
AtomicLong numInvocations = new AtomicLong(0);
128+
Set<String> executingThreads = ConcurrentHashMap.newKeySet();
129+
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
130+
if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
131+
assertTrue(
132+
((TransportReplicationAction.ConcreteReplicaRequest) request)
133+
.getRequest() instanceof RemoteStorePublishMergedSegmentRequest
134+
);
135+
latch.countDown();
136+
numInvocations.incrementAndGet();
137+
executingThreads.add(Thread.currentThread().getName());
138+
}
139+
connection.sendRequest(requestId, action, request, options);
140+
};
141+
142+
mockTransportServiceNode1.addSendBehavior(behavior);
143+
mockTransportServiceNode2.addSendBehavior(behavior);
144+
145+
for (int i = 0; i < 30; i++) {
146+
client().prepareIndex(INDEX_NAME)
147+
.setId(String.valueOf(i))
148+
.setSource("foo" + i, "bar" + i)
149+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
150+
.get();
151+
}
152+
153+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
154+
155+
waitForSegmentCount(INDEX_NAME, 2, logger);
156+
logger.info("Number of merge invocations: {}", numInvocations.get());
157+
assertTrue(latch.await(10, TimeUnit.SECONDS));
158+
assertTrue(executingThreads.size() > 1);
159+
// Verify concurrent execution by checking that multiple unique threads handled merge operations
160+
assertTrue(numInvocations.get() > 1);
161+
mockTransportServiceNode1.clearAllRules();
162+
mockTransportServiceNode2.clearAllRules();
163+
}
164+
165+
public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
166+
internalCluster().startDataOnlyNode();
167+
createIndex(INDEX_NAME);
168+
ensureYellowAndNoInitializingShards(INDEX_NAME);
169+
170+
for (int i = 0; i < 30; i++) {
171+
client().prepareIndex(INDEX_NAME)
172+
.setId(String.valueOf(i))
173+
.setSource("foo" + i, "bar" + i)
174+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
175+
.get();
176+
}
177+
178+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
179+
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
180+
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
181+
}
182+
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
package org.opensearch.indices.replication;
1010

11+
import org.apache.logging.log4j.Logger;
1112
import org.apache.lucene.index.SegmentInfos;
13+
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
14+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
15+
import org.opensearch.action.admin.indices.segments.ShardSegments;
1216
import org.opensearch.action.search.SearchResponse;
1317
import org.opensearch.cluster.ClusterState;
1418
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -22,11 +26,13 @@
2226
import org.opensearch.common.lease.Releasable;
2327
import org.opensearch.common.settings.Settings;
2428
import org.opensearch.common.unit.TimeValue;
29+
import org.opensearch.common.util.set.Sets;
2530
import org.opensearch.core.index.Index;
2631
import org.opensearch.index.IndexModule;
2732
import org.opensearch.index.IndexService;
2833
import org.opensearch.index.SegmentReplicationShardStats;
2934
import org.opensearch.index.engine.Engine;
35+
import org.opensearch.index.engine.Segment;
3036
import org.opensearch.index.shard.IndexShard;
3137
import org.opensearch.index.store.Store;
3238
import org.opensearch.index.store.StoreFileMetadata;
@@ -244,4 +250,26 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
244250
return closeable.get();
245251
}
246252
}
253+
254+
public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
255+
assertBusy(() -> {
256+
Set<String> primarySegments = Sets.newHashSet();
257+
Set<String> replicaSegments = Sets.newHashSet();
258+
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
259+
for (IndexShardSegments indexShardSegments : response.getIndices().get(indexName).getShards().values()) {
260+
for (ShardSegments shardSegment : indexShardSegments.getShards()) {
261+
for (Segment segment : shardSegment.getSegments()) {
262+
if (shardSegment.getShardRouting().primary()) {
263+
primarySegments.add(segment.getName());
264+
} else {
265+
replicaSegments.add(segment.getName());
266+
}
267+
}
268+
}
269+
}
270+
logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
271+
assertEquals(segmentCount, primarySegments.size());
272+
assertEquals(segmentCount, replicaSegments.size());
273+
}, 1, TimeUnit.MINUTES);
274+
}
247275
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRepositoryRegistrationIT.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.core.xcontent.ToXContent;
2020
import org.opensearch.core.xcontent.XContentBuilder;
2121
import org.opensearch.plugins.Plugin;
22+
import org.opensearch.test.InternalTestCluster.RestartCallback;
2223
import org.opensearch.test.OpenSearchIntegTestCase;
2324
import org.opensearch.test.disruption.NetworkDisruption;
2425
import org.opensearch.test.transport.MockTransportService;
@@ -178,4 +179,105 @@ public void testSystemRepositorySettingIsHiddenForGetRepositoriesRequest() throw
178179
repositoriesResponse = GetRepositoriesResponse.fromXContent(createParser(xContentBuilder));
179180
assertEquals(false, SYSTEM_REPOSITORY_SETTING.get(repositoriesResponse.repositories().get(0).settings()));
180181
}
182+
183+
/**
184+
* Test node join failure when trying to join a cluster with different remote store repository attributes.
185+
* This negative test case verifies that nodes with incompatible remote store configurations are rejected.
186+
*/
187+
public void testNodeJoinFailureWithDifferentRemoteStoreRepositoryAttributes() throws Exception {
188+
// Start initial cluster with specific remote store repository configuration
189+
internalCluster().startNode();
190+
ensureStableCluster(1);
191+
192+
// Attempt to start a second node with different remote store attributes
193+
// This should fail because the remote store repository attributes don't match
194+
expectThrows(IllegalStateException.class, () -> {
195+
internalCluster().startNode(
196+
Settings.builder()
197+
.put("node.attr.remote_store.segment.repository", "different-repo")
198+
.put("node.attr.remote_store.translog.repository", "different-translog-repo")
199+
.build()
200+
);
201+
ensureStableCluster(2);
202+
});
203+
204+
ensureStableCluster(1);
205+
}
206+
207+
/**
208+
* Test node rejoin failure when node attributes are changed after initial join.
209+
* This test verifies that a node cannot rejoin the cluster with different remote store attributes.
210+
*/
211+
public void testNodeRejoinFailureWithChangedRemoteStoreAttributes() throws Exception {
212+
// Start cluster with 2 nodes
213+
internalCluster().startNodes(2);
214+
ensureStableCluster(2);
215+
216+
String nodeToRestart = internalCluster().getNodeNames()[1];
217+
218+
// Attempt to restart node with different remote store attributes should fail
219+
// The validation happens during node startup and throws IllegalStateException
220+
expectThrows(IllegalStateException.class, () -> {
221+
internalCluster().restartNode(nodeToRestart, new RestartCallback() {
222+
@Override
223+
public Settings onNodeStopped(String nodeName) {
224+
// Return different remote store attributes when restarting
225+
// This will fail because it's missing the required repository type attributes
226+
return Settings.builder()
227+
.put("node.attr.remote_store.segment.repository", "changed-segment-repo")
228+
.put("node.attr.remote_store.translog.repository", "changed-translog-repo")
229+
.build();
230+
}
231+
});
232+
});
233+
234+
ensureStableCluster(1);
235+
}
236+
237+
/**
238+
* Test node join failure when missing required remote store attributes.
239+
* This test verifies that nodes without proper remote store configuration are rejected.
240+
*/
241+
public void testNodeJoinFailureWithMissingRemoteStoreAttributes() throws Exception {
242+
internalCluster().startNode();
243+
ensureStableCluster(1);
244+
245+
// Attempt to add a node without remote store attributes
246+
// This should fail because remote store attributes are required
247+
expectThrows(IllegalStateException.class, () -> {
248+
internalCluster().startNode(
249+
Settings.builder()
250+
.putNull("node.attr.remote_store.segment.repository")
251+
.putNull("node.attr.remote_store.translog.repository")
252+
.build()
253+
);
254+
});
255+
256+
ensureStableCluster(1);
257+
}
258+
259+
/**
260+
* Test repository verification failure during node join.
261+
* This test verifies that nodes fail to join when remote store repositories cannot be verified
262+
* due to invalid repository settings or missing repository type information.
263+
*/
264+
public void testRepositoryVerificationFailureDuringNodeJoin() throws Exception {
265+
internalCluster().startNode();
266+
ensureStableCluster(1);
267+
268+
// Attempt to start a node with invalid repository type - this should fail during repository validation
269+
// We use an invalid repository type that doesn't exist to trigger repository verification failure
270+
expectThrows(Exception.class, () -> {
271+
internalCluster().startNode(
272+
Settings.builder()
273+
.put("node.attr.remote_store.segment.repository", REPOSITORY_NAME)
274+
.put("node.attr.remote_store.translog.repository", REPOSITORY_NAME)
275+
.put("node.attr.remote_store.repository." + REPOSITORY_NAME + ".type", "invalid_repo_type")
276+
.build()
277+
);
278+
});
279+
280+
ensureStableCluster(1);
281+
}
282+
181283
}

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,8 +1613,8 @@ public String toString() {
16131613
*
16141614
* @opensearch.internal
16151615
*/
1616-
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
1617-
1616+
public static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
1617+
// public for tests
16181618
private final long globalCheckpoint;
16191619
private final long maxSeqNoOfUpdatesOrDeletes;
16201620

0 commit comments

Comments
 (0)