Skip to content

Commit 08a64bc

Browse files
add setting for all-active ingestion
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent abc3657 commit 08a64bc

File tree

12 files changed

+158
-32
lines changed

12 files changed

+158
-32
lines changed

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void testFileIngestion() throws Exception {
110110
assertEquals(0, ingestionState.getFailedShards());
111111
assertTrue(
112112
Arrays.stream(ingestionState.getShardStates())
113-
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"))
113+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"))
114114
);
115115
});
116116

@@ -129,7 +129,7 @@ public void testFileIngestion() throws Exception {
129129
Arrays.stream(ingestionState.getShardStates())
130130
.allMatch(
131131
state -> state.isPollerPaused() == false
132-
&& (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
132+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
133133
)
134134
);
135135
});

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ public void testAllActiveIngestion() throws Exception {
269269
.put("ingestion_source.param.topic", topicName)
270270
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
271271
.put("ingestion_source.pointer.init.reset", "earliest")
272+
.put("ingestion_source.all_active", true)
272273
.build(),
273274
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
274275
);
@@ -387,6 +388,7 @@ public void testReplicaPromotionOnAllActiveIngestion() throws Exception {
387388
.put("ingestion_source.param.topic", topicName)
388389
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
389390
.put("ingestion_source.pointer.init.reset", "earliest")
391+
.put("ingestion_source.all_active", true)
390392
.build(),
391393
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
392394
);
@@ -438,6 +440,7 @@ public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
438440
.put("ingestion_source.param.topic", topicName)
439441
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
440442
.put("ingestion_source.pointer.init.reset", "earliest")
443+
.put("ingestion_source.all_active", true)
441444
.build(),
442445
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
443446
);

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public void testAllActiveIngestion() throws Exception {
159159
.put("ingestion_source.param.region", localstack.getRegion())
160160
.put("ingestion_source.param.access_key", localstack.getAccessKey())
161161
.put("ingestion_source.param.secret_key", localstack.getSecretKey())
162+
.put("ingestion_source.all_active", true)
162163
.put(
163164
"ingestion_source.param.endpoint_override",
164165
localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS).toString()
@@ -167,7 +168,6 @@ public void testAllActiveIngestion() throws Exception {
167168
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
168169
);
169170

170-
flush(indexName);
171171
waitForSearchableDocs(10, List.of(nodeA, nodeB));
172172
}
173173

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateAction.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.opensearch.index.shard.IndexShard;
3232
import org.opensearch.index.shard.ShardNotFoundException;
3333
import org.opensearch.indices.IndicesService;
34-
import org.opensearch.indices.replication.common.ReplicationType;
3534
import org.opensearch.tasks.Task;
3635
import org.opensearch.threadpool.ThreadPool;
3736
import org.opensearch.transport.TransportService;
@@ -151,20 +150,17 @@ public void onFailure(Exception e) {
151150
*/
152151
@Override
153152
protected ShardsIterator shards(ClusterState clusterState, GetIngestionStateRequest request, String[] concreteIndices) {
154-
Set<String> docRepIndexSet = new HashSet<>();
153+
Set<String> allActiveIndexSet = new HashSet<>();
155154
for (String index : concreteIndices) {
156155
IndexMetadata indexMetadata = clusterState.metadata().index(index);
157-
if (indexMetadata != null) {
158-
ReplicationType replicationType = getReplicationType(indexMetadata);
159-
if (replicationType == ReplicationType.DOCUMENT) {
160-
docRepIndexSet.add(index);
161-
}
156+
if (indexMetadata != null && isAllActiveIngestionEnabled(indexMetadata)) {
157+
allActiveIndexSet.add(index);
162158
}
163159
}
164160

165161
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
166162
Predicate<ShardRouting> shardFilter = shardRouting -> shardRouting.primary()
167-
|| docRepIndexSet.contains(shardRouting.getIndexName());
163+
|| allActiveIndexSet.contains(shardRouting.getIndexName());
168164
if (shardSet.isEmpty() == false) {
169165
shardFilter = shardFilter.and(shardRouting -> shardSet.contains(shardRouting.shardId().getId()));
170166
}
@@ -239,7 +235,9 @@ protected ShardIngestionState shardOperation(GetIngestionStateRequest request, S
239235
}
240236
}
241237

242-
private ReplicationType getReplicationType(IndexMetadata indexMetadata) {
243-
return IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings());
238+
private boolean isAllActiveIngestionEnabled(IndexMetadata indexMetadata) {
239+
return indexMetadata.useIngestionSource()
240+
&& indexMetadata.getIngestionSource() != null
241+
&& indexMetadata.getIngestionSource().isAllActiveIngestionEnabled();
244242
}
245243
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateAction.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.opensearch.indices.IndicesService;
3030
import org.opensearch.indices.pollingingest.IngestionSettings;
3131
import org.opensearch.indices.pollingingest.StreamPoller;
32-
import org.opensearch.indices.replication.common.ReplicationType;
3332
import org.opensearch.threadpool.ThreadPool;
3433
import org.opensearch.transport.TransportService;
3534

@@ -84,20 +83,17 @@ public TransportUpdateIngestionStateAction(
8483
*/
8584
@Override
8685
protected ShardsIterator shards(ClusterState clusterState, UpdateIngestionStateRequest request, String[] concreteIndices) {
87-
Set<String> docRepIndexSet = new HashSet<>();
86+
Set<String> allActiveIndexSet = new HashSet<>();
8887
for (String index : concreteIndices) {
8988
IndexMetadata indexMetadata = clusterState.metadata().index(index);
90-
if (indexMetadata != null) {
91-
ReplicationType replicationType = getReplicationType(indexMetadata);
92-
if (replicationType == ReplicationType.DOCUMENT) {
93-
docRepIndexSet.add(index);
94-
}
89+
if (indexMetadata != null && isAllActiveIngestionEnabled(indexMetadata)) {
90+
allActiveIndexSet.add(index);
9591
}
9692
}
9793

9894
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
9995
Predicate<ShardRouting> shardFilter = shardRouting -> shardRouting.primary()
100-
|| docRepIndexSet.contains(shardRouting.getIndexName());
96+
|| allActiveIndexSet.contains(shardRouting.getIndexName());
10197
if (shardSet.isEmpty() == false) {
10298
shardFilter = shardFilter.and(shardRouting -> shardSet.contains(shardRouting.shardId().getId()));
10399
}
@@ -193,7 +189,9 @@ private ResumeIngestionRequest.ResetSettings getResetSettingsForShard(UpdateInge
193189
return Arrays.stream(resetSettings).filter(setting -> setting.getShard() == targetShardId).findFirst().orElse(null);
194190
}
195191

196-
private ReplicationType getReplicationType(IndexMetadata indexMetadata) {
197-
return IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings());
192+
private boolean isAllActiveIngestionEnabled(IndexMetadata indexMetadata) {
193+
return indexMetadata.useIngestionSource()
194+
&& indexMetadata.getIngestionSource() != null
195+
&& indexMetadata.getIngestionSource().isAllActiveIngestionEnabled();
198196
}
199197
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,18 @@ public Iterator<Setting<?>> settings() {
907907
Setting.Property.Final
908908
);
909909

910+
/**
911+
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
912+
* streaming source and process the updates.
913+
*/
914+
public static final String SETTING_INGESTION_SOURCE_ALL_ACTIVE_INGESTION = "index.ingestion_source.all_active";
915+
public static final Setting<Boolean> INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING = Setting.boolSetting(
916+
SETTING_INGESTION_SOURCE_ALL_ACTIVE_INGESTION,
917+
false,
918+
Property.IndexScope,
919+
Setting.Property.Final
920+
);
921+
910922
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
911923
"index.ingestion_source.param.",
912924
key -> new Setting<>(key, "", (value) -> {
@@ -1151,6 +1163,7 @@ public IngestionSource getIngestionSource() {
11511163
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
11521164
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
11531165
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
1166+
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
11541167

11551168
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
11561169
.setPointerInitReset(pointerInitReset)
@@ -1159,6 +1172,7 @@ public IngestionSource getIngestionSource() {
11591172
.setPollTimeout(pollTimeout)
11601173
.setNumProcessorThreads(numProcessorThreads)
11611174
.setBlockingQueueSize(blockingQueueSize)
1175+
.setAllActiveIngestion(allActiveIngestionEnabled)
11621176
.build();
11631177
}
11641178
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Map;
1818
import java.util.Objects;
1919

20+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING;
2021
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
2122
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
2223
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
@@ -35,6 +36,7 @@ public class IngestionSource {
3536
private final int pollTimeout;
3637
private int numProcessorThreads;
3738
private int blockingQueueSize;
39+
private final boolean allActiveIngestion;
3840

3941
private IngestionSource(
4042
String type,
@@ -44,7 +46,8 @@ private IngestionSource(
4446
long maxPollSize,
4547
int pollTimeout,
4648
int numProcessorThreads,
47-
int blockingQueueSize
49+
int blockingQueueSize,
50+
boolean allActiveIngestion
4851
) {
4952
this.type = type;
5053
this.pointerInitReset = pointerInitReset;
@@ -54,6 +57,7 @@ private IngestionSource(
5457
this.pollTimeout = pollTimeout;
5558
this.numProcessorThreads = numProcessorThreads;
5659
this.blockingQueueSize = blockingQueueSize;
60+
this.allActiveIngestion = allActiveIngestion;
5761
}
5862

5963
public String getType() {
@@ -88,6 +92,10 @@ public int getBlockingQueueSize() {
8892
return blockingQueueSize;
8993
}
9094

95+
public boolean isAllActiveIngestionEnabled() {
96+
return allActiveIngestion;
97+
}
98+
9199
@Override
92100
public boolean equals(Object o) {
93101
if (this == o) return true;
@@ -100,7 +108,8 @@ public boolean equals(Object o) {
100108
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
101109
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout)
102110
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
103-
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize);
111+
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
112+
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion);
104113
}
105114

106115
@Override
@@ -113,7 +122,8 @@ public int hashCode() {
113122
maxPollSize,
114123
pollTimeout,
115124
numProcessorThreads,
116-
blockingQueueSize
125+
blockingQueueSize,
126+
allActiveIngestion
117127
);
118128
}
119129

@@ -139,6 +149,8 @@ public String toString() {
139149
+ numProcessorThreads
140150
+ ", blockingQueueSize="
141151
+ blockingQueueSize
152+
+ ", allActiveIngestion="
153+
+ allActiveIngestion
142154
+ '}';
143155
}
144156

@@ -196,6 +208,7 @@ public static class Builder {
196208
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);
197209
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
198210
private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY);
211+
private boolean allActiveIngestion = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getDefault(Settings.EMPTY);
199212

200213
public Builder(String type) {
201214
this.type = type;
@@ -208,6 +221,7 @@ public Builder(IngestionSource ingestionSource) {
208221
this.errorStrategy = ingestionSource.errorStrategy;
209222
this.params = ingestionSource.params;
210223
this.blockingQueueSize = ingestionSource.blockingQueueSize;
224+
this.allActiveIngestion = ingestionSource.allActiveIngestion;
211225
}
212226

213227
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -250,6 +264,11 @@ public Builder setBlockingQueueSize(int blockingQueueSize) {
250264
return this;
251265
}
252266

267+
public Builder setAllActiveIngestion(boolean allActiveIngestion) {
268+
this.allActiveIngestion = allActiveIngestion;
269+
return this;
270+
}
271+
253272
public IngestionSource build() {
254273
return new IngestionSource(
255274
type,
@@ -259,7 +278,8 @@ public IngestionSource build() {
259278
maxPollSize,
260279
pollTimeout,
261280
numProcessorThreads,
262-
blockingQueueSize
281+
blockingQueueSize,
282+
allActiveIngestion
263283
);
264284
}
265285

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
277277
IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT,
278278
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
279279
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
280+
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
280281

281282
// Settings for search replica
282283
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.indices.pollingingest;
1010

1111
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.metadata.IngestionSource;
1213
import org.opensearch.index.IndexSettings;
1314
import org.opensearch.index.IngestionConsumerFactory;
1415
import org.opensearch.index.engine.Engine;
@@ -32,15 +33,31 @@ public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory)
3233
}
3334

3435
/**
35-
* Document replication equivalent in pull-based ingestion is to ingest on both primary and replica nodes using the
36-
* IngestionEngine. Segment replication will use the NRTReplicationEngine on replicas.
36+
* Segment replication will use the IngestionEngine on primary and NRTReplicationEngine on replicas.
37+
* All-active ingestion mode is supported, where the replicas will consume and process messages from the streaming
38+
* source similar to the primary. This mode is currently not supported with segment replication.
3739
*/
3840
@Override
3941
public Engine newReadWriteEngine(EngineConfig config) {
40-
boolean isDocRep = getReplicationType(config) == ReplicationType.DOCUMENT;
42+
IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource();
43+
boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled();
44+
boolean isSegRep = getReplicationType(config) == ReplicationType.SEGMENT;
4145

46+
// all-active ingestion is currently not supported with segment replication.
47+
if (isAllActiveIngestion && isSegRep) {
48+
throw new IllegalArgumentException("All-active ingestion is not supported with segment replication");
49+
}
50+
51+
if (isAllActiveIngestion) {
52+
// use ingestion engine on both primary and replica
53+
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory);
54+
ingestionEngine.start();
55+
return ingestionEngine;
56+
}
57+
58+
// For non all-active modes, fallback to the standard segrep model
4259
// NRTReplicationEngine is used for segment replication on replicas
43-
if (isDocRep == false && config.isReadOnlyReplica()) {
60+
if (config.isReadOnlyReplica()) {
4461
return new NRTReplicationEngine(config);
4562
}
4663

server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateActionTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.cluster.block.ClusterBlockLevel;
1717
import org.opensearch.cluster.metadata.IndexMetadata;
1818
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.opensearch.cluster.metadata.IngestionSource;
1920
import org.opensearch.cluster.metadata.Metadata;
2021
import org.opensearch.cluster.node.DiscoveryNode;
2122
import org.opensearch.cluster.routing.ShardRouting;
@@ -203,4 +204,28 @@ public void testNewResponse() {
203204
assertThat(response.getShardStates()[0].getIndex(), equalTo("test-index"));
204205
assertThat(response.getShardStates()[0].getShardId(), equalTo(0));
205206
}
207+
208+
public void testShardsWithAllActiveIngestionEnabled() {
209+
GetIngestionStateRequest request = new GetIngestionStateRequest(new String[] { "test-index" });
210+
request.setShards(new int[] { 0, 1 });
211+
ClusterState clusterState = mock(ClusterState.class);
212+
ShardsIterator shardsIterator = mock(ShardsIterator.class);
213+
214+
when(clusterState.routingTable()).thenReturn(mock(org.opensearch.cluster.routing.RoutingTable.class));
215+
when(clusterState.routingTable().allShardsSatisfyingPredicate(any(), any())).thenReturn(shardsIterator);
216+
217+
Metadata metadata = mock(Metadata.class);
218+
IndexMetadata indexMetadata = mock(IndexMetadata.class);
219+
IngestionSource ingestionSource = mock(IngestionSource.class);
220+
221+
// Set up mocks for all-active ingestion enabled
222+
when(clusterState.metadata()).thenReturn(metadata);
223+
when(metadata.index("test-index")).thenReturn(indexMetadata);
224+
when(indexMetadata.useIngestionSource()).thenReturn(true);
225+
when(indexMetadata.getIngestionSource()).thenReturn(ingestionSource);
226+
when(ingestionSource.isAllActiveIngestionEnabled()).thenReturn(true);
227+
228+
ShardsIterator result = action.shards(clusterState, request, new String[] { "test-index" });
229+
assertThat(result, equalTo(shardsIterator));
230+
}
206231
}

0 commit comments

Comments
 (0)