Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.x]
### Added
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))

### Changed

Expand Down
2 changes: 1 addition & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Kunal Kotwani | [kotwanikunal](https://github.com/kotwanikunal) | Amazon |
| Varun Bansal | [linuxpi](https://github.com/linuxpi) | Amazon |
| Marc Handalian | [mch2](https://github.com/mch2) | Amazon |
| Michael Froh | [msfroh](https://github.com/msfroh) | Amazon |
| Michael Froh | [msfroh](https://github.com/msfroh) | Uber |
| Nick Knize | [nknize](https://github.com/nknize) | Lucenia |
| Owais Kazi | [owaiskazi19](https://github.com/owaiskazi19) | Amazon |
| Pan Guixin | [bugmakerrrrrr](https://github.com/bugmakerrrrrr) | ByteDance |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.NormalizationProcessor;
import org.opensearch.search.pipeline.NormalizationProcessorFactory;
import org.opensearch.search.pipeline.NormalizationProcessorWorkflow;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.ScoreCombinationFactory;
import org.opensearch.search.pipeline.ScoreCombiner;
import org.opensearch.search.pipeline.ScoreNormalizationFactory;
import org.opensearch.search.pipeline.ScoreNormalizer;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
Expand Down Expand Up @@ -107,7 +114,18 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces

/**
 * Supplies the search-phase-results processor factories contributed by this plugin.
 * The candidate factories are handed to {@code filterForAllowlistSetting} together with
 * {@code SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING} and the environment settings
 * taken from {@code parameters}; whatever that helper returns is returned to the caller.
 *
 * @param parameters plugin parameters; only {@code parameters.env.settings()} is read here
 * @return the map produced by {@code filterForAllowlistSetting}, keyed by processor type
 */
@Override
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
// NOTE(review): this hunk appears to show both sides of the diff. The single-line return
// below (empty Map.of()) looks like the pre-change statement — confirm against the real file.
return filterForAllowlistSetting(SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING, parameters.env.settings(), Map.of());
// NOTE(review): presumably the post-change statement. It registers one factory under
// NormalizationProcessor.TYPE, built from a workflow (ScoreNormalizer + ScoreCombiner)
// plus the score normalization and score combination factories.
return filterForAllowlistSetting(
SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING,
parameters.env.settings(),
Map.of(
NormalizationProcessor.TYPE,
new NormalizationProcessorFactory(
new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner()),
new ScoreNormalizationFactory(),
new ScoreCombinationFactory()
)
)
);
}

private <T extends Processor> Map<String, Processor.Factory<T>> filterForAllowlistSetting(
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-hdfs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ dependencies {
api "org.apache.commons:commons-compress:${versions.commonscompress}"
api 'org.apache.commons:commons-configuration2:2.11.0'
api "commons-io:commons-io:${versions.commonsio}"
api 'org.apache.commons:commons-lang3:3.17.0'
api "org.apache.commons:commons-lang3:${versions.commonslang}"
implementation 'com.google.re2j:re2j:1.8'
api 'javax.servlet:servlet-api:2.5'
api "org.slf4j:slf4j-api:${versions.slf4j}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ public void testCloseClientStreamingRequest() throws Exception {
})
.then(() -> scheduler.advanceTimeBy(delay))
.expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException)
.verify();
.verify(Duration.ofSeconds(10));
}
}
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
api project(":libs:opensearch-geo")
api project(":libs:opensearch-telemetry")
api project(":libs:opensearch-task-commons")
compileOnly group: 'commons-lang', name: 'commons-lang', version: '2.6'

compileOnly project(":libs:agent-sm:bootstrap")
compileOnly project(':libs:opensearch-plugin-classloader')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

import java.io.IOException;
Expand Down Expand Up @@ -721,7 +722,11 @@ public static final IndexShard newIndexShard(
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
mock(Function.class),
new MergedSegmentWarmerFactory(null, null, null)
new MergedSegmentWarmerFactory(null, null, null),
false,
OpenSearchTestCase::randomBoolean,
() -> indexService.getIndexSettings().getRefreshInterval(),
indexService.getRefreshMutex()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.env.Environment;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
Expand Down Expand Up @@ -450,7 +451,12 @@ public void testSnapshotWithStuckNode() throws Exception {
createRepository(
"test-repo",
"mock",
Settings.builder().put("location", repo).put("random", randomAlphaOfLength(10)).put("wait_after_unblock", 200)
Settings.builder()
.put("location", repo)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
// TODO: There's likely a bug with other path types where cleanup seems to leave unexpected files
.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), RemoteStoreEnums.PathType.FIXED)
);

// Pick one node and block it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,8 @@ public void apply(Settings value, Settings current, Settings previous) {
),

// Setting related to refresh optimisations
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
IndicesService.CLUSTER_REFRESH_SHARD_LEVEL_ENABLED_SETTING
)
)
);
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
Supplier<Boolean> shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
Expand All @@ -656,6 +657,7 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled,
recoverySettings,
remoteStoreSettings,
(s) -> {},
Expand Down Expand Up @@ -685,6 +687,7 @@ public IndexService newIndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
Supplier<Boolean> shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator,
Expand Down Expand Up @@ -748,6 +751,7 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled.get(),
recoverySettings,
remoteStoreSettings,
fileCache,
Expand Down
107 changes: 100 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
private final Object refreshMutex = new Object();
private volatile TimeValue refreshInterval;
private volatile boolean shardLevelRefreshEnabled;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -234,6 +237,7 @@ public IndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
boolean shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
Expand Down Expand Up @@ -311,8 +315,9 @@ public IndexService(
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
this.shardLevelRefreshEnabled = shardLevelRefreshEnabled;
this.refreshInterval = getRefreshInterval();
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
// disable these checks for ingestion source engine
if (!indexSettings.getIndexMetadata().useIngestionSource()) {
Expand All @@ -329,6 +334,12 @@ public IndexService(
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get());
updateFsyncTaskIfNecessary();
synchronized (refreshMutex) {
if (shardLevelRefreshEnabled == false) {
logger.debug("shard level refresh is disabled for index [{}]", indexSettings.getIndex().getName());
startIndexLevelRefreshTask();
}
}
}

public IndexService(
Expand Down Expand Up @@ -365,6 +376,7 @@ public IndexService(
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
boolean shardLevelRefreshEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnce
Expand Down Expand Up @@ -403,6 +415,7 @@ public IndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
shardLevelRefreshEnabled,
recoverySettings,
remoteStoreSettings,
null,
Expand Down Expand Up @@ -708,7 +721,11 @@ protected void closeInternal() {
seedRemote,
discoveryNodes,
segmentReplicationStatsProvider,
mergedSegmentWarmerFactory
mergedSegmentWarmerFactory,
shardLevelRefreshEnabled,
fixedRefreshIntervalSchedulingEnabled,
this::getRefreshInterval,
refreshMutex
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -1142,9 +1159,10 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
* 2. {@code index.refresh_interval} index setting changes.
*/
public void onRefreshIntervalChange() {
if (refreshTask.getInterval().equals(getRefreshInterval())) {
if (refreshInterval.equals(getRefreshInterval())) {
return;
}
refreshInterval = getRefreshInterval();
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
Expand Down Expand Up @@ -1186,10 +1204,20 @@ private void updateFsyncTaskIfNecessary() {
}

/**
 * Stops the currently active refresh task(s) and starts fresh ones.
 * In the synchronized variant below, the work is done while holding {@code refreshMutex}:
 * shard-level tasks are restarted when {@code shardLevelRefreshEnabled} is true, otherwise
 * the single index-level task is restarted.
 */
private void rescheduleRefreshTasks() {
// NOTE(review): this hunk appears to interleave both sides of the diff. The unsynchronized
// try/finally directly below (close the task, then always create a new AsyncRefreshTask)
// looks like the pre-change body — confirm against the real file.
try {
refreshTask.close();
} finally {
refreshTask = new AsyncRefreshTask(this);
// NOTE(review): presumably the post-change body starts here.
synchronized (refreshMutex) {
if (shardLevelRefreshEnabled) {
// Shard-level mode: the start call sits in finally, so it runs even if stopping throws.
try {
stopShardLevelRefreshTasks();
} finally {
startShardLevelRefreshTasks();
}
} else {
// Index-level mode: same stop-then-always-start shape for the single index task.
try {
stopIndexLevelRefreshTask();
} finally {
startIndexLevelRefreshTask();
}
}
}
}

Expand Down Expand Up @@ -1593,4 +1621,69 @@ public boolean clearCaches(boolean queryCache, boolean fieldDataCache, String...
return clearedAtLeastOne;
}

/**
 * Switches this index between index-level and shard-level refresh scheduling.
 * Does nothing when the requested mode equals the current one. Otherwise the flag is
 * updated, the tasks of the old mode are stopped and the tasks of the new mode are
 * started, all while holding {@code refreshMutex}.
 *
 * @param newShardLevelRefreshVal {@code true} to refresh per shard, {@code false} to use the
 *                                single index-level refresh task
 */
public void onRefreshLevelChange(boolean newShardLevelRefreshVal) {
    synchronized (refreshMutex) {
        final boolean prevShardLevelRefreshVal = this.shardLevelRefreshEnabled;
        if (prevShardLevelRefreshVal == newShardLevelRefreshVal) {
            // Already in the requested mode; nothing to reschedule.
            return;
        }
        this.shardLevelRefreshEnabled = newShardLevelRefreshVal;
        logger.info("refresh tasks rescheduled oldVal={} newVal={}", prevShardLevelRefreshVal, newShardLevelRefreshVal);
        if (newShardLevelRefreshVal == false) {
            // shard level -> index level; the start call runs even if stopping throws
            try {
                stopShardLevelRefreshTasks();
            } finally {
                startIndexLevelRefreshTask();
            }
            return;
        }
        // index level -> shard level; the start call runs even if stopping throws
        try {
            stopIndexLevelRefreshTask();
        } finally {
            startShardLevelRefreshTasks();
        }
    }
}

/**
 * Closes the index-level refresh task and clears the {@code refreshTask} field.
 * Must be called while holding {@code refreshMutex}.
 * <p>
 * The field is cleared in a {@code finally} block so that it is reset even when
 * {@code close()} throws. Callers invoke {@code startIndexLevelRefreshTask()} from their own
 * {@code finally}, and that method asserts the field is null; leaving a stale reference
 * behind would make that assertion fail and hide the original exception.
 */
private void stopIndexLevelRefreshTask() {
    assert Thread.holdsLock(refreshMutex);
    // The refresh task is expected to be non-null at this point.
    assert refreshTask != null;
    try {
        refreshTask.close();
    } finally {
        refreshTask = null;
    }
}

/**
 * Calls {@code startRefreshTask()} on every shard currently held by this index service.
 * Must be called while holding {@code refreshMutex}.
 */
private void startShardLevelRefreshTasks() {
    assert Thread.holdsLock(refreshMutex);
    shards.values().forEach(IndexShard::startRefreshTask);
}

/**
 * Calls {@code stopRefreshTask()} on every shard currently held by this index service.
 * Must be called while holding {@code refreshMutex}.
 */
private void stopShardLevelRefreshTasks() {
    assert Thread.holdsLock(refreshMutex);
    shards.values().forEach(IndexShard::stopRefreshTask);
}

/**
 * Creates a new {@code AsyncRefreshTask} for this index and stores it in {@code refreshTask}.
 * Must be called while holding {@code refreshMutex}, and only when no index-level task is
 * currently set.
 */
private void startIndexLevelRefreshTask() {
    assert Thread.holdsLock(refreshMutex);
    // No index-level task may exist yet; it is either never created or already stopped.
    assert refreshTask == null;
    this.refreshTask = new AsyncRefreshTask(this);
}

/**
 * Visible for testing.
 *
 * @return the current value of the {@code shardLevelRefreshEnabled} flag
 */
boolean isShardLevelRefreshEnabled() {
    return this.shardLevelRefreshEnabled;
}

/**
 * Visible for testing.
 *
 * @return the monitor object this index service synchronizes on when starting or stopping
 *         refresh tasks
 */
public Object getRefreshMutex() {
    return this.refreshMutex;
}
}
Loading
Loading