Skip to content

Commit

Permalink
change disk circuit breaker to cluster settings
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Jul 10, 2024
1 parent f2406a9 commit 1e55899
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.ml.common.exception.MLException;

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE;

/**
* A circuit breaker for disk usage.
*/
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
// TODO: make this value configurable as cluster setting
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Integer> {
private static final String ML_DISK_CB = "Disk Circuit Breaker";
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
public static final int DEFAULT_DISK_SHORTAGE_THRESHOLD = 5;
private static final long GB = 1024 * 1024 * 1024;
private String diskDir;

public DiskCircuitBreaker(String diskDir) {
super(DEFAULT_DISK_SHORTAGE_THRESHOLD);
this.diskDir = diskDir;
}
private final File diskDir;

public DiskCircuitBreaker(long threshold, String diskDir) {
super(threshold);
public DiskCircuitBreaker(Settings settings, ClusterService clusterService, File diskDir) {
super(Optional.ofNullable(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.get(settings)).orElse(DEFAULT_DISK_SHORTAGE_THRESHOLD));
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE, super::setThreshold);
this.diskDir = diskDir;
}

Expand All @@ -42,7 +42,7 @@ public String getName() {
public boolean isOpen() {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
return diskDir.getFreeSpace() / GB < getThreshold(); // in GB
});
} catch (PrivilegedActionException e) {
throw new MLException("Failed to run disk circuit breaker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.breaker;

import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -76,7 +77,7 @@ public MLCircuitBreakerService init(Path path) {
// Register memory circuit breaker
registerBreaker(BreakerName.MEMORY, new MemoryCircuitBreaker(this.settings, this.clusterService, this.jvmService));
log.info("Registered ML memory breaker.");
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(this.settings, this.clusterService, new File(path.toString())));
log.info("Registered ML disk breaker.");
// Register native memory circuit breaker, disabling due to unstability.
// registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmService;

import java.util.Optional;

/**
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
// TODO: make this value configurable as cluster setting
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;
private volatile Integer jvmHeapMemThreshold = 85;

public MemoryCircuitBreaker(JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
Expand All @@ -32,21 +32,16 @@ public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
}

public MemoryCircuitBreaker(Settings settings, ClusterService clusterService, JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
super(Optional.ofNullable(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings)).map(Integer::shortValue).orElse(DEFAULT_JVM_HEAP_USAGE_THRESHOLD));
this.jvmService = jvmService;
this.jvmHeapMemThreshold = ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> jvmHeapMemThreshold = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

@Override
public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.jvmHeapMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.os.OsService;

import java.util.Optional;

/**
* A circuit breaker for native memory usage.
*/
public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
private final OsService osService;
private volatile Integer nativeMemThreshold = 90;

public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
super(Optional.ofNullable(ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings)).map(Integer::shortValue).orElse(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD));
this.osService = osService;
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
super(threshold.shortValue());
this.nativeMemThreshold = threshold;
this.osService = osService;
}

Expand All @@ -38,13 +37,8 @@ public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.nativeMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
return osService.stats().getMem().getUsedPercent() > getThreshold();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.ml.breaker;

import lombok.Data;

/**
* An abstract class for all breakers with threshold.
* @param <T> data type of threshold
*/
@Data
public abstract class ThresholdCircuitBreaker<T> implements CircuitBreaker {

private T threshold;
Expand All @@ -17,10 +20,6 @@ public ThresholdCircuitBreaker(T threshold) {
this.threshold = threshold;
}

public T getThreshold() {
return threshold;
}

@Override
public abstract boolean isOpen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE,
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX,
MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE,
MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_EXCLUDE_NODE_NAMES,
MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ private MLCommonsSettings() {}
public static final Setting<Integer> ML_COMMONS_JVM_HEAP_MEM_THRESHOLD = Setting
.intSetting("plugins.ml_commons.jvm_heap_memory_threshold", 85, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE = Setting
.intSetting("plugins.ml_commons.disk_free_space_min_value", 5, 0, Integer.MAX_VALUE, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<String> ML_COMMONS_EXCLUDE_NODE_NAMES = Setting
.simpleString("plugins.ml_commons.exclude_nodes._name", Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<Boolean> ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN = Setting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.ml.breaker;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

import java.io.File;
import java.util.HashSet;
import java.util.List;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE;

public class DiskCircuitBreakerTests {
@Mock
ClusterService clusterService;

@Mock
File file;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, new HashSet<>(List.of(
ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE))));
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsHigherThanMinValue_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 5).build(), clusterService, file);
when(file.getFreeSpace()).thenReturn(5 * 1024 * 1024 * 1024L);
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsLessThanMinValue_breakerIsOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 5).build(), clusterService, file);
when(file.getFreeSpace()).thenReturn(4 * 1024 * 1024 * 1024L);
Assert.assertTrue(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceConfiguredToZero_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 5).build(), clusterService, file);
when(file.getFreeSpace()).thenReturn((long)(Math.random() * 1024 * 1024 * 1024 * 1024L));
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_getName() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.EMPTY, clusterService, file);
Assert.assertEquals("Disk Circuit Breaker", breaker.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void setupSettings() throws IOException {
+ " \"persistent\" : {\n"
+ " \"plugins.ml_commons.jvm_heap_memory_threshold\" : 100, \n"
+ " \"plugins.ml_commons.native_memory_threshold\" : 100 \n"
+ " \"plugins.ml_commons.disk_free_space_min_value\" : 0 \n"
+ " }\n"
+ "}";
response = TestHelper
Expand Down

0 comments on commit 1e55899

Please sign in to comment.