1313import org .opensearch .cluster .ClusterChangedEvent ;
1414import org .opensearch .cluster .ClusterState ;
1515import org .opensearch .cluster .block .ClusterBlockLevel ;
16+ import org .opensearch .cluster .metadata .IngestionSource ;
1617import org .opensearch .common .Nullable ;
1718import org .opensearch .common .metrics .CounterMetric ;
1819import org .opensearch .index .IndexSettings ;
@@ -66,7 +67,6 @@ public class DefaultStreamPoller implements StreamPoller {
6667
6768 // start of the batch, inclusive
6869 private IngestionShardPointer initialBatchStartPointer ;
69- private boolean includeBatchStartPointer = false ;
7070
7171 private ResetState resetState ;
7272 private final String resetValue ;
@@ -86,6 +86,9 @@ public class DefaultStreamPoller implements StreamPoller {
8686
8787 private PartitionedBlockingQueueContainer blockingQueueContainer ;
8888
89+ // Force the consumer to start reading from this pointer. This is used in case of failures, or during reinitialization.
90+ private IngestionShardPointer forcedShardPointer = null ;
91+
8992 private DefaultStreamPoller (
9093 IngestionShardPointer startPointer ,
9194 IngestionConsumerFactory consumerFactory ,
@@ -173,8 +176,6 @@ public void start() {
173176 }
174177
175178 started = true ;
176- // when we start, we need to include the batch start pointer in the read for the first read
177- includeBatchStartPointer = true ;
178179 consumerThread .submit (this ::startPoll );
179180 blockingQueueContainer .startProcessorThreads ();
180181 }
@@ -191,33 +192,20 @@ protected void startPoll() {
191192 }
192193 logger .info ("Starting poller for shard {}" , shardId );
193194
194- // Force the consumer to start reading from this pointer. This is used in case of failures, or during reinitialization.
195- IngestionShardPointer forcedShardPointer = null ;
196195 while (true ) {
197196 try {
198197 if (closed ) {
199198 state = State .CLOSED ;
199+ closeConsumer ();
200200 break ;
201201 }
202202
203- // Initialize consumer if not already initialized or if reinitialization is requested
203+ // Initialize/ reinitialization consumer
204204 if (this .consumer == null || reinitializeConsumer ) {
205- handleConsumerInitialization (CONSUMER_INIT_RETRY_INTERVAL_MS );
206-
207- // If consumer reinitialization is requested, update the new consumer's start offset to the latest batch start pointer.
208- // First clear the blocking queue partitions, and then retrieve the latest batch start pointer. This
209- // will ensure we resume from the earliest offset possible without too much duplicate processing.
210- if (reinitializeConsumer && includeBatchStartPointer == false ) {
211- blockingQueueContainer .clearAllQueues ();
212- forcedShardPointer = getBatchStartPointer ();
213- }
214- reinitializeConsumer = false ;
205+ handleConsumerInitialization ();
215206 continue ;
216207 }
217208
218- // reset the consumer offset
219- handleResetState ();
220-
221209 // Update lag periodically. Lag is updated even if the poller is paused.
222210 updatePointerBasedLagIfNeeded ();
223211
@@ -234,10 +222,8 @@ protected void startPoll() {
234222 state = State .POLLING ;
235223 List <IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message >> results ;
236224
237- if (includeBatchStartPointer ) {
238- results = consumer .readNext (initialBatchStartPointer , true , maxPollSize , pollTimeout );
239- includeBatchStartPointer = false ;
240- } else if (forcedShardPointer != null ) {
225+ // Force the consumer to start from forcedShardPointer if available
226+ if (forcedShardPointer != null ) {
241227 results = consumer .readNext (forcedShardPointer , true , maxPollSize , pollTimeout );
242228 forcedShardPointer = null ;
243229 } else {
@@ -335,7 +321,6 @@ public void close() {
335321 closed = true ;
336322 if (!started ) {
337323 logger .info ("consumer thread not started" );
338- closeConsumer ();
339324 return ;
340325 }
341326 long startTime = System .currentTimeMillis (); // Record the start time
@@ -354,7 +339,6 @@ public void close() {
354339 }
355340 consumerThread .shutdown ();
356341 blockingQueueContainer .close ();
357- closeConsumer ();
358342 logger .info ("closed the poller of shard {}" , shardId );
359343 }
360344
@@ -478,15 +462,18 @@ public IngestionShardConsumer getConsumer() {
478462
479463 /**
480464 * Mark the poller's consumer for reinitialization. A new consumer will be initialized and start consuming from the
481- * latest batchStartPointer.
465+ * latest batchStartPointer. This method also reinitializes the consumer factory with the updated ingestion source.
466+ * @param updatedIngestionSource the updated ingestion source with new configuration parameters
482467 */
483468 @ Override
484- public void requestConsumerReinitialization () {
469+ public synchronized void requestConsumerReinitialization (IngestionSource updatedIngestionSource ) {
485470 if (closed ) {
486471 logger .warn ("Cannot reinitialize consumer for closed poller of shard {}" , shardId );
487472 return ;
488473 }
489474
475+ // Reinitialize the consumer factory with updated configuration
476+ consumerFactory .initialize (updatedIngestionSource );
490477 logger .info ("Configuration parameters updated for index {} shard {}, requesting consumer reinitialization" , indexName , shardId );
491478 reinitializeConsumer = true ;
492479 }
@@ -508,63 +495,75 @@ public void clusterChanged(ClusterChangedEvent event) {
508495 }
509496
510497 /**
511- * Handles the reset state logic, updating the initialBatchStartPointer based on the reset state.
498+ * Handles the reset state logic.
499+ * Returns the resulting IngestionShardPointer for reset or null if no reset required.
512500 */
513- private void handleResetState () {
514- if (resetState != ResetState .NONE ) {
501+ private IngestionShardPointer getResetShardPointer () {
502+ IngestionShardPointer resetPointer = null ;
503+ if (resetState != ResetState .NONE && consumer != null ) {
515504 switch (resetState ) {
516505 case EARLIEST :
517- initialBatchStartPointer = consumer .earliestPointer ();
518- logger .info ("Resetting pointer by seeking to earliest pointer {}" , initialBatchStartPointer .asString ());
506+ resetPointer = consumer .earliestPointer ();
507+ logger .info ("Resetting pointer by seeking to earliest pointer {}" , resetPointer .asString ());
519508 break ;
520509 case LATEST :
521- initialBatchStartPointer = consumer .latestPointer ();
522- logger .info ("Resetting pointer by seeking to latest pointer {}" , initialBatchStartPointer .asString ());
510+ resetPointer = consumer .latestPointer ();
511+ logger .info ("Resetting pointer by seeking to latest pointer {}" , resetPointer .asString ());
523512 break ;
524513 case RESET_BY_OFFSET :
525- initialBatchStartPointer = consumer .pointerFromOffset (resetValue );
526- logger .info ("Resetting pointer by seeking to pointer {}" , initialBatchStartPointer .asString ());
514+ resetPointer = consumer .pointerFromOffset (resetValue );
515+ logger .info ("Resetting pointer by seeking to pointer {}" , resetPointer .asString ());
527516 break ;
528517 case RESET_BY_TIMESTAMP :
529- initialBatchStartPointer = consumer .pointerFromTimestampMillis (Long .parseLong (resetValue ));
518+ resetPointer = consumer .pointerFromTimestampMillis (Long .parseLong (resetValue ));
530519 logger .info (
531520 "Resetting pointer by seeking to timestamp {}, corresponding pointer {}" ,
532521 resetValue ,
533- initialBatchStartPointer .asString ()
522+ resetPointer .asString ()
534523 );
535524 break ;
536525 }
537526 resetState = ResetState .NONE ;
538527 }
528+
529+ return resetPointer ;
539530 }
540531
541532 /**
542- * Handles consumer initialization and reinitialization logic.
543- * If reinitializing, closes the existing consumer before creating a new one.
544- *
545- * @param sleepDurationOnError duration to sleep if initialization fails
533+ * Handles consumer initialization and reinitialization logic. Closes existing consumer if available and clears the
534+ * blocking queues before initializing a new consumer. Also forces the consumer to start reading from the initial
535+ * batchStartPointer if first time initialization, or from the latest available batchStartPointer on reinitialization.
546536 */
547- private void handleConsumerInitialization (int sleepDurationOnError ) {
548- if (reinitializeConsumer ) {
549- logger .info ("Reinitializing consumer for index {} shard {} due to configuration changes" , indexName , shardId );
550- closeConsumer ();
537+ private void handleConsumerInitialization () {
538+ closeConsumer ();
539+ blockingQueueContainer .clearAllQueues ();
540+ initializeConsumer ();
541+
542+ // Handle consumer offset reset the first time an index is created. The reset offset takes precedence if available.
543+ IngestionShardPointer resetShardPointer = getResetShardPointer ();
544+ if (resetShardPointer != null ) {
545+ initialBatchStartPointer = resetShardPointer ;
551546 }
552- initializeConsumer (sleepDurationOnError );
547+
548+ // Force the consumer to start from the batchStartPointer. This will be the initialBatchStartPointer for first
549+ // time initialization, or the latest batchStartPointer based on processed messages.
550+ forcedShardPointer = getBatchStartPointer ();
553551 }
554552
555553 /**
556554 * Initializes the consumer using the provided consumerFactory. If an error is encountered during initialization,
557555 * the poller thread sleeps for the provided duration before retrying/proceeding with the polling loop.
558556 */
559- private void initializeConsumer (int sleepDurationOnError ) {
557+ private synchronized void initializeConsumer () {
560558 try {
559+ reinitializeConsumer = false ;
561560 this .consumer = consumerFactory .createShardConsumer (consumerClientId , shardId );
562561 logger .info ("Successfully initialized consumer for shard {}" , shardId );
563562 } catch (Exception e ) {
564563 logger .warn ("Failed to create consumer for shard {}: {}" , shardId , e .getMessage ());
565564 totalConsumerErrorCount .inc ();
566565 try {
567- Thread .sleep (sleepDurationOnError );
566+ Thread .sleep (CONSUMER_INIT_RETRY_INTERVAL_MS );
568567 } catch (InterruptedException ie ) {
569568 Thread .currentThread ().interrupt ();
570569 }
0 commit comments