From 1e558995966b3d047d5515625b0b50bbbe663278 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Wed, 10 Jul 2024 18:49:50 +0800 Subject: [PATCH] change disk circuit breaker to cluster settings Signed-off-by: zane-neo --- .../ml/breaker/DiskCircuitBreaker.java | 24 +++---- .../ml/breaker/MLCircuitBreakerService.java | 3 +- .../ml/breaker/MemoryCircuitBreaker.java | 13 ++-- .../breaker/NativeMemoryCircuitBreaker.java | 16 ++--- .../ml/breaker/ThresholdCircuitBreaker.java | 7 +- .../ml/plugin/MachineLearningPlugin.java | 1 + .../ml/settings/MLCommonsSettings.java | 3 + .../ml/breaker/DiskCircuitBreakerTests.java | 66 +++++++++++++++++++ .../ml/rest/MLCommonsRestTestCase.java | 1 + 9 files changed, 97 insertions(+), 37 deletions(-) create mode 100644 plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java index 2b5dd6e060..2764624a4c 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java @@ -9,26 +9,26 @@ import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.Optional; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.ml.common.exception.MLException; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE; + /** * A circuit breaker for disk usage. 
*/ -public class DiskCircuitBreaker extends ThresholdCircuitBreaker { - // TODO: make this value configurable as cluster setting +public class DiskCircuitBreaker extends ThresholdCircuitBreaker { private static final String ML_DISK_CB = "Disk Circuit Breaker"; - public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L; + public static final int DEFAULT_DISK_SHORTAGE_THRESHOLD = 5; private static final long GB = 1024 * 1024 * 1024; - private String diskDir; - - public DiskCircuitBreaker(String diskDir) { - super(DEFAULT_DISK_SHORTAGE_THRESHOLD); - this.diskDir = diskDir; - } + private final File diskDir; - public DiskCircuitBreaker(long threshold, String diskDir) { - super(threshold); + public DiskCircuitBreaker(Settings settings, ClusterService clusterService, File diskDir) { + super(Optional.ofNullable(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.get(settings)).orElse(DEFAULT_DISK_SHORTAGE_THRESHOLD)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE, super::setThreshold); this.diskDir = diskDir; } @@ -42,7 +42,7 @@ public String getName() { public boolean isOpen() { try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB + return diskDir.getFreeSpace() / GB < getThreshold(); // in GB }); } catch (PrivilegedActionException e) { - throw new MLException("Failed to run disk circuit breaker"); + throw new MLException("Failed to run disk circuit breaker", e); diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java index 156c71b69c..8db1b72ffc 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java @@ -5,6 +5,7 @@ package org.opensearch.ml.breaker; +import java.io.File; import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import
java.util.concurrent.ConcurrentMap; @@ -76,7 +77,7 @@ public MLCircuitBreakerService init(Path path) { // Register memory circuit breaker registerBreaker(BreakerName.MEMORY, new MemoryCircuitBreaker(this.settings, this.clusterService, this.jvmService)); log.info("Registered ML memory breaker."); - registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString())); + registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(this.settings, this.clusterService, new File(path.toString()))); log.info("Registered ML disk breaker."); // Register native memory circuit breaker, disabling due to unstability. // registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService)); diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java index c1287ef481..a618f8f8ba 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java @@ -11,15 +11,15 @@ import org.opensearch.common.settings.Settings; import org.opensearch.monitor.jvm.JvmService; +import java.util.Optional; + /** * A circuit breaker for memory usage. 
*/ public class MemoryCircuitBreaker extends ThresholdCircuitBreaker { - // TODO: make this value configurable as cluster setting private static final String ML_MEMORY_CB = "Memory Circuit Breaker"; public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85; private final JvmService jvmService; - private volatile Integer jvmHeapMemThreshold = 85; public MemoryCircuitBreaker(JvmService jvmService) { super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD); @@ -32,10 +32,9 @@ public MemoryCircuitBreaker(short threshold, JvmService jvmService) { } public MemoryCircuitBreaker(Settings settings, ClusterService clusterService, JvmService jvmService) { - super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD); + super(Optional.ofNullable(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings)).map(Integer::shortValue).orElse(DEFAULT_JVM_HEAP_USAGE_THRESHOLD)); this.jvmService = jvmService; - this.jvmHeapMemThreshold = ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> jvmHeapMemThreshold = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue())); } @Override @@ -43,10 +42,6 @@ public String getName() { return ML_MEMORY_CB; } - @Override - public Short getThreshold() { - return this.jvmHeapMemThreshold.shortValue(); - } @Override public boolean isOpen() { diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java index 195a017648..d297807cdf 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java @@ -11,6 +11,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.monitor.os.OsService; +import java.util.Optional; + /** * A circuit breaker for native memory usage. 
*/ @@ -18,18 +20,15 @@ public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker { private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker"; public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90; private final OsService osService; - private volatile Integer nativeMemThreshold = 90; public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) { - super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD); + super(Optional.ofNullable(ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings)).map(Integer::shortValue).orElse(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD)); this.osService = osService; - this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue())); } public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) { super(threshold.shortValue()); - this.nativeMemThreshold = threshold; this.osService = osService; } @@ -38,13 +37,8 @@ public String getName() { return ML_MEMORY_CB; } - @Override - public Short getThreshold() { - return this.nativeMemThreshold.shortValue(); - } - @Override public boolean isOpen() { - return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue(); + return osService.stats().getMem().getUsedPercent() > getThreshold(); } } diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java index 1feec444ab..bab8b0558c 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java @@ -5,10 +5,13 @@ package org.opensearch.ml.breaker; +import lombok.Data; + /** * An 
abstract class for all breakers with threshold. * @param data type of threshold */ +@Data public abstract class ThresholdCircuitBreaker implements CircuitBreaker { - private T threshold; + // volatile: the threshold is rewritten by cluster-settings update consumers and may be read from other threads in isOpen() + private volatile T threshold; @@ -17,10 +21,6 @@ public ThresholdCircuitBreaker(T threshold) { this.threshold = threshold; } - public T getThreshold() { - return threshold; - } - @Override public abstract boolean isOpen(); } diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index 73fd060a92..de1bb9901b 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -911,6 +911,7 @@ public List> getSettings() { MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE, MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX, MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD, + MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE, MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, MLCommonsSettings.ML_COMMONS_EXCLUDE_NODE_NAMES, MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN, diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java index 84bdef95fe..76bc69d260 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java @@ -77,6 +77,9 @@ private MLCommonsSettings() {} public static final Setting ML_COMMONS_JVM_HEAP_MEM_THRESHOLD = Setting .intSetting("plugins.ml_commons.jvm_heap_memory_threshold", 85, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE = Setting + .intSetting("plugins.ml_commons.disk_free_space_min_value", 5, 0, Integer.MAX_VALUE, Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final
Setting ML_COMMONS_EXCLUDE_NODE_NAMES = Setting .simpleString("plugins.ml_commons.exclude_nodes._name", Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN = Setting diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java new file mode 100644 index 0000000000..553676c456 --- /dev/null +++ b/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java @@ -0,0 +1,66 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.ml.breaker; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; + +import java.io.File; +import java.util.HashSet; +import java.util.List; + +import static org.mockito.Mockito.when; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE; + +public class DiskCircuitBreakerTests { + @Mock + ClusterService clusterService; + + @Mock + File file; + + @Before + public void setup() { + MockitoAnnotations.openMocks(this); + when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, new HashSet<>(List.of( + ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE)))); + } + + @Test + public void test_isOpen_whenDiskFreeSpaceIsHigherThanMinValue_breakerIsNotOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 5).build(), clusterService, file); + when(file.getFreeSpace()).thenReturn(5 * 1024 * 1024 * 1024L); + Assert.assertFalse(breaker.isOpen()); + } + + @Test + public void 
test_isOpen_whenDiskFreeSpaceIsLessThanMinValue_breakerIsOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 5).build(), clusterService, file); + when(file.getFreeSpace()).thenReturn(4 * 1024 * 1024 * 1024L); + Assert.assertTrue(breaker.isOpen()); + } + + @Test + public void test_isOpen_whenDiskFreeSpaceConfiguredToZero_breakerIsNotOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker(Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_MIN_VALUE.getKey(), 0).build(), clusterService, file); + when(file.getFreeSpace()).thenReturn((long)(Math.random() * 1024 * 1024 * 1024 * 1024L)); + Assert.assertFalse(breaker.isOpen()); + } + + @Test + public void test_getName() { + CircuitBreaker breaker = new DiskCircuitBreaker(Settings.EMPTY, clusterService, file); + Assert.assertEquals("Disk Circuit Breaker", breaker.getName()); + } +} diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index 8e013d4305..746391d565 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -179,6 +179,7 @@ public void setupSettings() throws IOException { + " \"persistent\" : {\n" + " \"plugins.ml_commons.jvm_heap_memory_threshold\" : 100, \n" - + " \"plugins.ml_commons.native_memory_threshold\" : 100 \n" + + " \"plugins.ml_commons.native_memory_threshold\" : 100, \n" + + " \"plugins.ml_commons.disk_free_space_min_value\" : 0 \n" + " }\n" + "}"; response = TestHelper