Skip to content

Commit 218fc82

Browse files
add listener for ingestion source params for dynamic consumer reinitialization
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent d5c5e4b commit 218fc82

File tree

7 files changed

+492
-20
lines changed

7 files changed

+492
-20
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 207 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -846,53 +846,249 @@ public void testDynamicUpdateKafkaParams() throws Exception {
846846
// Step 2: Create index with pointer.init.reset to offset 1 and auto.offset.reset=latest
847847
internalCluster().startClusterManagerOnlyNode();
848848
final String nodeA = internalCluster().startDataOnlyNode();
849+
final String nodeB = internalCluster().startDataOnlyNode();
849850
createIndex(
850851
indexName,
851852
Settings.builder()
852853
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
853-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
854+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
854855
.put("ingestion_source.type", "kafka")
855856
.put("ingestion_source.pointer.init.reset", "reset_by_offset")
856857
.put("ingestion_source.pointer.init.reset.value", "1")
857858
.put("ingestion_source.param.topic", topicName)
858859
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
859860
.put("ingestion_source.param.auto.offset.reset", "latest")
861+
.put("ingestion_source.param.max.poll.records", "100")
860862
.put("ingestion_source.all_active", true)
861863
.build(),
862864
mapping
863865
);
864866

865867
ensureGreen(indexName);
866868

867-
// Step 3: Wait for 9 messages to be visible
868-
waitForSearchableDocs(9, Arrays.asList(nodeA));
869+
// Step 3: Wait for 9 messages to be visible on both nodes
870+
waitForSearchableDocs(9, Arrays.asList(nodeA, nodeB));
869871

870-
// Step 4: Update the index settings to set auto.offset.reset to earliest
872+
// Step 4: Update the index settings to set auto.offset.reset to earliest and max.poll.records to 200
871873
client().admin()
872874
.indices()
873875
.prepareUpdateSettings(indexName)
874-
.setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "earliest"))
876+
.setSettings(
877+
Settings.builder()
878+
.put("ingestion_source.param.auto.offset.reset", "earliest")
879+
.put("ingestion_source.param.max.poll.records", "200")
880+
)
875881
.get();
876882

877883
// Verify the setting was updated
878884
String autoOffsetReset = getSettings(indexName, "index.ingestion_source.param.auto.offset.reset");
879885
assertEquals("earliest", autoOffsetReset);
880886

881887
// Step 5: Pause and resume ingestion, setting offset to 100 (out-of-range, hence expect auto.offset.reset to be used)
882-
pauseIngestionAndWait(indexName, 1);
883-
resumeIngestionWithResetAndWait(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "100", 1);
888+
pauseIngestionAndWait(indexName, 2);
889+
resumeIngestionWithResetAndWait(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "100", 2);
884890

