Skip to content

Commit 73068b4

Browse files
support docrep equivalent in pull-based ingestion
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 10f8c8e commit 73068b4

File tree

12 files changed

+506
-23
lines changed

12 files changed

+506
-23
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- Grok processor supports capturing multiple values for same field name ([#18799](https://github.com/opensearch-project/OpenSearch/pull/18799))
2323
- Upgrade opensearch-protobufs dependency to 0.13.0 and update transport-grpc module compatibility ([#19007](https://github.com/opensearch-project/OpenSearch/issues/19007))
2424
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
25+
- Grok processor supports capturing multiple values for same field name ([#18799](https://github.com/opensearch-project/OpenSearch/pull/18799))
26+
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
2527

2628
### Changed
2729
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 330 additions & 0 deletions
Large diffs are not rendered by default.

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,39 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
139139
});
140140
}
141141

142+
public void testAllActiveIngestion() throws Exception {
143+
// Create pull-based index in default replication mode (docrep) and publish some messages
144+
internalCluster().startClusterManagerOnlyNode();
145+
final String nodeA = internalCluster().startDataOnlyNode();
146+
final String nodeB = internalCluster().startDataOnlyNode();
147+
for (int i = 0; i < 10; i++) {
148+
produceData(Integer.toString(i), "name" + i, "30");
149+
}
150+
151+
createIndex(
152+
"test",
153+
Settings.builder()
154+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
155+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
156+
.put("ingestion_source.type", "kinesis")
157+
.put("ingestion_source.pointer.init.reset", "earliest")
158+
.put("ingestion_source.param.stream", "test")
159+
.put("ingestion_source.param.region", localstack.getRegion())
160+
.put("ingestion_source.param.access_key", localstack.getAccessKey())
161+
.put("ingestion_source.param.secret_key", localstack.getSecretKey())
162+
.put(
163+
"ingestion_source.param.endpoint_override",
164+
localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS).toString()
165+
)
166+
.build(),
167+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
168+
);
169+
170+
ensureGreen(indexName);
171+
flush(indexName);
172+
waitForSearchableDocs(10, List.of(nodeA, nodeB));
173+
}
174+
142175
private boolean isRewinded(String sequenceNumber) {
143176
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(
144177
DescribeStreamRequest.builder().streamName(streamName).build()

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionState.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@
2929
*/
3030
@ExperimentalApi
3131
public record ShardIngestionState(String index, int shardId, String pollerState, String errorPolicy, boolean isPollerPaused,
32-
boolean isWriteBlockEnabled, String batchStartPointer) implements Writeable, ToXContentFragment {
32+
boolean isWriteBlockEnabled, String batchStartPointer, boolean isPrimary, String nodeName) implements Writeable, ToXContentFragment {
3333

3434
private static final String SHARD = "shard";
3535
private static final String POLLER_STATE = "poller_state";
3636
private static final String ERROR_POLICY = "error_policy";
3737
private static final String POLLER_PAUSED = "poller_paused";
3838
private static final String WRITE_BLOCK_ENABLED = "write_block_enabled";
3939
private static final String BATCH_START_POINTER = "batch_start_pointer";
40+
private static final String PRIMARY_OR_REPLICA = "prirep";
41+
private static final String NODE_NAME = "node";
4042

4143
public ShardIngestionState() {
42-
this("", -1, "", "", false, false, "");
44+
this("", -1, "", "", false, false, "", true, "");
4345
}
4446

4547
public ShardIngestionState(StreamInput in) throws IOException {
@@ -50,6 +52,8 @@ public ShardIngestionState(StreamInput in) throws IOException {
5052
in.readOptionalString(),
5153
in.readBoolean(),
5254
in.readBoolean(),
55+
in.readString(),
56+
in.readBoolean(),
5357
in.readString()
5458
);
5559
}
@@ -61,7 +65,9 @@ public ShardIngestionState(
6165
@Nullable String errorPolicy,
6266
boolean isPollerPaused,
6367
boolean isWriteBlockEnabled,
64-
String batchStartPointer
68+
String batchStartPointer,
69+
boolean isPrimary,
70+
String nodeName
6571
) {
6672
this.index = index;
6773
this.shardId = shardId;
@@ -70,6 +76,8 @@ public ShardIngestionState(
7076
this.isPollerPaused = isPollerPaused;
7177
this.isWriteBlockEnabled = isWriteBlockEnabled;
7278
this.batchStartPointer = batchStartPointer;
79+
this.isPrimary = isPrimary;
80+
this.nodeName = nodeName;
7381
}
7482

7583
@Override
@@ -81,6 +89,8 @@ public void writeTo(StreamOutput out) throws IOException {
8189
out.writeBoolean(isPollerPaused);
8290
out.writeBoolean(isWriteBlockEnabled);
8391
out.writeString(batchStartPointer);
92+
out.writeBoolean(isPrimary);
93+
out.writeString(nodeName);
8494
}
8595

8696
@Override
@@ -92,6 +102,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
92102
builder.field(POLLER_PAUSED, isPollerPaused);
93103
builder.field(WRITE_BLOCK_ENABLED, isWriteBlockEnabled);
94104
builder.field(BATCH_START_POINTER, batchStartPointer);
105+
builder.field(PRIMARY_OR_REPLICA, isPrimary ? "p" : "r");
106+
builder.field(NODE_NAME, nodeName);
95107
builder.endObject();
96108
return builder;
97109
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportGetIngestionStateAction.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.cluster.ClusterState;
1919
import org.opensearch.cluster.block.ClusterBlockException;
2020
import org.opensearch.cluster.block.ClusterBlockLevel;
21+
import org.opensearch.cluster.metadata.IndexMetadata;
2122
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
2223
import org.opensearch.cluster.routing.ShardRouting;
2324
import org.opensearch.cluster.routing.ShardsIterator;
@@ -30,13 +31,15 @@
3031
import org.opensearch.index.shard.IndexShard;
3132
import org.opensearch.index.shard.ShardNotFoundException;
3233
import org.opensearch.indices.IndicesService;
34+
import org.opensearch.indices.replication.common.ReplicationType;
3335
import org.opensearch.tasks.Task;
3436
import org.opensearch.threadpool.ThreadPool;
3537
import org.opensearch.transport.TransportService;
3638
import org.opensearch.transport.client.node.NodeClient;
3739

3840
import java.io.IOException;
3941
import java.util.Arrays;
42+
import java.util.HashSet;
4043
import java.util.List;
4144
import java.util.Map;
4245
import java.util.Set;
@@ -148,10 +151,20 @@ public void onFailure(Exception e) {
148151
*/
149152
@Override
150153
protected ShardsIterator shards(ClusterState clusterState, GetIngestionStateRequest request, String[] concreteIndices) {
151-
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
154+
Set<String> docRepIndexSet = new HashSet<>();
155+
for (String index : concreteIndices) {
156+
IndexMetadata indexMetadata = clusterState.metadata().index(index);
157+
if (indexMetadata != null) {
158+
ReplicationType replicationType = getReplicationType(indexMetadata);
159+
if (replicationType == ReplicationType.DOCUMENT) {
160+
docRepIndexSet.add(index);
161+
}
162+
}
163+
}
152164

153-
// add filters for index and shard from the request
154-
Predicate<ShardRouting> shardFilter = ShardRouting::primary;
165+
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
166+
Predicate<ShardRouting> shardFilter = shardRouting -> shardRouting.primary()
167+
|| docRepIndexSet.contains(shardRouting.getIndexName());
155168
if (shardSet.isEmpty() == false) {
156169
shardFilter = shardFilter.and(shardRouting -> shardSet.contains(shardRouting.shardId().getId()));
157170
}
@@ -217,9 +230,27 @@ protected ShardIngestionState shardOperation(GetIngestionStateRequest request, S
217230
}
218231

219232
try {
220-
return indexShard.getIngestionState();
233+
ShardIngestionState baseState = indexShard.getIngestionState();
234+
String nodeName = clusterService.localNode().getName();
235+
236+
// rebuild ingestion state with primary/replica and node details
237+
return new ShardIngestionState(
238+
baseState.index(),
239+
baseState.shardId(),
240+
baseState.pollerState(),
241+
baseState.errorPolicy(),
242+
baseState.isPollerPaused(),
243+
baseState.isWriteBlockEnabled(),
244+
baseState.batchStartPointer(),
245+
shardRouting.primary(),
246+
nodeName != null ? nodeName : ""
247+
);
221248
} catch (final AlreadyClosedException e) {
222249
throw new ShardNotFoundException(indexShard.shardId());
223250
}
224251
}
252+
253+
private ReplicationType getReplicationType(IndexMetadata indexMetadata) {
254+
return IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings());
255+
}
225256
}

server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/state/TransportUpdateIngestionStateAction.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.cluster.ClusterState;
1616
import org.opensearch.cluster.block.ClusterBlockException;
1717
import org.opensearch.cluster.block.ClusterBlockLevel;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
1819
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
1920
import org.opensearch.cluster.routing.ShardRouting;
2021
import org.opensearch.cluster.routing.ShardsIterator;
@@ -28,11 +29,13 @@
2829
import org.opensearch.indices.IndicesService;
2930
import org.opensearch.indices.pollingingest.IngestionSettings;
3031
import org.opensearch.indices.pollingingest.StreamPoller;
32+
import org.opensearch.indices.replication.common.ReplicationType;
3133
import org.opensearch.threadpool.ThreadPool;
3234
import org.opensearch.transport.TransportService;
3335

3436
import java.io.IOException;
3537
import java.util.Arrays;
38+
import java.util.HashSet;
3639
import java.util.List;
3740
import java.util.Set;
3841
import java.util.function.Predicate;
@@ -81,9 +84,20 @@ public TransportUpdateIngestionStateAction(
8184
*/
8285
@Override
8386
protected ShardsIterator shards(ClusterState clusterState, UpdateIngestionStateRequest request, String[] concreteIndices) {
84-
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
87+
Set<String> docRepIndexSet = new HashSet<>();
88+
for (String index : concreteIndices) {
89+
IndexMetadata indexMetadata = clusterState.metadata().index(index);
90+
if (indexMetadata != null) {
91+
ReplicationType replicationType = getReplicationType(indexMetadata);
92+
if (replicationType == ReplicationType.DOCUMENT) {
93+
docRepIndexSet.add(index);
94+
}
95+
}
96+
}
8597

86-
Predicate<ShardRouting> shardFilter = ShardRouting::primary;
98+
Set<Integer> shardSet = Arrays.stream(request.getShards()).boxed().collect(Collectors.toSet());
99+
Predicate<ShardRouting> shardFilter = shardRouting -> shardRouting.primary()
100+
|| docRepIndexSet.contains(shardRouting.getIndexName());
87101
if (shardSet.isEmpty() == false) {
88102
shardFilter = shardFilter.and(shardRouting -> shardSet.contains(shardRouting.shardId().getId()));
89103
}
@@ -178,4 +192,8 @@ private ResumeIngestionRequest.ResetSettings getResetSettingsForShard(UpdateInge
178192
int targetShardId = indexShard.shardId().id();
179193
return Arrays.stream(resetSettings).filter(setting -> setting.getShard() == targetShardId).findFirst().orElse(null);
180194
}
195+
196+
private ReplicationType getReplicationType(IndexMetadata indexMetadata) {
197+
return IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings());
198+
}
181199
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,14 +610,18 @@ private void resetStreamPoller(StreamPoller.ResetState resetState, String resetV
610610
*/
611611
public ShardIngestionState getIngestionState() {
612612
IngestionShardPointer shardPointer = streamPoller.getBatchStartPointer();
613+
614+
// Node and routing details are set at the routing layer
613615
return new ShardIngestionState(
614616
engineConfig.getIndexSettings().getIndex().getName(),
615617
engineConfig.getShardId().getId(),
616618
streamPoller.getState().toString(),
617619
streamPoller.getErrorStrategy().getName(),
618620
streamPoller.isPaused(),
619621
streamPoller.isWriteBlockEnabled(),
620-
shardPointer != null ? shardPointer.toString() : ""
622+
shardPointer != null ? shardPointer.toString() : "",
623+
true,
624+
""
621625
);
622626
}
623627
}

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88

99
package org.opensearch.indices.pollingingest;
1010

11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.index.IndexSettings;
1113
import org.opensearch.index.IngestionConsumerFactory;
1214
import org.opensearch.index.engine.Engine;
1315
import org.opensearch.index.engine.EngineConfig;
1416
import org.opensearch.index.engine.EngineFactory;
1517
import org.opensearch.index.engine.IngestionEngine;
1618
import org.opensearch.index.engine.NRTReplicationEngine;
19+
import org.opensearch.indices.replication.common.ReplicationType;
1720

1821
import java.util.Objects;
1922

@@ -28,14 +31,26 @@ public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory)
2831
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
2932
}
3033

34+
/**
35+
* Document replication equivalent in pull-based ingestion is to ingest on both primary and replica nodes using the
36+
* IngestionEngine. Segment replication will use the NRTReplicationEngine on replicas.
37+
*/
3138
@Override
3239
public Engine newReadWriteEngine(EngineConfig config) {
33-
if (config.isReadOnlyReplica()) {
40+
boolean isDocRep = getReplicationType(config) == ReplicationType.DOCUMENT;
41+
42+
// NRTReplicationEngine is used for segment replication on replicas
43+
if (isDocRep == false && config.isReadOnlyReplica()) {
3444
return new NRTReplicationEngine(config);
3545
}
3646

3747
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory);
3848
ingestionEngine.start();
3949
return ingestionEngine;
4050
}
51+
52+
private ReplicationType getReplicationType(EngineConfig config) {
53+
IndexSettings indexSettings = config.getIndexSettings();
54+
return indexSettings.getValue(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING);
55+
}
4156
}

server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/GetIngestionStateResponseTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class GetIngestionStateResponseTests extends OpenSearchTestCase {
1919

2020
public void testSerialization() throws IOException {
2121
ShardIngestionState[] shardStates = new ShardIngestionState[] {
22-
new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false, ""),
23-
new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true, false, "") };
22+
new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false, "", true, "id"),
23+
new ShardIngestionState("index1", 1, "PAUSED", "BLOCK", true, false, "", true, "id") };
2424
GetIngestionStateResponse response = new GetIngestionStateResponse(shardStates, 2, 2, 0, null, Collections.emptyList());
2525

2626
try (BytesStreamOutput out = new BytesStreamOutput()) {

server/src/test/java/org/opensearch/action/admin/indices/streamingingestion/state/ShardIngestionStateTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
public class ShardIngestionStateTests extends OpenSearchTestCase {
2020

2121
public void testSerialization() throws IOException {
22-
ShardIngestionState state = new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false, "");
22+
ShardIngestionState state = new ShardIngestionState("index1", 0, "POLLING", "DROP", false, false, "", true, "id1");
2323

2424
try (BytesStreamOutput out = new BytesStreamOutput()) {
2525
state.writeTo(out);
@@ -31,12 +31,15 @@ public void testSerialization() throws IOException {
3131
assertEquals(state.pollerState(), deserializedState.pollerState());
3232
assertEquals(state.isPollerPaused(), deserializedState.isPollerPaused());
3333
assertEquals(state.isWriteBlockEnabled(), deserializedState.isWriteBlockEnabled());
34+
assertEquals(state.batchStartPointer(), deserializedState.batchStartPointer());
35+
assertEquals(state.isPrimary(), deserializedState.isPrimary());
36+
assertEquals(state.nodeName(), deserializedState.nodeName());
3437
}
3538
}
3639
}
3740

3841
public void testSerializationWithNullValues() throws IOException {
39-
ShardIngestionState state = new ShardIngestionState("index1", 0, null, null, false, false, "");
42+
ShardIngestionState state = new ShardIngestionState("index1", 0, null, null, false, false, "", true, "");
4043

4144
try (BytesStreamOutput out = new BytesStreamOutput()) {
4245
state.writeTo(out);
@@ -53,9 +56,9 @@ public void testSerializationWithNullValues() throws IOException {
5356

5457
public void testGroupShardStateByIndex() {
5558
ShardIngestionState[] states = new ShardIngestionState[] {
56-
new ShardIngestionState("index1", 0, "POLLING", "DROP", true, false, ""),
57-
new ShardIngestionState("index1", 1, "PAUSED", "DROP", false, false, ""),
58-
new ShardIngestionState("index2", 0, "POLLING", "DROP", true, false, "") };
59+
new ShardIngestionState("index1", 0, "POLLING", "DROP", true, false, "", true, "id"),
60+
new ShardIngestionState("index1", 1, "PAUSED", "DROP", false, false, "", true, "id"),
61+
new ShardIngestionState("index2", 0, "POLLING", "DROP", true, false, "", true, "id") };
5962

6063
Map<String, List<ShardIngestionState>> groupedStates = ShardIngestionState.groupShardStateByIndex(states);
6164

0 commit comments

Comments
 (0)