Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Security Manager Replacement] Enhance Java Agent to intercept Runtime::halt ([#17757](https://github.com/opensearch-project/OpenSearch/pull/17757))
- [Security Manager Replacement] Phase off SecurityManager usage in favor of Java Agent ([#17861](https://github.com/opensearch-project/OpenSearch/pull/17861))
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
- Introduce new dynamic cluster setting to tune maxMergeAtOnce at cluster level ([#17774](https://github.com/opensearch-project/OpenSearch/pull/17774))
- Add TLS enabled SecureNetty4GrpcServerTransport ([#17796](https://github.com/opensearch-project/OpenSearch/pull/17796))
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
- [Tiered caching] Create a single cache manager for all the disk caches. ([#17513](https://github.com/opensearch-project/OpenSearch/pull/17513))
Expand All @@ -48,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Increase the floor segment size to 16MB ([#17699](https://github.com/opensearch-project/OpenSearch/pull/17699))
- Unwrap singleton DocValues in global ordinal value source of composite histogram aggregation ([#17740](https://github.com/opensearch-project/OpenSearch/pull/17740))
- Unwrap singleton DocValues in date histogram aggregation. ([#17643](https://github.com/opensearch-project/OpenSearch/pull/17643))
- Increase the default maxMergesAtOnce to 30 ([#17774](https://github.com/opensearch-project/OpenSearch/pull/17774))
- Introduce 512 byte limit to search and ingest pipeline IDs ([#17786](https://github.com/opensearch-project/OpenSearch/pull/17786))
- Avoid skewed segment replication lag metric ([#17831](https://github.com/opensearch-project/OpenSearch/pull/17831))
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.index;

import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.indices.IndicesService;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
public class ClusterMaxMergesAtOnceIT extends AbstractSnapshotIntegTestCase {

@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
internalCluster().startClusterManagerOnlyNode();
}

public void testClusterLevelDefaultUpdatesMergePolicy() throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
List<String> dataNodes = new ArrayList<>(internalCluster().getDataNodeNames());

String indexName = "log-myindex-1";
createIndex(indexName);
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes));
String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
assertEquals(30, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());

client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.getKey(), 20))
.get();

indexName = "log-myindex-2";
createIndex(indexName);
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes));
uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService secondIndexService = indicesService.indexService(new Index(indexName, uuid));
assertEquals(20, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(20, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());

// Create index with index level override in settings
indexName = "log-myindex-3";
createIndex(
indexName,
Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 15).build()
);
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes));
uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService thirdIndexService = indicesService.indexService(new Index(indexName, uuid));
assertEquals(15, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());

// changing cluster level default should only affect indices without index level override
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.getKey(), 35))
.get();
assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(15, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());

// removing index level override should pick up the cluster level default
UpdateSettingsRequestBuilder builder = client().admin().indices().prepareUpdateSettings(indexName);
builder.setSettings(
Settings.builder().putNull(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey()).build()
);
builder.execute().actionGet();

assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(35, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());

// update index level setting to override cluster level default
builder = client().admin().indices().prepareUpdateSettings(indexName);
builder.setSettings(
Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 17).build()
);
builder.execute().actionGet();

assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
assertEquals(17, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
Expand Down
12 changes: 8 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,8 @@
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
) throws IOException {
return newIndexService(
indexCreationContext,
Expand All @@ -658,7 +659,8 @@
recoverySettings,
remoteStoreSettings,
(s) -> {},
shardId -> ReplicationStats.empty()
shardId -> ReplicationStats.empty(),

Check warning on line 662 in server/src/main/java/org/opensearch/index/IndexModule.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexModule.java#L662

Added line #L662 was not covered by tests
clusterDefaultMaxMergeAtOnceSupplier
);
}

Expand Down Expand Up @@ -686,7 +688,8 @@
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -750,7 +753,8 @@
fileCache,
compositeIndexSettings,
replicator,
segmentReplicationStatsProvider
segmentReplicationStatsProvider,
clusterDefaultMaxMergeAtOnceSupplier
);
success = true;
return indexService;
Expand Down
17 changes: 14 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator,
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -326,6 +327,7 @@
this.fileCache = fileCache;
this.replicator = replicator;
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get());
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -364,7 +366,8 @@
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
RemoteStoreSettings remoteStoreSettings,
Supplier<Integer> clusterDefaultMaxMergeAtOnce
) {
this(
indexSettings,
Expand Down Expand Up @@ -405,7 +408,8 @@
null,
null,
s -> {},
(shardId) -> ReplicationStats.empty()
(shardId) -> ReplicationStats.empty(),

Check warning on line 411 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L411

Added line #L411 was not covered by tests
clusterDefaultMaxMergeAtOnce
);
}

Expand Down Expand Up @@ -1124,6 +1128,13 @@
}
}

/**
* Called whenever the cluster level {@code cluster.default.index.max_merge_at_once} changes.
*/
public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce);
}

Check warning on line 1136 in server/src/main/java/org/opensearch/index/IndexService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexService.java#L1135-L1136

Added lines #L1135 - L1136 were not covered by tests

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down
31 changes: 29 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
tieredMergePolicyProvider::setFloorSegmentSetting
);
scopedSettings.addSettingsUpdateConsumer(
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
tieredMergePolicyProvider::setMaxMergesAtOnce
this::updateMaxMergesAtOnce,
List.of(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING)
);
scopedSettings.addSettingsUpdateConsumer(
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
Expand Down Expand Up @@ -1250,6 +1250,33 @@ private void setRefreshInterval(TimeValue timeValue) {
this.refreshInterval = timeValue;
}

/**
* Update the default maxMergesAtOnce
* 1. sets the new default in {@code TieredMergePolicyProvider}
* 2. sets the maxMergesAtOnce on the actual TieredMergePolicy used by the engine if no index level override exists
*/
void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) {
tieredMergePolicyProvider.setDefaultMaxMergesAtOnce(newDefaultMaxMergesAtOnce);
if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(getSettings()) == false) {
tieredMergePolicyProvider.setMaxMergesAtOnceToDefault();
}
}

/**
* Updates the maxMergesAtOnce for actual TieredMergePolicy used by the engine.
* Sets it to default maxMergesAtOnce if index level settings is being removed
*/
void updateMaxMergesAtOnce(Settings updatedSettings) {
if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(updatedSettings) == false) {
logger.debug("Resetting maxMergesAtOnce to cluster default");
tieredMergePolicyProvider.setMaxMergesAtOnceToDefault();
} else {
tieredMergePolicyProvider.setMaxMergesAtOnce(
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.get(updatedSettings)
);
}
}

/**
* Returns the settings for this index. These settings contain the node and index level settings where
* settings that are specified on both index and node level are overwritten by the index settings.
Expand Down
Loading
Loading