885-
// Step 6: Wait for version conflict count to be 9 and total messages processed to be 10.
891+
// Step 6: Wait for version conflict count to be 9 and total messages processed to be 10 on both shards.
886892
// Since offset 100 doesn't exist, it will fall back to earliest (offset 0).
887893
waitForState(() -> {
894+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
895+
PollingIngestStats primaryStats = shardTypeToStats.get("primary");
896+
PollingIngestStats replicaStats = shardTypeToStats.get("replica");
897+
898+
return primaryStats != null
899+
&& primaryStats.getMessageProcessorStats().totalProcessedCount() == 10L
900+
&& primaryStats.getMessageProcessorStats().totalVersionConflictsCount() == 9L
901+
&& replicaStats != null
902+
&& replicaStats.getMessageProcessorStats().totalProcessedCount() == 10L
903+
&& replicaStats.getMessageProcessorStats().totalVersionConflictsCount() == 9L;
904+
});
905+
906+
waitForSearchableDocs(10, Arrays.asList(nodeA, nodeB));
907+
908+
// Step 7: Pause ingestion
909+
pauseIngestionAndWait(indexName, 2);
910+
911+
// Step 8: Update auto.offset.reset back to "latest". This is surrounded by pause/resume to indirectly infer
912+
// the config change has been applied.
913+
client().admin()
914+
.indices()
915+
.prepareUpdateSettings(indexName)
916+
.setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "latest"))
917+
.get();
918+
919+
// Verify the setting was updated
920+
autoOffsetReset = getSettings(indexName, "index.ingestion_source.param.auto.offset.reset");
921+
assertEquals("latest", autoOffsetReset);
922+
923+
// Step 9: Verify processed count is still 10 on both shards
924+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
925+
assertEquals(10L, shardTypeToStats.get("primary").getMessageProcessorStats().totalProcessedCount());
926+
assertEquals(9L, shardTypeToStats.get("primary").getMessageProcessorStats().totalVersionConflictsCount());
927+
assertEquals(10L, shardTypeToStats.get("replica").getMessageProcessorStats().totalProcessedCount());
928+
assertEquals(9L, shardTypeToStats.get("replica").getMessageProcessorStats().totalVersionConflictsCount());
929+
930+
// Step 10: Resume ingestion. This does not recreate the poller as consumer is not reset.
931+
resumeIngestionAndWait(indexName, 2);
932+
933+
// Step 11: Publish 10 more messages
934+
for (int i = 10; i < 20; i++) {
935+
produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index");
936+
}
937+
938+
// Step 12: Wait for processed count to be 21 and version conflict to be 10 on both shards. On updating the Kafka settings, the
939+
// last message (offset=9) is reprocessed resulting in additional processed message and version conflict.
940+
waitForState(() -> {
941+
Map<String, PollingIngestStats> updatedStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
942+
PollingIngestStats primaryStats = updatedStats.get("primary");
943+
PollingIngestStats replicaStats = updatedStats.get("replica");
944+
945+
return primaryStats != null
946+
&& primaryStats.getMessageProcessorStats().totalProcessedCount() == 21L
947+
&& primaryStats.getMessageProcessorStats().totalVersionConflictsCount() == 10L
948+
&& replicaStats != null
949+
&& replicaStats.getMessageProcessorStats().totalProcessedCount() == 21L
950+
&& replicaStats.getMessageProcessorStats().totalVersionConflictsCount() == 10L;
951+
});
952+
953+
waitForSearchableDocs(20, Arrays.asList(nodeA, nodeB));
954+
}
955+
956+
/**
 * Verifies that a shard whose ingestion consumer fails to initialize (invalid start offset with
 * auto.offset.reset=none) pauses the poller, and that a subsequent dynamic update of
 * {@code ingestion_source.param.auto.offset.reset} followed by a resume recovers ingestion.
 * Also verifies that re-applying the same param value does not reinitialize the consumer
 * (inferred indirectly: processed count advances by exactly the one new message).
 */
public void testConsumerInitializationFailureAndRecovery() throws Exception {
    // Step 1: Create index with auto.offset.reset=none and pointer.init.reset to offset 100 (invalid offset)
    // This should cause consumer initialization to fail
    internalCluster().startClusterManagerOnlyNode();
    final String nodeA = internalCluster().startDataOnlyNode();
    createIndex(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
            // offset 100 does not exist in the (empty) topic, so the seek is out of range
            .put("ingestion_source.pointer.init.reset.value", "100")
            .put("ingestion_source.param.topic", topicName)
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            // "none" makes the Kafka consumer throw on out-of-range offsets instead of resetting
            .put("ingestion_source.param.auto.offset.reset", "none")
            .put("ingestion_source.all_active", true)
            .build(),
        mapping
    );

    ensureGreen(indexName);

    // Step 2: Wait for consumer error and paused status
    waitForState(() -> {
        GetIngestionStateResponse ingestionState = getIngestionState(indexName);
        PollingIngestStats stats = client(nodeA).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
            .getPollingIngestStats();

        return ingestionState.getShardStates().length == 1
            && ingestionState.getShardStates()[0].isPollerPaused()
            && stats != null
            && stats.getConsumerStats().totalConsumerErrorCount() >= 1L;
    });

    // Step 3: Publish 10 messages
    for (int i = 0; i < 10; i++) {
        produceData(Integer.toString(i), "name" + i, "25");
    }

    // Step 4: Update auto.offset.reset to earliest
    client().admin()
        .indices()
        .prepareUpdateSettings(indexName)
        .setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "earliest"))
        .get();

    // Verify the setting was updated
    String autoOffsetReset = getSettings(indexName, "index.ingestion_source.param.auto.offset.reset");
    assertEquals("earliest", autoOffsetReset);

    // Step 5: Resume ingestion and wait for 10 searchable docs
    resumeIngestionAndWait(indexName, 1);
    waitForSearchableDocs(10, Arrays.asList(nodeA));

    // Verify all 10 messages were processed
    PollingIngestStats stats = client(nodeA).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
        .getPollingIngestStats();
    assertEquals(10L, stats.getMessageProcessorStats().totalProcessedCount());

    // Step 6: Update auto.offset.reset to earliest again. Consumer must not be reinitialized again as no config change.
    client().admin()
        .indices()
        .prepareUpdateSettings(indexName)
        .setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "earliest"))
        .get();

    // Step 7: Publish 1 more message
    produceData("10", "name10", "30");

    // Step 8: Wait for 11 searchable docs
    waitForSearchableDocs(11, Arrays.asList(nodeA));

    // Step 9: Verify total processed message count is 11
    // (a reinitialized consumer would have re-read earlier offsets and inflated this count)
    PollingIngestStats finalStats = client(nodeA).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
        .getPollingIngestStats();
    assertEquals(11L, finalStats.getMessageProcessorStats().totalProcessedCount());
}
1035+
1036+
/**
 * Verifies that a dynamic ingestion-source param update is applied cleanly when the consumer has
 * no pending messages: the poller keeps polling, the setting update takes effect across a
 * pause/resume cycle, and a message published afterwards is ingested exactly once.
 */
