Skip to content

Commit af82f9b

Browse files
Move consumer initialization to the poller to prevent engine failure
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent d52d404 commit af82f9b

File tree

6 files changed

+389
-81
lines changed

6 files changed

+389
-81
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3838
- Add support for Combined Fields query ([#18724](https://github.com/opensearch-project/OpenSearch/pull/18724))
3939
- Make GRPC transport extensible to allow plugins to register and expose their own GRPC services ([#18516](https://github.com/opensearch-project/OpenSearch/pull/18516))
4040
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
41+
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
4142

4243
### Changed
4344
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,59 @@ public void testIndexRelocation() throws Exception {
749749
waitForSearchableDocs(4, List.of(nodeB));
750750
}
751751

752+
public void testKafkaConnectionLost() throws Exception {
753+
// Step 1: Create 2 nodes
754+
internalCluster().startClusterManagerOnlyNode();
755+
final String nodeA = internalCluster().startDataOnlyNode();
756+
final String nodeB = internalCluster().startDataOnlyNode();
757+
758+
// Step 2: Create index
759+
createIndex(
760+
indexName,
761+
Settings.builder()
762+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
763+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
764+
.put("ingestion_source.type", "kafka")
765+
.put("ingestion_source.param.topic", topicName)
766+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
767+
.put("ingestion_source.param.auto.offset.reset", "earliest")
768+
.put("index.routing.allocation.require._name", nodeA)
769+
.build(),
770+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
771+
);
772+
ensureGreen(indexName);
773+
assertTrue(nodeA.equals(primaryNodeName(indexName)));
774+
775+
// Step 3: Write documents and verify
776+
produceData("1", "name1", "24");
777+
produceData("2", "name2", "20");
778+
refresh(indexName);
779+
waitForSearchableDocs(2, List.of(nodeA));
780+
flush(indexName);
781+
782+
// Step 4: Stop kafka and relocate index to nodeB
783+
kafka.stop();
784+
assertAcked(
785+
client().admin()
786+
.indices()
787+
.prepareUpdateSettings(indexName)
788+
.setSettings(Settings.builder().put("index.routing.allocation.require._name", nodeB))
789+
.get()
790+
);
791+
792+
// Step 5: Wait for relocation to complete
793+
waitForState(() -> nodeB.equals(primaryNodeName(indexName)));
794+
795+
// Step 6: Ensure index is searchable on nodeB even though kafka is down
796+
ensureGreen(indexName);
797+
waitForSearchableDocs(2, List.of(nodeB));
798+
waitForState(() -> {
799+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
800+
.getPollingIngestStats();
801+
return stats.getConsumerStats().totalConsumerErrorCount() > 0;
802+
});
803+
}
804+
752805
private void verifyRemoteStoreEnabled(String node) {
753806
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
754807
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ public void testPauseAndResumeAPIs() throws Exception {
133133
});
134134
}
135135

136+
// This test validates shard initialization does not fail due to kafka connection errors.
137+
public void testShardInitializationUsingUnknownTopic() throws Exception {
138+
createIndexWithMappingSource(
139+
indexName,
140+
Settings.builder()
141+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
142+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
143+
.put("ingestion_source.type", "kafka")
144+
.put("ingestion_source.pointer.init.reset", "earliest")
145+
.put("ingestion_source.param.topic", "unknownTopic")
146+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
147+
.put("index.replication.type", "SEGMENT")
148+
.build(),
149+
mappings
150+
);
151+
ensureGreen(indexName);
152+
}
153+
136154
private void setupKafka() {
137155
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
138156
// disable topic auto creation

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.opensearch.common.util.concurrent.ReleasableLock;
2626
import org.opensearch.core.common.Strings;
2727
import org.opensearch.index.IngestionConsumerFactory;
28-
import org.opensearch.index.IngestionShardConsumer;
2928
import org.opensearch.index.IngestionShardPointer;
3029
import org.opensearch.index.VersionType;
3130
import org.opensearch.index.mapper.DocumentMapperForType;
@@ -98,11 +97,7 @@ private void initializeStreamPoller(
9897
+ engineConfig.getIndexSettings().getIndex().getName()
9998
+ "-"
10099
+ engineConfig.getShardId().getId();
101-
IngestionShardConsumer ingestionShardConsumer = this.ingestionConsumerFactory.createShardConsumer(
102-
clientId,
103-
engineConfig.getShardId().getId()
104-
);
105-
logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId());
100+
106101
Map<String, String> commitData = commitDataAsMap(indexWriter);
107102
StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType();
108103
String resetValue = ingestionSource.getPointerInitReset().getValue();
@@ -146,20 +141,25 @@ private void initializeStreamPoller(
146141
StreamPoller.State initialPollerState = indexMetadata.getIngestionStatus().isPaused()
147142
? StreamPoller.State.PAUSED
148143
: StreamPoller.State.NONE;
149-
streamPoller = new DefaultStreamPoller(
144+
145+
// initialize the stream poller
146+
DefaultStreamPoller.Builder streamPollerBuilder = new DefaultStreamPoller.Builder(
150147
startPointer,
151148
persistedPointers,
152-
ingestionShardConsumer,
153-
this,
154-
resetState,
155-
resetValue,
156-
ingestionErrorStrategy,
157-
initialPollerState,
158-
ingestionSource.getMaxPollSize(),
159-
ingestionSource.getPollTimeout(),
160-
ingestionSource.getNumProcessorThreads(),
161-
ingestionSource.getBlockingQueueSize()
149+
ingestionConsumerFactory,
150+
clientId,
151+
engineConfig.getShardId().getId(),
152+
this
162153
);
154+
streamPoller = streamPollerBuilder.resetState(resetState)
155+
.resetValue(resetValue)
156+
.errorStrategy(ingestionErrorStrategy)
157+
.initialState(initialPollerState)
158+
.maxPollSize(ingestionSource.getMaxPollSize())
159+
.pollTimeout(ingestionSource.getPollTimeout())
160+
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
161+
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
162+
.build();
163163
registerStreamPollerListener();
164164

165165
// start the polling loop
@@ -575,6 +575,10 @@ private void resetStreamPoller(StreamPoller.ResetState resetState, String resetV
575575
throw new IllegalStateException("Cannot reset consumer when poller is not paused");
576576
}
577577

578+
if (streamPoller.getConsumer() == null) {
579+
throw new OpenSearchException("Consumer is not yet initialized");
580+
}
581+
578582
try {
579583
// refresh is needed for persisted pointers to be visible
580584
refresh("reset poller", SearcherScope.INTERNAL, true);

0 commit comments

Comments
 (0)