Skip to content

Commit 96481cc

Browse files
authored
Implement parallel shard refresh behind cluster settings (#17782)
* Implement parallel shard refresh behind cluster settings
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Incorporate PR review comments
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Incorporate PR review comments
  Signed-off-by: Ashish Singh <ssashish@amazon.com>
* Fix compilation failure
  Signed-off-by: Ashish Singh <ssashish@amazon.com>

---------

Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 473665f commit 96481cc

File tree

10 files changed

+342
-14
lines changed

10 files changed

+342
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
## [Unreleased 3.x]
77
### Added
88
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
9+
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
910

1011
### Changed
1112

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import org.opensearch.test.IndexSettingsModule;
9696
import org.opensearch.test.InternalSettingsPlugin;
9797
import org.opensearch.test.OpenSearchSingleNodeTestCase;
98+
import org.opensearch.test.OpenSearchTestCase;
9899
import org.junit.Assert;
99100

100101
import java.io.IOException;
@@ -721,7 +722,11 @@ public static final IndexShard newIndexShard(
721722
false,
722723
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
723724
mock(Function.class),
724-
new MergedSegmentWarmerFactory(null, null, null)
725+
new MergedSegmentWarmerFactory(null, null, null),
726+
false,
727+
OpenSearchTestCase::randomBoolean,
728+
() -> indexService.getIndexSettings().getRefreshInterval(),
729+
indexService.getRefreshMutex()
725730
);
726731
}
727732

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -828,7 +828,8 @@ public void apply(Settings value, Settings current, Settings previous) {
828828
),
829829

