Skip to content

Commit 4f1e398

Browse files
authored
Merge branch 'main' into ip
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
2 parents 9377d86 + 9e49930 commit 4f1e398

File tree

40 files changed

+1873
-125
lines changed

40 files changed

+1873
-125
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4646
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
4747
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
48+
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
4849
- [Star-Tree] Add search support for ip field type ([#18671](https://github.com/opensearch-project/OpenSearch/pull/18671))
4950

5051
### Changed
@@ -55,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5556
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
5657
- Optimize grouping for segment concurrent search by ensuring that documents within each group are as equal as possible ([#18451](https://github.com/opensearch-project/OpenSearch/pull/18451))
5758
- Move transport-grpc from a core plugin to a module ([#18897](https://github.com/opensearch-project/OpenSearch/pull/18897))
59+
- Remove `experimental` designation from transport-grpc settings ([#18915](https://github.com/opensearch-project/OpenSearch/pull/18915))
5860

5961
### Dependencies
6062
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
@@ -103,6 +105,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
103105
- Fix leafSorter optimization for ReadOnlyEngine and NRTReplicationEngine ([#18639](https://github.com/opensearch-project/OpenSearch/pull/18639))
104106
- Close IndexFieldDataService asynchronously ([#18888](https://github.com/opensearch-project/OpenSearch/pull/18888))
105107
- Fix query string regex queries incorrectly swallowing TooComplexToDeterminizeException ([#18883](https://github.com/opensearch-project/OpenSearch/pull/18883))
108+
- Fix socks5 user password settings for Azure repo ([#18904](https://github.com/opensearch-project/OpenSearch/pull/18904))
109+
106110

107111
### Security
108112

modules/transport-grpc/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ The `transport-grpc` module initializes a new client/server transport implementi
99
Enable this transport with:
1010

1111
```
12-
setting 'aux.transport.types', '[experimental-transport-grpc]'
13-
setting 'aux.transport.experimental-transport-grpc.port', '9400-9500' //optional
12+
setting 'aux.transport.types', '[transport-grpc]'
13+
setting 'aux.transport.transport-grpc.port', '9400-9500' //optional
1414
```
1515

1616
For the secure transport:
1717

1818
```
19-
setting 'aux.transport.types', '[experimental-secure-transport-grpc]'
20-
setting 'aux.transport.experimental-secure-transport-grpc.port', '9400-9500' //optional
19+
setting 'aux.transport.types', '[secure-transport-grpc]'
20+
setting 'aux.transport.secure-transport-grpc.port', '9400-9500' //optional
2121
```
2222

2323

@@ -76,5 +76,5 @@ setting 'grpc.netty.keepalive_timeout', '1s'
7676
To run OpenSearch with the gRPC transport enabled:
7777

7878
```bash
79-
./gradlew run -Dtests.opensearch.aux.transport.types="[experimental-transport-grpc]"
79+
./gradlew run -Dtests.opensearch.aux.transport.types="[transport-grpc]"
8080
```

modules/transport-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ opensearchplugin {
1616

1717
testClusters {
1818
integTest {
19-
setting 'aux.transport.types', '[experimental-transport-grpc]'
19+
setting 'aux.transport.types', '[transport-grpc]'
2020
}
2121
}
2222

modules/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/Netty4GrpcServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class Netty4GrpcServerTransport extends AuxTransport {
6262
/**
6363
* Type key for configuring settings of this auxiliary transport.
6464
*/
65-
public static final String GRPC_TRANSPORT_SETTING_KEY = "experimental-transport-grpc";
65+
public static final String GRPC_TRANSPORT_SETTING_KEY = "transport-grpc";
6666

6767
/**
6868
* Port range on which to bind.

modules/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class SecureNetty4GrpcServerTransport extends Netty4GrpcServerTransport {
4242
/**
4343
* Type key to select secure transport.
4444
*/
45-
public static final String GRPC_SECURE_TRANSPORT_SETTING_KEY = "experimental-secure-transport-grpc";
45+
public static final String GRPC_SECURE_TRANSPORT_SETTING_KEY = "secure-transport-grpc";
4646

4747
/**
4848
* Distinct port setting required as it depends on transport type key.

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureStorageService.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@
5959
import org.opensearch.core.common.unit.ByteSizeValue;
6060

6161
import java.io.IOException;
62-
import java.net.Authenticator;
63-
import java.net.PasswordAuthentication;
6462
import java.net.URISyntaxException;
6563
import java.security.AccessController;
6664
import java.security.InvalidKeyException;
@@ -209,15 +207,11 @@ private ClientState buildClient(AzureStorageSettings azureStorageSettings, BiCon
209207
SocketAccess.doPrivilegedVoidException(() -> {
210208
final ProxySettings proxySettings = azureStorageSettings.getProxySettings();
211209
if (proxySettings != ProxySettings.NO_PROXY_SETTINGS) {
210+
final ProxyOptions proxyOptions = new ProxyOptions(proxySettings.getType().toProxyType(), proxySettings.getAddress());
212211
if (proxySettings.isAuthenticated()) {
213-
Authenticator.setDefault(new Authenticator() {
214-
@Override
215-
protected PasswordAuthentication getPasswordAuthentication() {
216-
return new PasswordAuthentication(proxySettings.getUsername(), proxySettings.getPassword().toCharArray());
217-
}
218-
});
212+
proxyOptions.setCredentials(proxySettings.getUsername(), proxySettings.getPassword());
219213
}
220-
clientBuilder.proxy(new ProxyOptions(proxySettings.getType().toProxyType(), proxySettings.getAddress()));
214+
clientBuilder.proxy(proxyOptions);
221215
}
222216
});
223217

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
12+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
13+
import org.opensearch.action.support.WriteRequest;
14+
import org.opensearch.action.support.replication.TransportReplicationAction;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.util.FeatureFlags;
17+
import org.opensearch.index.IndexSettings;
18+
import org.opensearch.index.TieredMergePolicyProvider;
19+
import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
20+
import org.opensearch.test.OpenSearchIntegTestCase;
21+
import org.opensearch.test.transport.MockTransportService;
22+
import org.opensearch.test.transport.StubbableTransport;
23+
import org.opensearch.transport.TransportService;
24+
import org.junit.Before;
25+
26+
import java.nio.file.Path;
27+
import java.util.Set;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
33+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
34+
public class RemoteStoreMergedSegmentWarmerIT extends SegmentReplicationBaseIT {
35+
private Path absolutePath;
36+
37+
@Override
38+
protected Settings nodeSettings(int nodeOrdinal) {
39+
if (absolutePath == null) {
40+
absolutePath = randomRepoPath().toAbsolutePath();
41+
}
42+
return Settings.builder()
43+
.put(super.nodeSettings(nodeOrdinal))
44+
.put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
45+
.build();
46+
}
47+
48+
@Override
49+
protected Settings featureFlagSettings() {
50+
Settings.Builder featureSettings = Settings.builder();
51+
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
52+
return featureSettings.build();
53+
}
54+
55+
@Before
56+
public void setup() {
57+
internalCluster().startClusterManagerOnlyNode();
58+
}
59+
60+
public void testMergeSegmentWarmerRemote() throws Exception {
61+
final String node1 = internalCluster().startDataOnlyNode();
62+
final String node2 = internalCluster().startDataOnlyNode();
63+
createIndex(INDEX_NAME);
64+
ensureGreen(INDEX_NAME);
65+
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
66+
TransportService.class,
67+
node1
68+
);
69+
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
70+
TransportService.class,
71+
node2
72+
);
73+
final CountDownLatch latch = new CountDownLatch(1);
74+
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
75+
if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
76+
assertTrue(
77+
((TransportReplicationAction.ConcreteReplicaRequest) request)
78+
.getRequest() instanceof RemoteStorePublishMergedSegmentRequest
79+
);
80+
latch.countDown();
81+
}
82+
connection.sendRequest(requestId, action, request, options);
83+
};
84+
85+
for (int i = 0; i < 30; i++) {
86+
client().prepareIndex(INDEX_NAME)
87+
.setId(String.valueOf(i))
88+
.setSource("foo" + i, "bar" + i)
89+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
90+
.get();
91+
}
92+
93+
waitForSearchableDocs(30, node1, node2);
94+
95+
mockTransportServiceNode1.addSendBehavior(behavior);
96+
mockTransportServiceNode2.addSendBehavior(behavior);
97+
98+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
99+
waitForSegmentCount(INDEX_NAME, 2, logger);
100+
assertTrue(latch.await(10, TimeUnit.SECONDS));
101+
mockTransportServiceNode1.clearAllRules();
102+
mockTransportServiceNode2.clearAllRules();
103+
}
104+
105+
public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
106+
String node1 = internalCluster().startDataOnlyNode();
107+
String node2 = internalCluster().startDataOnlyNode();
108+
createIndex(
109+
INDEX_NAME,
110+
Settings.builder()
111+
.put(indexSettings())
112+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
113+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
114+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
115+
.build()
116+
);
117+
ensureGreen(INDEX_NAME);
118+
MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
119+
TransportService.class,
120+
node1
121+
);
122+
MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
123+
TransportService.class,
124+
node2
125+
);
126+
CountDownLatch latch = new CountDownLatch(2);
127+
AtomicLong numInvocations = new AtomicLong(0);
128+
Set<String> executingThreads = ConcurrentHashMap.newKeySet();
129+
StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
130+
if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
131+
assertTrue(
132+
((TransportReplicationAction.ConcreteReplicaRequest) request)
133+
.getRequest() instanceof RemoteStorePublishMergedSegmentRequest
134+
);
135+
latch.countDown();
136+
numInvocations.incrementAndGet();
137+
executingThreads.add(Thread.currentThread().getName());
138+
}
139+
connection.sendRequest(requestId, action, request, options);
140+
};
141+
142+
mockTransportServiceNode1.addSendBehavior(behavior);
143+
mockTransportServiceNode2.addSendBehavior(behavior);
144+
145+
for (int i = 0; i < 30; i++) {
146+
client().prepareIndex(INDEX_NAME)
147+
.setId(String.valueOf(i))
148+
.setSource("foo" + i, "bar" + i)
149+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
150+
.get();
151+
}
152+
153+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
154+
155+
waitForSegmentCount(INDEX_NAME, 2, logger);
156+
logger.info("Number of merge invocations: {}", numInvocations.get());
157+
assertTrue(latch.await(10, TimeUnit.SECONDS));
158+
assertTrue(executingThreads.size() > 1);
159+
// Verify concurrent execution by checking that multiple unique threads handled merge operations
160+
assertTrue(numInvocations.get() > 1);
161+
mockTransportServiceNode1.clearAllRules();
162+
mockTransportServiceNode2.clearAllRules();
163+
}
164+
165+
public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
166+
internalCluster().startDataOnlyNode();
167+
createIndex(INDEX_NAME);
168+
ensureYellowAndNoInitializingShards(INDEX_NAME);
169+
170+
for (int i = 0; i < 30; i++) {
171+
client().prepareIndex(INDEX_NAME)
172+
.setId(String.valueOf(i))
173+
.setSource("foo" + i, "bar" + i)
174+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
175+
.get();
176+
}
177+
178+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
179+
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
180+
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
181+
}
182+
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
package org.opensearch.indices.replication;
1010

11+
import org.apache.logging.log4j.Logger;
1112
import org.apache.lucene.index.SegmentInfos;
13+
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
14+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
15+
import org.opensearch.action.admin.indices.segments.ShardSegments;
1216
import org.opensearch.action.search.SearchResponse;
1317
import org.opensearch.cluster.ClusterState;
1418
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -22,11 +26,13 @@
2226
import org.opensearch.common.lease.Releasable;
2327
import org.opensearch.common.settings.Settings;
2428
import org.opensearch.common.unit.TimeValue;
29+
import org.opensearch.common.util.set.Sets;
2530
import org.opensearch.core.index.Index;
2631
import org.opensearch.index.IndexModule;
2732
import org.opensearch.index.IndexService;
2833
import org.opensearch.index.SegmentReplicationShardStats;
2934
import org.opensearch.index.engine.Engine;
35+
import org.opensearch.index.engine.Segment;
3036
import org.opensearch.index.shard.IndexShard;
3137
import org.opensearch.index.store.Store;
3238
import org.opensearch.index.store.StoreFileMetadata;
@@ -244,4 +250,26 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
244250
return closeable.get();
245251
}
246252
}
253+
254+
public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
255+
assertBusy(() -> {
256+
Set<String> primarySegments = Sets.newHashSet();
257+
Set<String> replicaSegments = Sets.newHashSet();
258+
final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
259+
for (IndexShardSegments indexShardSegments : response.getIndices().get(indexName).getShards().values()) {
260+
for (ShardSegments shardSegment : indexShardSegments.getShards()) {
261+
for (Segment segment : shardSegment.getSegments()) {
262+
if (shardSegment.getShardRouting().primary()) {
263+
primarySegments.add(segment.getName());
264+
} else {
265+
replicaSegments.add(segment.getName());
266+
}
267+
}
268+
}
269+
}
270+
logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
271+
assertEquals(segmentCount, primarySegments.size());
272+
assertEquals(segmentCount, replicaSegments.size());
273+
}, 1, TimeUnit.MINUTES);
274+
}
247275
}

0 commit comments

Comments
 (0)