
Commit b1d1e33
[Pull-based Ingestion] Add time based periodic flush support (#19878)
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 814a16f commit b1d1e33
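
This commit adds a new dynamic, index-scoped setting, index.periodic_flush_interval, which drives an asynchronous time-based flush task. It defaults to -1 (disabled) for regular indices and to 10 minutes for pull-based ingestion indices so that ingestion offsets are committed regularly. Below is a minimal sketch adapted from the integration tests in this change; the index name, topic, bootstrap servers, and mapping are placeholders, and createIndex is the integration-test helper used in those tests.

    // Create a pull-based ingestion index that flushes at least every 5 seconds.
    Settings settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .put("ingestion_source.type", "kafka")                              // pull-based ingestion source
        .put("ingestion_source.pointer.init.reset", "earliest")
        .put("ingestion_source.param.topic", "test-topic")                  // placeholder topic
        .put("ingestion_source.param.bootstrap_servers", "localhost:9092")  // placeholder brokers
        .put("index.periodic_flush_interval", "5s")                         // new setting introduced by this commit
        .build();
    createIndex("test-index", settings, "{\"properties\":{\"name\":{\"type\": \"text\"}}}");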

File tree

11 files changed (+567 -8 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))
- Implement GRPC Search params `Highlight` and `Sort` ([#19868](https://github.com/opensearch-project/OpenSearch/pull/19868))
- Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 127 additions & 0 deletions
@@ -653,4 +653,131 @@ private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
            && shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
        return valid;
    }

    public void testAllActiveIngestionBatchStartPointerOnReplicaPromotion() throws Exception {
        // Step 1: Publish 10 messages
        for (int i = 1; i <= 10; i++) {
            produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index");
        }

        // Step 2: Start nodes
        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();

        // Step 3: Create all-active index
        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("ingestion_source.all_active", true)
                .build(),
            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
        );

        ensureGreen(indexName);

        // Step 4: Wait for 10 messages to be searchable on nodeA
        waitForSearchableDocs(10, Arrays.asList(nodeA));

        // Step 5: Flush to persist data
        flush(indexName);

        // Step 6: Add second node
        final String nodeB = internalCluster().startDataOnlyNode();

        // Step 7: Relocate shard from nodeA to nodeB
        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(indexName, 0, nodeA, nodeB)).get();
        ensureGreen(indexName);
        assertTrue(nodeB.equals(primaryNodeName(indexName)));

        // Step 8: Publish 1 new message
        produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index");

        // Step 9: Wait for 11 messages to be visible on nodeB
        waitForSearchableDocs(11, Arrays.asList(nodeB));

        // Step 10: Flush to persist data
        flush(indexName);

        // Step 11: Validate processed messages and version conflict count on nodeB
        PollingIngestStats nodeBStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
            .getPollingIngestStats();
        assertNotNull(nodeBStats);
        assertEquals(2L, nodeBStats.getMessageProcessorStats().totalProcessedCount());
        assertEquals(1L, nodeBStats.getMessageProcessorStats().totalVersionConflictsCount());

        // Step 12: Add third node
        final String nodeC = internalCluster().startDataOnlyNode();

        // Step 13: Bring down nodeA so the new replica will be allocated to nodeC
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));

        // Step 14: Add a replica (will be allocated to nodeC since only nodeB and nodeC are available)
        client().admin()
            .indices()
            .prepareUpdateSettings(indexName)
            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
            .get();
        ensureGreen(indexName);

        // Step 15: Wait for 11 messages to be searchable on nodeC (replica)
        waitForSearchableDocs(11, Arrays.asList(nodeC));

        // Step 16: Bring down nodeB (primary) and wait for nodeC to become primary
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB));
        ensureYellowAndNoInitializingShards(indexName);
        assertTrue(nodeC.equals(primaryNodeName(indexName)));

        // Step 17: Publish 1 more message
        produceDataWithExternalVersion("12", 1, "name12", "25", defaultMessageTimestamp, "index");

        // Step 18: Wait for 12 messages to be visible on nodeC
        waitForSearchableDocs(12, Arrays.asList(nodeC));

        // Step 19: Validate processed messages and version conflict count on nodeC
        PollingIngestStats nodeCStats = client(nodeC).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
            .getPollingIngestStats();
        assertNotNull(nodeCStats);

        assertEquals(2L, nodeCStats.getMessageProcessorStats().totalProcessedCount());
        assertEquals(1L, nodeCStats.getMessageProcessorStats().totalVersionConflictsCount());
    }

    public void testAllActiveIngestionPeriodicFlush() throws Exception {
        // Publish 10 messages
        for (int i = 1; i <= 10; i++) {
            produceData(String.valueOf(i), "name" + i, "25");
        }

        // Start nodes
        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();

        // Create all-active index with 5 second periodic flush interval
        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("ingestion_source.all_active", true)
                .put("index.periodic_flush_interval", "5s")
                .build(),
            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
        );

        ensureGreen(indexName);

        waitForSearchableDocs(10, Arrays.asList(nodeA));
        waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);
    }
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 13 additions & 0 deletions
@@ -222,4 +222,17 @@ protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) {
            .setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled))
            .get();
    }

    /**
     * Gets the periodic flush count for the specified index from the specified node.
     *
     * @param nodeName the name of the node to query
     * @param indexName the name of the index
     * @return the periodic flush count
     */
    protected long getPeriodicFlushCount(String nodeName, String indexName) {
        return client(nodeName).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0].getStats()
            .getFlush()
            .getPeriodic();
    }
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 96 additions & 0 deletions
@@ -813,4 +813,100 @@ private void verifyRemoteStoreEnabled(String node) {
        String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
        assertEquals("Remote store should be enabled", "true", remoteStoreEnabled);
    }

    public void testBatchStartPointerOnReplicaPromotion() throws Exception {
        // Step 1: Publish 10 messages
        for (int i = 1; i <= 10; i++) {
            produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index");
        }

        // Step 2: Start nodes
        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();

        // Step 3: Create index with 1 replica
        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .build(),
            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
        );

        ensureYellowAndNoInitializingShards(indexName);

        // Step 4: Add second node and verify green status
        final String nodeB = internalCluster().startDataOnlyNode();
        ensureGreen(indexName);

        // Step 5: Verify nodeA has the primary shard
        assertTrue(nodeA.equals(primaryNodeName(indexName)));
        assertTrue(nodeB.equals(replicaNodeName(indexName)));
        verifyRemoteStoreEnabled(nodeA);
        verifyRemoteStoreEnabled(nodeB);

        // Step 6: Wait for 10 messages to be searchable on both nodes
        waitForSearchableDocs(10, Arrays.asList(nodeA, nodeB));

        // Step 7: Flush to persist data
        flush(indexName);

        // Step 8: Bring down nodeA (primary) and wait for nodeB to become primary
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
        ensureYellowAndNoInitializingShards(indexName);
        assertTrue(nodeB.equals(primaryNodeName(indexName)));

        // Step 9: Publish 1 new message
        produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index");

        // Step 10: Wait for 11 messages to be visible on nodeB
        waitForSearchableDocs(11, Arrays.asList(nodeB));

        // Step 11: Validate version conflict count is exactly 1
        PollingIngestStats finalStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
            .getPollingIngestStats();
        assertNotNull(finalStats);

        assertEquals(1L, finalStats.getMessageProcessorStats().totalVersionConflictsCount());
        assertEquals(2L, finalStats.getMessageProcessorStats().totalProcessedCount());
    }

    public void testPeriodicFlush() throws Exception {
        // Publish 10 messages
        for (int i = 1; i <= 10; i++) {
            produceData(String.valueOf(i), "name" + i, "25");
        }

        // Start nodes
        internalCluster().startClusterManagerOnlyNode();
        final String nodeA = internalCluster().startDataOnlyNode();

        // Create index with 5 second periodic flush interval
        createIndex(
            indexName,
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                .put("ingestion_source.type", "kafka")
                .put("ingestion_source.pointer.init.reset", "earliest")
                .put("ingestion_source.param.topic", topicName)
                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
                .put("index.replication.type", "SEGMENT")
                .put("index.periodic_flush_interval", "5s")
                .build(),
            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
        );

        ensureGreen(indexName);
        verifyRemoteStoreEnabled(nodeA);

        waitForSearchableDocs(10, Arrays.asList(nodeA));
        waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);
    }
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
@@ -147,6 +147,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
        IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING,
        IndexSettings.INDEX_WARMER_ENABLED_SETTING,
        IndexSettings.INDEX_REFRESH_INTERVAL_SETTING,
        IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING,
        IndexSettings.MAX_RESULT_WINDOW_SETTING,
        IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING,
        IndexSettings.MAX_TOKEN_COUNT_SETTING,

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 36 additions & 0 deletions
@@ -380,6 +380,28 @@ public static IndexMergePolicy fromString(String text) {
        Property.Dynamic,
        Property.IndexScope
    );

    /**
     * Periodic flush interval setting. By default, periodic flush is disabled (-1).
     * For pull-based ingestion indices, this defaults to 10 minutes to ensure offsets are regularly committed.
     */
    public static final TimeValue DEFAULT_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE;
    public static final TimeValue MINIMUM_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE;
    public static final Setting<TimeValue> INDEX_PERIODIC_FLUSH_INTERVAL_SETTING = Setting.timeSetting(
        "index.periodic_flush_interval",
        (settings) -> {
            // Default to 10 minutes for pull-based ingestion indices, disabled otherwise
            String ingestionSourceType = IndexMetadata.INGESTION_SOURCE_TYPE_SETTING.get(settings);
            if (ingestionSourceType != null && !IndexMetadata.NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType)) {
                return TimeValue.timeValueMinutes(10);
            }
            return DEFAULT_PERIODIC_FLUSH_INTERVAL;
        },
        MINIMUM_PERIODIC_FLUSH_INTERVAL,
        Property.Dynamic,
        Property.IndexScope
    );

    public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting(
        "index.translog.flush_threshold_size",
        new ByteSizeValue(512, ByteSizeUnit.MB),
@@ -839,6 +861,7 @@ public static IndexMergePolicy fromString(String text) {
    private volatile TimeValue syncInterval;
    private volatile TimeValue publishReferencedSegmentsInterval;
    private volatile TimeValue refreshInterval;
    private volatile TimeValue periodicFlushInterval;
    private volatile ByteSizeValue flushThresholdSize;
    private volatile TimeValue translogRetentionAge;
    private volatile ByteSizeValue translogRetentionSize;
@@ -1057,6 +1080,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
        syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
        publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
        refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
        periodicFlushInterval = scopedSettings.get(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING);
        flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
        generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
        flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
@@ -1205,6 +1229,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
        scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
        scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
        scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
        scopedSettings.addSettingsUpdateConsumer(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING, this::setPeriodicFlushInterval);
        scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
        scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
        scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount);
@@ -1302,6 +1327,10 @@ private void setRefreshInterval(TimeValue timeValue) {
        this.refreshInterval = timeValue;
    }

    private void setPeriodicFlushInterval(TimeValue timeValue) {
        this.periodicFlushInterval = timeValue;
    }

    /**
     * Update the default maxMergesAtOnce
     * 1. sets the new default in {@code TieredMergePolicyProvider}
@@ -1644,6 +1673,13 @@ public TimeValue getRefreshInterval() {
        return refreshInterval;
    }

    /**
     * Returns the interval at which a periodic flush should be executed. {@code -1} means periodic flush is disabled.
     */
    public TimeValue getPeriodicFlushInterval() {
        return periodicFlushInterval;
    }

    /**
     * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
     */
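
Because the setting is registered with Property.Dynamic and wired through addSettingsUpdateConsumer above, it can also be changed on a live index. A minimal sketch using the same test-style client calls as the integration tests in this change (the index name "test-index" is a placeholder); per the getter's javadoc, restoring the -1 default would disable the periodic flush task again.

    // Shorten the periodic flush interval on an existing index.
    client().admin()
        .indices()
        .prepareUpdateSettings("test-index")
        .setSettings(Settings.builder().put("index.periodic_flush_interval", "30s"))
        .get();

    // Disable periodic flush again by restoring the -1 default.
    client().admin()
        .indices()
        .prepareUpdateSettings("test-index")
        .setSettings(Settings.builder().put("index.periodic_flush_interval", "-1"))
        .get();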