830830
// Settings related to refresh optimisations
831-
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
831+
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
832+
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING
832833
)
833834
)
834835
);

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ public IndexService newIndexService(
631631
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
632632
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
633633
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
634+
Supplier<Boolean> shardLevelRefreshEnabled,
634635
RecoverySettings recoverySettings,
635636
RemoteStoreSettings remoteStoreSettings,
636637
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
@@ -656,6 +657,7 @@ public IndexService newIndexService(
656657
translogFactorySupplier,
657658
clusterDefaultRefreshIntervalSupplier,
658659
fixedRefreshIntervalSchedulingEnabled,
660+
shardLevelRefreshEnabled,
659661
recoverySettings,
660662
remoteStoreSettings,
661663
(s) -> {},
@@ -685,6 +687,7 @@ public IndexService newIndexService(
685687
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
686688
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
687689
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
690+
Supplier<Boolean> shardLevelRefreshEnabled,
688691
RecoverySettings recoverySettings,
689692
RemoteStoreSettings remoteStoreSettings,
690693
Consumer<IndexShard> replicator,
@@ -748,6 +751,7 @@ public IndexService newIndexService(
748751
translogFactorySupplier,
749752
clusterDefaultRefreshIntervalSupplier,
750753
fixedRefreshIntervalSchedulingEnabled,
754+
shardLevelRefreshEnabled.get(),
751755
recoverySettings,
752756
remoteStoreSettings,
753757
fileCache,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 100 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
199199
private final CompositeIndexSettings compositeIndexSettings;
200200
private final Consumer<IndexShard> replicator;
201201
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
202+
private final Object refreshMutex = new Object();
203+
private volatile TimeValue refreshInterval;
204+
private volatile boolean shardLevelRefreshEnabled;
202205

203206
public IndexService(
204207
IndexSettings indexSettings,
@@ -234,6 +237,7 @@ public IndexService(
234237
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
235238
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
236239
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
240+
boolean shardLevelRefreshEnabled,
237241
RecoverySettings recoverySettings,
238242
RemoteStoreSettings remoteStoreSettings,
239243
FileCache fileCache,
@@ -311,8 +315,9 @@ public IndexService(
311315
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
312316
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
313317
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
318+
this.shardLevelRefreshEnabled = shardLevelRefreshEnabled;
319+
this.refreshInterval = getRefreshInterval();
314320
// kick off async ops for the first shard in this index
315-
this.refreshTask = new AsyncRefreshTask(this);
316321
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
317322
// disable these checks for ingestion source engine
318323
if (!indexSettings.getIndexMetadata().useIngestionSource()) {
@@ -329,6 +334,12 @@ public IndexService(
329334
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
330335
indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get());
331336
updateFsyncTaskIfNecessary();
337+
synchronized (refreshMutex) {
338+
if (shardLevelRefreshEnabled == false) {
339+
logger.debug("shard level refresh is disabled for index [{}]", indexSettings.getIndex().getName());
340+
startIndexLevelRefreshTask();
341+
}
342+
}
332343
}
333344

334345
public IndexService(
@@ -365,6 +376,7 @@ public IndexService(
365376
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
366377
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
367378
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
379+
boolean shardLevelRefreshEnabled,
368380
RecoverySettings recoverySettings,
369381
RemoteStoreSettings remoteStoreSettings,
370382
Supplier<Integer> clusterDefaultMaxMergeAtOnce
@@ -403,6 +415,7 @@ public IndexService(
403415
translogFactorySupplier,
404416
clusterDefaultRefreshIntervalSupplier,
405417
fixedRefreshIntervalSchedulingEnabled,
418+
shardLevelRefreshEnabled,
406419
recoverySettings,
407420
remoteStoreSettings,
408421
null,
@@ -708,7 +721,11 @@ protected void closeInternal() {
708721
seedRemote,
709722
discoveryNodes,
710723
segmentReplicationStatsProvider,
711-
mergedSegmentWarmerFactory
724+
mergedSegmentWarmerFactory,
725+
shardLevelRefreshEnabled,
726+
fixedRefreshIntervalSchedulingEnabled,
727+
this::getRefreshInterval,
728+
refreshMutex
712729
);
713730
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
714731
eventListener.afterIndexShardCreated(indexShard);
@@ -1142,9 +1159,10 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
11421159
* 2. {@code index.refresh_interval} index setting changes.
11431160
*/
11441161
public void onRefreshIntervalChange() {
1145-
if (refreshTask.getInterval().equals(getRefreshInterval())) {
1162+
if (refreshInterval.equals(getRefreshInterval())) {
11461163
return;
11471164
}
1165+
refreshInterval = getRefreshInterval();
11481166
// once we change the refresh interval we schedule yet another refresh
11491167
// to ensure we are in a clean and predictable state.
11501168
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
@@ -1186,10 +1204,20 @@ private void updateFsyncTaskIfNecessary() {
11861204
}
11871205

11881206
private void rescheduleRefreshTasks() {
1189-
try {
1190-
refreshTask.close();
1191-
} finally {
1192-
refreshTask = new AsyncRefreshTask(this);
1207+
synchronized (refreshMutex) {
1208+
if (shardLevelRefreshEnabled) {
1209+
try {
1210+
stopShardLevelRefreshTasks();
1211+
} finally {
1212+
startShardLevelRefreshTasks();
1213+
}
1214+
} else {
1215+
try {
1216+
stopIndexLevelRefreshTask();
1217+
} finally {
1218+
startIndexLevelRefreshTask();
1219+
}
1220+
}
11931221
}
11941222
}
11951223

@@ -1593,4 +1621,69 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
15931621
return clearedAtLeastOne;
15941622
}
15951623

1624+
/**
1625+
* This method is called when the refresh level is changed from index level to shard level or vice versa.
1626+
*/
1627+
public void onRefreshLevelChange(boolean newShardLevelRefreshVal) {
1628+
synchronized (refreshMutex) {
1629+
if (this.shardLevelRefreshEnabled != newShardLevelRefreshVal) {
1630+
boolean prevShardLevelRefreshVal = this.shardLevelRefreshEnabled;
1631+
this.shardLevelRefreshEnabled = newShardLevelRefreshVal;
1632+
// The refresh mode has changed from index level to shard level and vice versa
1633+
logger.info("refresh tasks rescheduled oldVal={} newVal={}", prevShardLevelRefreshVal, newShardLevelRefreshVal);
1634+
if (newShardLevelRefreshVal) {
1635+
try {
1636+
stopIndexLevelRefreshTask();
1637+
} finally {
1638+
startShardLevelRefreshTasks();
1639+
}
1640+
} else {
1641+
try {
1642+
stopShardLevelRefreshTasks();
1643+
} finally {
1644+
startIndexLevelRefreshTask();
1645+
}
1646+
}
1647+
}
1648+
}
1649+
}
1650+
1651+
private void stopIndexLevelRefreshTask() {
1652+
assert Thread.holdsLock(refreshMutex);
1653+
// The refresh task is expected to be non-null at this point.
1654+
assert Objects.nonNull(refreshTask);
1655+
refreshTask.close();
1656+
refreshTask = null;
1657+
}
1658+
1659+
private void startShardLevelRefreshTasks() {
1660+
assert Thread.holdsLock(refreshMutex);
1661+
for (IndexShard shard : this.shards.values()) {
1662+
shard.startRefreshTask();
1663+
}
1664+
}
1665+
1666+
private void stopShardLevelRefreshTasks() {
1667+
assert Thread.holdsLock(refreshMutex);
1668+
for (IndexShard shard : this.shards.values()) {
1669+
shard.stopRefreshTask();
1670+
}
1671+
}
1672+
1673+
private void startIndexLevelRefreshTask() {
1674+
assert Thread.holdsLock(refreshMutex);
1675+
// The refresh task is expected to be null at this point.
1676+
assert Objects.isNull(refreshTask);
1677+
refreshTask = new AsyncRefreshTask(this);
1678+
}
1679+
1680+
// Visible for testing
1681+
boolean isShardLevelRefreshEnabled() {
1682+
return shardLevelRefreshEnabled;
1683+
}
1684+
1685+
// Visible for testing
1686+
public Object getRefreshMutex() {
1687+
return refreshMutex;
1688+
}
15961689
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.opensearch.common.CheckedRunnable;
8383
import org.opensearch.common.Nullable;
8484
import org.opensearch.common.SetOnce;
85+
import org.opensearch.common.annotation.ExperimentalApi;
8586
import org.opensearch.common.annotation.PublicApi;
8687
import org.opensearch.common.collect.Tuple;
8788
import org.opensearch.common.concurrent.GatedCloseable;
@@ -95,6 +96,7 @@
9596
import org.opensearch.common.settings.Settings;
9697
import org.opensearch.common.unit.TimeValue;
9798
import org.opensearch.common.util.BigArrays;
99+
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
98100
import org.opensearch.common.util.concurrent.AbstractRunnable;
99101
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
100102
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
@@ -369,6 +371,10 @@ Runnable getGlobalCheckpointSyncer() {
369371
private DiscoveryNodes discoveryNodes;
370372
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
371373
private final MergedSegmentWarmerFactory mergedSegmentWarmerFactory;
374+
private final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled;
375+
private final Supplier<TimeValue> refreshInterval;
376+
private final Object refreshMutex;
377+
private volatile AsyncShardRefreshTask refreshTask;
372378

373379
public IndexShard(
374380
final ShardRouting shardRouting,
@@ -401,7 +407,11 @@ public IndexShard(
401407
boolean seedRemote,
402408
final DiscoveryNodes discoveryNodes,
403409
final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
404-
final MergedSegmentWarmerFactory mergedSegmentWarmerFactory
410+
final MergedSegmentWarmerFactory mergedSegmentWarmerFactory,
411+
final boolean shardLevelRefreshEnabled,
412+
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
413+
final Supplier<TimeValue> refreshInterval,
414+
final Object refreshMutex
405415
) throws IOException {
406416
super(shardRouting.shardId(), indexSettings);
407417
assert shardRouting.initializing();
@@ -505,6 +515,14 @@ public boolean shouldCache(Query query) {
505515
this.discoveryNodes = discoveryNodes;
506516
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
507517
this.mergedSegmentWarmerFactory = mergedSegmentWarmerFactory;
518+
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
519+
this.refreshInterval = refreshInterval;
520+
this.refreshMutex = Objects.requireNonNull(refreshMutex);
521+
synchronized (this.refreshMutex) {
522+
if (shardLevelRefreshEnabled) {
523+
startRefreshTask();
524+
}
525+
}
508526
}
509527

510528
/**
@@ -2111,7 +2129,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
21112129
} finally {
21122130
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
21132131
// Also closing refreshListeners to prevent us from accumulating any more listeners
2114-
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
2132+
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, refreshTask);
21152133

21162134
if (deleted && engine != null && isPrimaryMode()) {
21172135
// Translog Clean up
@@ -5497,4 +5515,62 @@ public ShardIngestionState getIngestionState() {
54975515
IngestionEngine ingestionEngine = (IngestionEngine) engine;
54985516
return ingestionEngine.getIngestionState();
54995517
}
5518+
5519+
/**
5520+
* Async shard refresh task for running refreshes at shard level independently
5521+
*/
5522+
@ExperimentalApi
5523+
public final class AsyncShardRefreshTask extends AbstractAsyncTask {
5524+
5525+
private final IndexShard indexShard;
5526+
private final Logger logger;
5527+
5528+
public AsyncShardRefreshTask(IndexShard indexShard) {
5529+
super(indexShard.logger, indexShard.threadPool, refreshInterval.get(), true, fixedRefreshIntervalSchedulingEnabled);
5530+
this.logger = indexShard.logger;
5531+
this.indexShard = indexShard;
5532+
rescheduleIfNecessary();
5533+
}
5534+
5535+
@Override
5536+
protected boolean mustReschedule() {
5537+
return indexShard.state != IndexShardState.CLOSED
5538+
&& indexShard.indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN;
5539+
}
5540+
5541+
@Override
5542+
protected void runInternal() {
5543+
indexShard.scheduledRefresh();
5544+
}
5545+
5546+
@Override
5547+
protected String getThreadPool() {
5548+
return ThreadPool.Names.REFRESH;
5549+
}
5550+
5551+
@Override
5552+
public String toString() {
5553+
return "shard_refresh";
5554+
}
5555+
}
5556+
5557+
public void startRefreshTask() {
5558+
assert Thread.holdsLock(refreshMutex);
5559+
// The refresh task is expected to be null at this point.
5560+
assert Objects.isNull(refreshTask);
5561+
refreshTask = new AsyncShardRefreshTask(this);
5562+
}
5563+
5564+
public void stopRefreshTask() {
5565+
assert Thread.holdsLock(refreshMutex);
5566+
// The refresh task is expected to be non-null at this point.
5567+
assert Objects.nonNull(refreshTask);
5568+
refreshTask.close();
5569+
refreshTask = null;
5570+
}
5571+
5572+
// Visible for testing
5573+
public AsyncShardRefreshTask getRefreshTask() {
5574+
return refreshTask;
5575+
}
55005576
}

0 commit comments

Comments
 (0)