From 475c37a4c5c91c1d0e7daa1256d7b689a1e1b475 Mon Sep 17 00:00:00 2001 From: Ketan Verma Date: Mon, 31 Oct 2022 20:06:59 +0530 Subject: [PATCH] [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap Signed-off-by: Ketan Verma --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 14 +- .../opensearch/common/util/TokenBucket.java | 6 +- .../SearchBackpressureService.java | 18 ++- .../settings/SearchBackpressureSettings.java | 57 +++++-- .../settings/SearchShardTaskSettings.java | 128 ++------------- .../trackers/CpuUsageTracker.java | 82 ++++++++++ .../trackers/ElapsedTimeTracker.java | 86 ++++++++++ .../trackers/HeapUsageTracker.java | 148 ++++++++++++++++++ .../trackers/TaskResourceUsageTracker.java | 4 +- .../TaskResourceUsageTrackerType.java | 28 ++++ .../opensearch/tasks/TaskCancellation.java | 5 +- .../common/{ => util}/StreakTests.java | 3 +- .../SearchBackpressureServiceTests.java | 6 +- .../trackers/CpuUsageTrackerTests.java | 48 ++++++ .../trackers/ElapsedTimeTrackerTests.java | 49 ++++++ .../trackers/HeapUsageTrackerTests.java | 83 ++++++++++ .../tasks/TaskCancellationTests.java | 2 +- 18 files changed, 611 insertions(+), 157 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java rename server/src/test/java/org/opensearch/common/{ => util}/StreakTests.java (92%) create mode 100644 server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java create mode 100644 server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java create mode 100644 server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 16a607cb40393..b75c3b6626f81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update GeoGrid base class access modifier to support extensibility ([#4921](https://github.com/opensearch-project/OpenSearch/pull/4921)) - Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902)) - Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565)) +- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805)) ### Dependencies - Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0 diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index d3719b0815496..f81a0bfee7800 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -45,6 +45,9 @@ import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; @@ -596,11 +599,12 @@ public void apply(Settings value, Settings current, Settings previous) { NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES, NodeDuressSettings.SETTING_CPU_THRESHOLD, NodeDuressSettings.SETTING_HEAP_THRESHOLD, - SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, - SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD, - SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD + SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD, + ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/common/util/TokenBucket.java b/server/src/main/java/org/opensearch/common/util/TokenBucket.java index e47f152d71363..d2e7e836bf07f 100644 --- a/server/src/main/java/org/opensearch/common/util/TokenBucket.java +++ b/server/src/main/java/org/opensearch/common/util/TokenBucket.java @@ -101,9 +101,9 @@ public boolean request() { */ private static class State { final double tokens; - final double lastRefilledAt; + final long lastRefilledAt; - public State(double tokens, double lastRefilledAt) { + public State(double tokens, long lastRefilledAt) { this.tokens = tokens; this.lastRefilledAt = lastRefilledAt; } @@ -113,7 +113,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; State state = (State) o; - return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0; + return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt; } @Override diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 885846a177d60..66efa832dec37 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -17,6 +17,9 @@ import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; @@ -29,7 +32,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -84,7 +86,7 @@ public SearchBackpressureService( () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() ) ), - Collections.emptyList() + List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime)) ); } @@ -97,7 +99,7 @@ public SearchBackpressureService( List taskResourceUsageTrackers ) { this.settings = settings; - this.settings.setListener(this); + this.settings.addListener(this); this.taskResourceTrackingService = taskResourceTrackingService; this.taskResourceTrackingService.addTaskCompletionListener(this); this.threadPool = threadPool; @@ -181,10 +183,10 @@ boolean isNodeInDuress() { * Returns true if the increase in heap usage is due to search requests. */ boolean isHeapUsageDominatedBySearch(List searchShardTasks) { - long runningTasksHeapUsage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); - long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapThresholdBytes(); - if (runningTasksHeapUsage < searchTasksHeapThreshold) { - logger.debug("heap usage not dominated by search requests [{}/{}]", runningTasksHeapUsage, searchTasksHeapThreshold); + long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); + long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold(); + if (usage < threshold) { + logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold); return false; } @@ -213,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) { List callbacks = new ArrayList<>(); for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { - Optional reason = tracker.cancellationReason(task); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); if (reason.isPresent()) { reasons.add(reason.get()); callbacks.add(tracker::incrementCancellations); diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index 4834808d768f1..5ceb01666757f 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -8,13 +8,16 @@ package org.opensearch.search.backpressure.settings; -import org.apache.lucene.util.SetOnce; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Settings related to search backpressure and cancellation of in-flight requests. @@ -23,7 +26,7 @@ */ public class SearchBackpressureSettings { private static class Defaults { - private static final long INTERVAL = 1000; + private static final long INTERVAL_MILLIS = 1000; private static final boolean ENABLED = true; private static final boolean ENFORCED = false; @@ -37,9 +40,9 @@ private static class Defaults { * Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks. */ private final TimeValue interval; - public static final Setting SETTING_INTERVAL = Setting.longSetting( - "search_backpressure.interval", - Defaults.INTERVAL, + public static final Setting SETTING_INTERVAL_MILLIS = Setting.longSetting( + "search_backpressure.interval_millis", + Defaults.INTERVAL_MILLIS, 1, Setting.Property.NodeScope ); @@ -116,15 +119,19 @@ public interface Listener { void onCancellationBurstChanged(); } - private final SetOnce listener = new SetOnce<>(); + private final List listeners = new ArrayList<>(); + private final Settings settings; + private final ClusterSettings clusterSettings; private final NodeDuressSettings nodeDuressSettings; private final SearchShardTaskSettings searchShardTaskSettings; public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { + this.settings = settings; + this.clusterSettings = clusterSettings; this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings); - interval = new TimeValue(SETTING_INTERVAL.get(settings)); + interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings)); enabled = SETTING_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled); @@ -142,8 +149,16 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); } - public void setListener(Listener listener) { - this.listener.set(listener); + public void addListener(Listener listener) { + listeners.add(listener); + } + + public Settings getSettings() { + return settings; + } + + public ClusterSettings getClusterSettings() { + return clusterSettings; } public NodeDuressSettings getNodeDuressSettings() { @@ -180,9 +195,7 @@ public double getCancellationRatio() { private void setCancellationRatio(double cancellationRatio) { this.cancellationRatio = cancellationRatio; - if (listener.get() != null) { - listener.get().onCancellationRatioChanged(); - } + notifyListeners(Listener::onCancellationRatioChanged); } public double getCancellationRate() { @@ -195,9 +208,7 @@ public double getCancellationRateNanos() { private void setCancellationRate(double cancellationRate) { this.cancellationRate = cancellationRate; - if (listener.get() != null) { - listener.get().onCancellationRateChanged(); - } + notifyListeners(Listener::onCancellationRateChanged); } public double getCancellationBurst() { @@ -206,8 +217,20 @@ public double getCancellationBurst() { private void setCancellationBurst(double cancellationBurst) { this.cancellationBurst = cancellationBurst; - if (listener.get() != null) { - listener.get().onCancellationBurstChanged(); + notifyListeners(Listener::onCancellationBurstChanged); + } + + private void notifyListeners(Consumer consumer) { + List exceptions = new ArrayList<>(); + + for (Listener listener : listeners) { + try { + consumer.accept(listener); + } catch (Exception e) { + exceptions.add(e); + } } + + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java index 1126dad78f554..7e40f1c0eab53 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -22,139 +22,37 @@ public class SearchShardTaskSettings { private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); private static class Defaults { - private static final double TOTAL_HEAP_THRESHOLD = 0.05; - private static final double HEAP_THRESHOLD = 0.005; - private static final double HEAP_VARIANCE_THRESHOLD = 2.0; - private static final long CPU_TIME_THRESHOLD = 15; - private static final long ELAPSED_TIME_THRESHOLD = 30000; + private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; } /** * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search shard tasks * before in-flight cancellation is applied. */ - private volatile double totalHeapThreshold; - public static final Setting SETTING_TOTAL_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.total_heap_threshold", - Defaults.TOTAL_HEAP_THRESHOLD, + private volatile double totalHeapPercentThreshold; + public static final Setting SETTING_TOTAL_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.total_heap_percent_threshold", + Defaults.TOTAL_HEAP_PERCENT_THRESHOLD, 0.0, 1.0, Setting.Property.Dynamic, Setting.Property.NodeScope ); - /** - * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. - */ - private volatile double heapThreshold; - public static final Setting SETTING_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_threshold", - Defaults.HEAP_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage variance for an individual task before it is considered for cancellation. - * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. - */ - private volatile double heapVarianceThreshold; - public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_variance", - Defaults.HEAP_VARIANCE_THRESHOLD, - 0.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long cpuTimeThreshold; - public static final Setting SETTING_CPU_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.cpu_time_threshold", - Defaults.CPU_TIME_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long elapsedTimeThreshold; - public static final Setting SETTING_ELAPSED_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.elapsed_time_threshold", - Defaults.ELAPSED_TIME_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) { - totalHeapThreshold = SETTING_TOTAL_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_THRESHOLD, this::setTotalHeapThreshold); - - heapThreshold = SETTING_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_THRESHOLD, this::setHeapThreshold); - - heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); - - cpuTimeThreshold = SETTING_CPU_TIME_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_THRESHOLD, this::setCpuTimeThreshold); - - elapsedTimeThreshold = SETTING_ELAPSED_TIME_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_THRESHOLD, this::setElapsedTimeThreshold); - } - - public double getTotalHeapThreshold() { - return totalHeapThreshold; - } - - public long getTotalHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getTotalHeapThreshold()); - } - - private void setTotalHeapThreshold(double totalHeapThreshold) { - this.totalHeapThreshold = totalHeapThreshold; - } - - public double getHeapThreshold() { - return heapThreshold; - } - - public long getHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getHeapThreshold()); - } - - private void setHeapThreshold(double heapThreshold) { - this.heapThreshold = heapThreshold; - } - - public double getHeapVarianceThreshold() { - return heapVarianceThreshold; - } - - private void setHeapVarianceThreshold(double heapVarianceThreshold) { - this.heapVarianceThreshold = heapVarianceThreshold; - } - - public long getCpuTimeThreshold() { - return cpuTimeThreshold; + totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold); } - private void setCpuTimeThreshold(long cpuTimeThreshold) { - this.cpuTimeThreshold = cpuTimeThreshold; + public double getTotalHeapPercentThreshold() { + return totalHeapPercentThreshold; } - public long getElapsedTimeThreshold() { - return elapsedTimeThreshold; + public long getTotalHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * getTotalHeapPercentThreshold()); } - private void setElapsedTimeThreshold(long elapsedTimeThreshold) { - this.elapsedTimeThreshold = elapsedTimeThreshold; + private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { + this.totalHeapPercentThreshold = totalHeapPercentThreshold; } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java new file mode 100644 index 0000000000000..fa4f88d8a54e6 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; + +/** + * CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed. + * + * @opensearch.internal + */ +public class CpuUsageTracker extends TaskResourceUsageTracker { + private static class Defaults { + private static final long CPU_TIME_MILLIS_THRESHOLD = 15000; + } + + /** + * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long cpuTimeMillisThreshold; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.cpu_time_millis_threshold", + Defaults.CPU_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public CpuUsageTracker(SearchBackpressureSettings settings) { + this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); + } + + @Override + public String name() { + return CPU_USAGE_TRACKER.getName(); + } + + @Override + public Optional checkAndMaybeGetCancellationReason(Task task) { + long usage = task.getTotalResourceStats().getCpuTimeInNanos(); + long threshold = getCpuTimeNanosThreshold(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "cpu usage exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + } + + public long getCpuTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); + } + + public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { + this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java new file mode 100644 index 0000000000000..d76ccdb76db36 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER; + +/** + * ElapsedTimeTracker evaluates if the task has been running for more time than allowed. + * + * @opensearch.internal + */ +public class ElapsedTimeTracker extends TaskResourceUsageTracker { + private static class Defaults { + private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; + } + + /** + * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long elapsedTimeMillisThreshold; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.elapsed_time_millis_threshold", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final LongSupplier timeNanosSupplier; + + public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) { + this.timeNanosSupplier = timeNanosSupplier; + this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); + } + + @Override + public String name() { + return ELAPSED_TIME_TRACKER.getName(); + } + + @Override + public Optional checkAndMaybeGetCancellationReason(Task task) { + long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); + long threshold = getElapsedTimeNanosThreshold(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "elapsed time exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + } + + public long getElapsedTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); + } + + public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { + this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java new file mode 100644 index 0000000000000..983a2e1152511 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.MovingAverage; +import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER; + +/** + * HeapUsageTracker evaluates if the task has consumed too much heap than allowed. + * It also compares the task's heap usage against a historical moving average of previously completed tasks. + * + * @opensearch.internal + */ +public class HeapUsageTracker extends TaskResourceUsageTracker { + private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + + private static class Defaults { + private static final double HEAP_PERCENT_THRESHOLD = 0.005; + private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + } + + /** + * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. + */ + private volatile double heapPercentThreshold; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_percent_threshold", + Defaults.HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. + */ + private volatile double heapVarianceThreshold; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_variance", + Defaults.HEAP_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the window size to calculate the moving average of heap usage of completed tasks. + */ + private volatile int heapMovingAverageWindowSize; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "search_backpressure.search_shard_task.heap_moving_average_window_size", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final AtomicReference movingAverageReference; + + public HeapUsageTracker(SearchBackpressureSettings settings) { + heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); + + heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + + heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + + this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); + } + + @Override + public String name() { + return HEAP_USAGE_TRACKER.getName(); + } + + @Override + public void update(Task task) { + movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); + } + + @Override + public Optional checkAndMaybeGetCancellationReason(Task task) { + MovingAverage movingAverage = movingAverageReference.get(); + + // There haven't been enough measurements. + if (movingAverage.isReady() == false) { + return Optional.empty(); + } + + double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); + double averageUsage = movingAverage.getAverage(); + double allowedUsage = averageUsage * getHeapVarianceThreshold(); + + if (currentUsage < getHeapBytesThreshold() || currentUsage < allowedUsage) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]", + (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight + ) + ); + } + + public long getHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * heapPercentThreshold); + } + + public void setHeapPercentThreshold(double heapPercentThreshold) { + this.heapPercentThreshold = heapPercentThreshold; + } + + public double getHeapVarianceThreshold() { + return heapVarianceThreshold; + } + + public void setHeapVarianceThreshold(double heapVarianceThreshold) { + this.heapVarianceThreshold = heapVarianceThreshold; + } + + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { + this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; + this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java index 8f1842efa5771..1765dee42ae15 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java @@ -41,10 +41,10 @@ public long getCancellations() { /** * Notifies the tracker to update its state when a task execution completes. */ - public abstract void update(Task task); + public void update(Task task) {} /** * Returns the cancellation reason for the given task, if it's eligible for cancellation. */ - public abstract Optional cancellationReason(Task task); + public abstract Optional checkAndMaybeGetCancellationReason(Task task); } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java new file mode 100644 index 0000000000000..7a74321241534 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackerType.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +/** + * Defines the type of TaskResourceUsageTracker. + */ +public enum TaskResourceUsageTrackerType { + CPU_USAGE_TRACKER("cpu_usage_tracker"), + HEAP_USAGE_TRACKER("heap_usage_tracker"), + ELAPSED_TIME_TRACKER("elapsed_time_tracker"); + + private final String name; + + TaskResourceUsageTrackerType(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java index 2c302172cf6bc..d09312f38e3eb 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java @@ -15,7 +15,10 @@ import java.util.stream.Collectors; /** - * TaskCancellation is a wrapper for a task and its cancellation reasons. + * TaskCancellation represents a task eligible for cancellation. + * It doesn't guarantee that the task will actually get cancelled or not; that decision is left to the caller. + * + * It contains a list of cancellation reasons along with callbacks that are invoked when cancel() is called. * * @opensearch.internal */ diff --git a/server/src/test/java/org/opensearch/common/StreakTests.java b/server/src/test/java/org/opensearch/common/util/StreakTests.java similarity index 92% rename from server/src/test/java/org/opensearch/common/StreakTests.java rename to server/src/test/java/org/opensearch/common/util/StreakTests.java index 80080ad3e4027..682a28d3a3a8b 100644 --- a/server/src/test/java/org/opensearch/common/StreakTests.java +++ b/server/src/test/java/org/opensearch/common/util/StreakTests.java @@ -6,9 +6,8 @@ * compatible open source license. */ -package org.opensearch.common; +package org.opensearch.common.util; -import org.opensearch.common.util.Streak; import org.opensearch.test.OpenSearchTestCase; public class StreakTests extends OpenSearchTestCase { diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index ac5a8229718ba..7e16e55e16e59 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -132,7 +132,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { return Optional.empty(); } @@ -167,10 +167,10 @@ public Optional cancellationReason(Task task) { service.doRun(); service.doRun(); - // Mocking 'settings' with predictable searchHeapThresholdBytes so that cancellation logic doesn't get skipped. + // Mocking 'settings' with predictable totalHeapBytesThreshold so that cancellation logic doesn't get skipped. long taskHeapUsageBytes = 500; SearchShardTaskSettings shardTaskSettings = mock(SearchShardTaskSettings.class); - when(shardTaskSettings.getTotalHeapThresholdBytes()).thenReturn(taskHeapUsageBytes); + when(shardTaskSettings.getTotalHeapBytesThreshold()).thenReturn(taskHeapUsageBytes); when(settings.getSearchShardTaskSettings()).thenReturn(shardTaskSettings); // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java new file mode 100644 index 0000000000000..c790fb2e60eea --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class CpuUsageTrackerTests extends OpenSearchTestCase { + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + public void testEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertFalse(reason.isPresent()); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java new file mode 100644 index 0000000000000..67ed6059a1914 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class ElapsedTimeTrackerTests extends OpenSearchTestCase { + + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + public void testEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); + ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); + ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertFalse(reason.isPresent()); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java new file mode 100644 index 0000000000000..b9967da22fbf1 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class HeapUsageTrackerTests extends OpenSearchTestCase { + private static final long HEAP_BYTES_THRESHOLD = 100; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 2.0) + .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), HEAP_MOVING_AVERAGE_WINDOW_SIZE) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + public void testEligibleForCancellation() { + HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); + when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); + + // Record enough observations to make the moving average 'ready'. + for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { + tracker.update(task); + } + + // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(4, reason.get().getCancellationScore()); + assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task; + Optional reason; + HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); + when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); + + // Task with heap usage < heapBytesThreshold. + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); + + // Not enough observations. + reason = tracker.checkAndMaybeGetCancellationReason(task); + assertFalse(reason.isPresent()); + + // Record enough observations to make the moving average 'ready'. + for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { + tracker.update(task); + } + + // Task with heap usage < heapBytesThreshold should not be cancelled. + reason = tracker.checkAndMaybeGetCancellationReason(task); + assertFalse(reason.isPresent()); + + // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled. + double allowedHeapUsage = 99.0 * 2.0; + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); + reason = tracker.checkAndMaybeGetCancellationReason(task); + assertFalse(reason.isPresent()); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index d6a82702e074d..50a510f954677 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -64,7 +64,7 @@ public String name() { public void update(Task task) {} @Override - public Optional cancellationReason(Task task) { + public Optional checkAndMaybeGetCancellationReason(Task task) { return Optional.empty(); } };