Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Shard Indexing Pressure Settings. (#478) #716

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.Iterator;

/**
* This class contains all the settings which are required and owned by {TODO link ShardIndexingPressure}. These will be
* referenced/used in ShardIndexingPressure, as well as its dependent components, i.e.
* {TODO link ShardIndexingPressureMemoryManager} and {TODO link ShardIndexingPressureStore}
*/
public final class ShardIndexingPressureSettings {

public static final String SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY = "shard_indexing_pressure_enabled";

public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENABLED =
Setting.boolSetting("shard_indexing_pressure.enabled", false, Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set to true, shard level
* rejection will be performed, otherwise only rejection metrics will be populated.
*/
public static final Setting<Boolean> SHARD_INDEXING_PRESSURE_ENFORCED =
Setting.boolSetting("shard_indexing_pressure.enforced", false, Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This represents the window size of last N request sampled and considered for throughput evaluation.
*/
public static final Setting<Integer> REQUEST_SIZE_WINDOW =
Setting.intSetting("shard_indexing_pressure.secondary_parameter.throughput.request_size_window", 2000,
Setting.Property.NodeScope, Setting.Property.Dynamic);

/**
* This represents the base limit set for the utilization of every shard. Will be initilized as 1/1000th bytes of node limits.
*/
public static final Setting<Double> SHARD_MIN_LIMIT =
Setting.doubleSetting("shard_indexing_pressure.primary_parameter.shard.min_limit", 0.001d, 0.0d,
Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile boolean shardIndexingPressureEnabled;
private volatile boolean shardIndexingPressureEnforced;
private volatile long shardPrimaryAndCoordinatingBaseLimits;
private volatile long shardReplicaBaseLimits;
private volatile int requestSizeWindow;
private volatile double shardMinLimit;
private final long primaryAndCoordinatingNodeLimits;
private static ClusterService clusterService;

public ShardIndexingPressureSettings(ClusterService clusterService, Settings settings, long primaryAndCoordinatingLimits) {
ShardIndexingPressureSettings.clusterService = clusterService;
ClusterSettings clusterSettings = clusterService.getClusterSettings();

this.shardIndexingPressureEnabled = SHARD_INDEXING_PRESSURE_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SHARD_INDEXING_PRESSURE_ENABLED, this::setShardIndexingPressureEnabled);

this.shardIndexingPressureEnforced = SHARD_INDEXING_PRESSURE_ENFORCED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SHARD_INDEXING_PRESSURE_ENFORCED, this::setShardIndexingPressureEnforced);

this.requestSizeWindow = REQUEST_SIZE_WINDOW.get(settings).intValue();
clusterSettings.addSettingsUpdateConsumer(REQUEST_SIZE_WINDOW, this::setRequestSizeWindow);

this.primaryAndCoordinatingNodeLimits = primaryAndCoordinatingLimits;

this.shardMinLimit = SHARD_MIN_LIMIT.get(settings).floatValue();
this.shardPrimaryAndCoordinatingBaseLimits = (long) (primaryAndCoordinatingLimits * shardMinLimit);
this.shardReplicaBaseLimits = (long) (shardPrimaryAndCoordinatingBaseLimits * 1.5);
clusterSettings.addSettingsUpdateConsumer(SHARD_MIN_LIMIT, this::setShardMinLimit);
}

public static boolean isShardIndexingPressureAttributeEnabled() {
Iterator<DiscoveryNode> nodes = clusterService.state().getNodes().getNodes().valuesIt();
while (nodes.hasNext()) {
if (Boolean.parseBoolean(nodes.next().getAttributes().get(SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY)) == false) {
return false;
}
}
return true;
}

private void setShardIndexingPressureEnabled(Boolean shardIndexingPressureEnableValue) {
this.shardIndexingPressureEnabled = shardIndexingPressureEnableValue;
}

private void setShardIndexingPressureEnforced(Boolean shardIndexingPressureEnforcedValue) {
this.shardIndexingPressureEnforced = shardIndexingPressureEnforcedValue;
}

private void setRequestSizeWindow(int requestSizeWindow) {
this.requestSizeWindow = requestSizeWindow;
}

private void setShardMinLimit(double shardMinLimit) {
this.shardMinLimit = shardMinLimit;

//Updating the dependent value once when the dynamic settings update
this.setShardPrimaryAndCoordinatingBaseLimits();
this.setShardReplicaBaseLimits();
}

private void setShardPrimaryAndCoordinatingBaseLimits() {
shardPrimaryAndCoordinatingBaseLimits = (long) (primaryAndCoordinatingNodeLimits * shardMinLimit);
}

private void setShardReplicaBaseLimits() {
shardReplicaBaseLimits = (long) (shardPrimaryAndCoordinatingBaseLimits * 1.5);
}

public boolean isShardIndexingPressureEnabled() {
return shardIndexingPressureEnabled;
}

public boolean isShardIndexingPressureEnforced() {
return shardIndexingPressureEnforced;
}

public int getRequestSizeWindow() {
return requestSizeWindow;
}

public long getShardPrimaryAndCoordinatingBaseLimits() {
return shardPrimaryAndCoordinatingBaseLimits;
}

public long getShardReplicaBaseLimits() {
return shardReplicaBaseLimits;
}

public long getNodePrimaryAndCoordinatingLimits() {
return primaryAndCoordinatingNodeLimits;
}

public long getNodeReplicaLimits() {
return (long) (primaryAndCoordinatingNodeLimits * 1.5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

public class ShardIndexingPressureSettingsTests extends OpenSearchTestCase {

private final Settings settings = Settings.builder()
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10MB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 2000)
.put(ShardIndexingPressureSettings.SHARD_MIN_LIMIT.getKey(), 0.001d)
.build();

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterService clusterService = new ClusterService(settings, clusterSettings, null);

public void testFromSettings() {
ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings,
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());

assertTrue(shardIndexingPressureSettings.isShardIndexingPressureEnabled());
assertTrue(shardIndexingPressureSettings.isShardIndexingPressureEnforced());
assertEquals(2000, shardIndexingPressureSettings.getRequestSizeWindow());

// Node level limits
long nodePrimaryAndCoordinatingLimits = shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits();
long tenMB = 10 * 1024 * 1024;
assertEquals(tenMB, nodePrimaryAndCoordinatingLimits);
assertEquals((long)(tenMB * 1.5), shardIndexingPressureSettings.getNodeReplicaLimits());

// Shard Level Limits
long shardPrimaryAndCoordinatingBaseLimits = (long) (nodePrimaryAndCoordinatingLimits * 0.001d);
assertEquals(shardPrimaryAndCoordinatingBaseLimits, shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits());
assertEquals((long)(shardPrimaryAndCoordinatingBaseLimits * 1.5),
shardIndexingPressureSettings.getShardReplicaBaseLimits());
}

public void testUpdateSettings() {
ShardIndexingPressureSettings shardIndexingPressureSettings = new ShardIndexingPressureSettings(clusterService, settings,
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());

Settings.Builder updated = Settings.builder();
clusterSettings.updateDynamicSettings(Settings.builder()
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), false)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), false)
.put(ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW.getKey(), 4000)
.put(ShardIndexingPressureSettings.SHARD_MIN_LIMIT.getKey(), 0.003d)
.build(),
Settings.builder().put(settings), updated, getTestClass().getName());
clusterSettings.applySettings(updated.build());

assertFalse(shardIndexingPressureSettings.isShardIndexingPressureEnabled());
assertFalse(shardIndexingPressureSettings.isShardIndexingPressureEnforced());
assertEquals(4000, shardIndexingPressureSettings.getRequestSizeWindow());

// Node level limits
long nodePrimaryAndCoordinatingLimits = shardIndexingPressureSettings.getNodePrimaryAndCoordinatingLimits();
long tenMB = 10 * 1024 * 1024;
assertEquals(tenMB, nodePrimaryAndCoordinatingLimits);
assertEquals((long)(tenMB * 1.5), shardIndexingPressureSettings.getNodeReplicaLimits());

// Shard Level Limits
long shardPrimaryAndCoordinatingBaseLimits = (long) (nodePrimaryAndCoordinatingLimits * 0.003d);
assertEquals(shardPrimaryAndCoordinatingBaseLimits, shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits());
assertEquals((long)(shardPrimaryAndCoordinatingBaseLimits * 1.5),
shardIndexingPressureSettings.getShardReplicaBaseLimits());
}
}