public void testDynamicConfigUpdateOnNoMessages() throws Exception {
    // Step 1: Create index with pointer.init.reset to offset 100 and auto.offset.reset to earliest
    // Since offset 100 doesn't exist, it will fall back to earliest (offset 0)
    internalCluster().startClusterManagerOnlyNode();
    final String nodeA = internalCluster().startDataOnlyNode();
    createIndex(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
            .put("ingestion_source.pointer.init.reset.value", "100")
            .put("ingestion_source.param.topic", topicName)
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("ingestion_source.param.auto.offset.reset", "earliest")
            .put("ingestion_source.all_active", true)
            .build(),
        mapping
    );

    ensureGreen(indexName);

    // Step 2: Wait for poller state to be polling
    waitForState(() -> {
        GetIngestionStateResponse ingestionState = getIngestionState(indexName);
        return ingestionState.getShardStates().length == 1
            && ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling");
    });

    // Step 3: Pause ingestion
    pauseIngestionAndWait(indexName, 1);

    // Step 4: Update auto.offset.reset to latest. This is surrounded by pause/resume to indirectly infer the config change has been
    // applied.
    client().admin()
        .indices()
        .prepareUpdateSettings(indexName)
        .setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "latest"))
        .get();

    // Verify the setting was updated
    String autoOffsetReset = getSettings(indexName, "index.ingestion_source.param.auto.offset.reset");
    assertEquals("latest", autoOffsetReset);

    // Step 5: Resume ingestion
    resumeIngestionAndWait(indexName, 1);

    // Step 6: Publish 1 message and wait for it to be searchable
    produceData("1", "name1", "25");

    waitForSearchableDocs(1, Arrays.asList(nodeA));

    // Verify 1 message was processed (no duplicates from the config update)
    PollingIngestStats stats = client(nodeA).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
        .getPollingIngestStats();
    assertEquals(1L, stats.getMessageProcessorStats().totalProcessedCount());
}
8981094
}

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,65 @@ public void testShardInitializationUsingUnknownTopic() throws Exception {
157157
ensureGreen(indexName);
158158
}
159159

160+
/**
 * Verifies dynamic consumer-param updates when the ingestion pipeline runs multiple processor
 * threads ({@code ingestion_source.num_processor_threads} = 5): after a failed consumer
 * initialization (invalid offset + auto.offset.reset=none), updating the param and resuming
 * ingestion makes all published messages searchable.
 */
public void testConsumerSettingUpdateWithMultipleProcessorThreads() throws Exception {
    // Create index with num_processor_threads = 5, pointer.init.reset to offset 100, and auto.offset.reset = none
    // This should cause consumer initialization to fail
    createIndexWithMappingSource(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
            .put("ingestion_source.pointer.init.reset.value", "100")
            .put("ingestion_source.param.topic", topicName)
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("ingestion_source.param.auto.offset.reset", "none")
            .put("ingestion_source.num_processor_threads", 5)
            .put("index.replication.type", "SEGMENT")
            .build(),
        mappings
    );

    ensureGreen(indexName);

    // Wait for paused status due to consumer error
    waitForState(() -> {
        GetIngestionStateResponse ingestionState = getIngestionState(indexName);
        PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
            .getPollingIngestStats();

        return ingestionState.getShardStates().length == 1
            && ingestionState.getShardStates()[0].isPollerPaused()
            && stats != null
            && stats.getConsumerStats().totalConsumerErrorCount() >= 1L;
    });

    // Publish 5 messages (raw JSON payloads; this single-node test uses the string produceData overload)
    for (int i = 0; i < 5; i++) {
        produceData(
            "{\"_id\":\"" + i + "\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"name" + i + "\", \"age\": 25}}"
        );
    }

    // Update auto.offset.reset to earliest
    client().admin()
        .indices()
        .prepareUpdateSettings(indexName)
        .setSettings(Settings.builder().put("ingestion_source.param.auto.offset.reset", "earliest"))
        .get();

    // Resume ingestion
    client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(indexName)).get();

    // Wait for 5 searchable docs (query by age since all produced docs carry age=25)
    waitForState(() -> {
        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
        SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
        return response.getHits().getTotalHits().value() == 5;
    });
}
218+
160219
private void setupKafka() {
161220
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
162221
// disable topic auto creation

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ private void registerStreamPollerListener() {
165165
.indexBlocked(ClusterBlockLevel.WRITE, engineConfig.getIndexSettings().getIndex().getName());
166166
streamPoller.setWriteBlockEnabled(isWriteBlockEnabled);
167167
}
168+
169+
// Register listener for dynamic ingestion source params updates
170+
engineConfig.getIndexSettings()
171+
.getScopedSettings()
172+
.addAffixMapUpdateConsumer(IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING, this::updateIngestionSourceParams, (x, y) -> {});
168173
}
169174

170175
private void unregisterStreamPollerListener() {
@@ -519,6 +524,37 @@ private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy er
519524
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
520525
}
521526

527+
/**
 * Handler for updating ingestion source params on dynamic index settings update.
 * This will reinitialize the streamPoller's consumer with new configurations.
 * <p>
 * Registered via {@code addAffixMapUpdateConsumer} for the ingestion source param affix settings,
 * so it is invoked whenever any {@code ingestion_source.param.*} index setting changes.
 *
 * @param updatedParams the changed param key/value pairs; unused directly — the full, current
 *                      param map is re-read from the index metadata instead, so the consumer is
 *                      rebuilt from a consistent snapshot rather than a partial delta
 */
private void updateIngestionSourceParams(Map<String, Object> updatedParams) {
    // Nothing to reinitialize if the consumer was never created (e.g. initialization failed earlier);
    // the new params will be picked up when the consumer is first created.
    if (streamPoller.getConsumer() == null) {
        logger.debug("Consumer not yet initialized, skipping consumer reinitialization for ingestion source params update");
        return;
    }

    try {
        logger.info("Ingestion source params updated, reinitializing consumer");

        // Get current ingestion source with updated params from index metadata
        IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
        assert indexMetadata != null;
        IngestionSource updatedIngestionSource = Objects.requireNonNull(indexMetadata.getIngestionSource());

        // Initialize the factory with updated params
        ingestionConsumerFactory.initialize(updatedIngestionSource);

        // Request consumer reinitialization in the poller
        // NOTE(review): this only flags the poller to rebuild its consumer; the actual swap
        // presumably happens on the poller thread — confirm ordering with StreamPoller.
        streamPoller.requestConsumerReinitialization();

        logger.info("Successfully processed ingestion source params update");
    } catch (Exception e) {
        // Propagate so the settings-update listener surfaces the failure instead of
        // silently leaving the consumer on stale configuration.
        logger.error("Failed to update ingestion source params", e);
        throw new OpenSearchException("Failed to update ingestion source params", e);
    }
}
557+
522558
/**
523559
* Validates document version for pull-based ingestion. Only external versioning is supported.
524560
*/

0 commit comments

Comments (0)