diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index dfecf4f462c4d..fa935c8792581 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -37,6 +37,7 @@ import org.opensearch.core.tasks.TaskId; import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.sandboxing.SandboxTask; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; @@ -50,9 +51,10 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class SearchShardTask extends CancellableTask implements SearchBackpressureTask { +public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, SandboxTask { // generating metadata in a lazy way since source can be quite big private final MemoizedSupplier metadataSupplier; + private String sandboxId; public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, () -> ""); @@ -84,4 +86,14 @@ public boolean supportsResourceTracking() { public boolean shouldCancelChildrenOnCancellation() { return false; } + + @Override + public void setSandboxId(String sandboxId) { + this.sandboxId = sandboxId; + } + + @Override + public String getSandboxId() { + return "sandboxId"; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index d3c1043c50cce..d775c39ab477d 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -35,6 +35,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.tasks.TaskId; +import org.opensearch.search.sandboxing.SandboxTask; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; @@ -49,10 +50,11 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class SearchTask extends CancellableTask implements SearchBackpressureTask { +public class SearchTask extends CancellableTask implements SearchBackpressureTask, SandboxTask { // generating description in a lazy way since source can be quite big private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; + private String sandboxId; public SearchTask( long id, @@ -106,4 +108,13 @@ public final SearchProgressListener getProgressListener() { public boolean shouldCancelChildrenOnCancellation() { return true; } + + public void setSandboxId(String sandboxId) { + this.sandboxId = sandboxId; + } + + @Override + public String getSandboxId() { + return "sandboxId"; + } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index d016501dd0910..5b682708294e2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -126,7 +126,12 @@ public boolean isSegmentReplicationEnabled(String indexName) { .orElse(false); } - /** + public Map sandboxes() { + // stub + return Collections.emptyMap(); + } + + /** * Context of the XContent. * * @opensearch.api diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Sandbox.java b/server/src/main/java/org/opensearch/cluster/metadata/Sandbox.java new file mode 100644 index 0000000000000..c868be280ea4b --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/Sandbox.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; + +import java.util.Collections; +import java.util.List; + +@ExperimentalApi +public class Sandbox { + //TODO Kaushal should have implemented hashcode and equals + private SandboxMode mode; + + public SandboxMode getMode() { + return mode; + } + + public ResourceLimit getResourceLimitFor(SandboxResourceType resourceType) { + return null; + } + + public String getName() { + return ""; + } + + public String getId() { + return ""; + } + + public List getResourceLimits() { + return Collections.emptyList(); + } + + @ExperimentalApi + public class ResourceLimit { + public Long getThresholdInLong() { + return 0L; + } + + public SandboxResourceType getResourceType() { + return null; + } + + public Long getThreshold() { + return 0L; + } + } + + @ExperimentalApi + public enum SandboxMode { + SOFT("soft"), + ENFORCED("enforced"), + MONITOR("monitor"); + + private final String name; + + SandboxMode(String mode) { + this.name = mode; + } + + public String getName() { + return name; + } + + public static SandboxMode fromName(String s) { + switch (s) { + case "soft": + return SOFT; + case "enforced": + return ENFORCED; + case "monitor": + return MONITOR; + default: + throw new IllegalArgumentException("Invalid value for SandboxMode: " + s); + } + } + + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/SandboxLevelResourceUsageView.java b/server/src/main/java/org/opensearch/search/sandboxing/SandboxLevelResourceUsageView.java new file mode 100644 index 0000000000000..aa181bb85c3c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/SandboxLevelResourceUsageView.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.tasks.Task; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@ExperimentalApi +public class SandboxLevelResourceUsageView { + + private final String sandboxId; + private final Map resourceUsage; + private final List activeTasks; + + public SandboxLevelResourceUsageView(String sandboxId) { + this.sandboxId = sandboxId; + this.resourceUsage = new HashMap<>(); + this.activeTasks = new ArrayList<>(); + } + + public SandboxLevelResourceUsageView(String sandboxId, Map resourceUsage, List activeTasks) { + this.sandboxId = sandboxId; + this.resourceUsage = resourceUsage; + this.activeTasks = activeTasks; + } + + public Map getResourceUsageData() { + return resourceUsage; + } + + public List getActiveTasks() { + return activeTasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SandboxLevelResourceUsageView that = (SandboxLevelResourceUsageView) o; + return Objects.equals(sandboxId, that.sandboxId); + } + + @Override + public int hashCode() { + return Objects.hashCode(sandboxId); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/SandboxService.java b/server/src/main/java/org/opensearch/search/sandboxing/SandboxService.java new file mode 100644 index 0000000000000..662258bb7fd03 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/SandboxService.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.Sandbox; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.search.sandboxing.cancellation.DefaultTaskCancellation; +import org.opensearch.search.sandboxing.cancellation.LongestRunningTaskFirstStrategy; +import org.opensearch.search.sandboxing.tracker.SandboxUsageTracker; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Main service which will run periodically to track and cancel resource constraint violating tasks in sandboxes + */ +public class SandboxService extends AbstractLifecycleComponent { + private static final Logger logger = LogManager.getLogger(SandboxService.class); + + private final SandboxUsageTracker sandboxUsageTracker; + private volatile Scheduler.Cancellable scheduledFuture; + private final ThreadPool threadPool; + private final ClusterService clusterService; + + /** + * Guice managed constructor + * + * @param sandboxUsageTracker + * @param sandboxPruner + * @param sandboxServiceSettings + * @param threadPool + */ + @Inject + public SandboxService( + SandboxUsageTracker sandboxUsageTracker, + ClusterService clusterService, + ThreadPool threadPool + ) { + this.sandboxUsageTracker = sandboxUsageTracker; + this.sandboxServiceSettings = sandboxServiceSettings; + this.sandboxPruner = sandboxPruner; + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + /** + * run at regular interval + */ + private void doRun() { + Map sandboxLevelResourceUsageViews = sandboxUsageTracker.constructSandboxLevelUsageViews(); + Set activeSandboxes = getActiveSandboxes(); + DefaultTaskCancellation taskCancellation = new DefaultTaskCancellation( + new LongestRunningTaskFirstStrategy(), + sandboxLevelResourceUsageViews, + activeSandboxes + ); + taskCancellation.cancelTasks(); + // TODO Prune the sandboxes + } + + private Set getActiveSandboxes() { + return new HashSet<>(clusterService.state().metadata().sandboxes().values()); + } + + /** + * {@link AbstractLifecycleComponent} lifecycle method + */ + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + try { + doRun(); + } catch (Exception e) { + logger.debug("Exception occurred in Query Sandbox service", e); + } + }, sandboxServiceSettings.getRunIntervalMillis(), ThreadPool.Names.GENERIC); + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() throws IOException { + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/SandboxTask.java b/server/src/main/java/org/opensearch/search/sandboxing/SandboxTask.java new file mode 100644 index 0000000000000..92aba3759f115 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/SandboxTask.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing; + +/** + * This interface can be implemented by tasks which will be tracked and monitored using {@link org.opensearch.cluster.metadata.ResourceLimitGroup} + */ +public interface SandboxTask { + void setSandboxId(String sandboxId); + + String getSandboxId(); +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskCancellation.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskCancellation.java new file mode 100644 index 0000000000000..9bf96bf63578c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskCancellation.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.cluster.metadata.Sandbox; +import org.opensearch.search.sandboxing.SandboxLevelResourceUsageView; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.search.sandboxing.tracker.SandboxResourceUsageTrackerService.TRACKED_RESOURCES; + +/** + * Abstract class that provides a structure for task cancellation. + * This class is extended by other classes to provide specific task cancellation strategies. + */ +public abstract class AbstractTaskCancellation { + // Strategy for selecting tasks to cancel + protected final TaskSelectionStrategy taskSelectionStrategy; + // Views of resource usage at the sandbox level + protected final Map sandboxLevelViews; + // Set of active sandboxes + protected final Set activeSandboxes; + + public AbstractTaskCancellation( + TaskSelectionStrategy taskSelectionStrategy, + Map sandboxLevelViews, + Set activeSandboxes + ) { + this.taskSelectionStrategy = taskSelectionStrategy; + this.sandboxLevelViews = sandboxLevelViews; + this.activeSandboxes = activeSandboxes; + } + + /** + * Cancel tasks based on the implemented strategy. + */ + public final void cancelTasks() { + List cancellableTasks = getAllCancellableTasks(); + for (TaskCancellation taskCancellation : cancellableTasks) { + taskCancellation.cancel(); + } + } + + /** + * Abstract method to get the list of sandboxes from which tasks can be cancelled. + * This method needs to be implemented by subclasses. + * + * @return List of sandboxes + */ + abstract List getSandboxesToCancelFrom(); + + /** + * Get all cancellable tasks from the sandboxes. + * + * @return List of tasks that can be cancelled + */ + protected List getAllCancellableTasks() { + return getSandboxesToCancelFrom().stream() + .flatMap(sandbox -> getCancellableTasksFrom(sandbox).stream()) + .collect(Collectors.toList()); + } + + /** + * Get cancellable tasks from a specific sandbox. + * + * @param sandbox The sandbox from which to get cancellable tasks + * @return List of tasks that can be cancelled + */ + protected List getCancellableTasksFrom(Sandbox sandbox) { + return TRACKED_RESOURCES.stream() + .filter(resourceType -> shouldCancelTasks(sandbox, resourceType)) + .flatMap(resourceType -> getTaskCancellations(sandbox, resourceType).stream()) + .collect(Collectors.toList()); + } + + private boolean shouldCancelTasks(Sandbox sandbox, SandboxResourceType resourceType) { + long reduceBy = getReduceBy(sandbox, resourceType); + return reduceBy > 0; + } + + private List getTaskCancellations(Sandbox sandbox, SandboxResourceType resourceType) { + return taskSelectionStrategy.selectTasksForCancellation( + getAllTasksInSandbox(sandbox.getId()), + getReduceBy(sandbox, resourceType), + resourceType) + .stream() + .map(task -> createTaskCancellation((CancellableTask) task)) + .collect(Collectors.toList()); + } + + private long getReduceBy(Sandbox sandbox, SandboxResourceType resourceType) { + return getUsage(sandbox, resourceType) - sandbox.getResourceLimitFor(resourceType).getThresholdInLong(); + } + + private Long getUsage(Sandbox sandbox, SandboxResourceType resourceType) { + return sandboxLevelViews.get(sandbox.getId()).getResourceUsageData().get(resourceType); + } + + private List getAllTasksInSandbox(String sandboxId) { + return sandboxLevelViews.get(sandboxId).getActiveTasks(); + } + + private TaskCancellation createTaskCancellation(CancellableTask task) { + // todo add reasons and callbacks + return new TaskCancellation(task, List.of(), List.of(this::callbackOnCancel)); + } + + private void callbackOnCancel() { + // todo Implement callback logic here + System.out.println("Task was cancelled."); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskSelectionStrategy.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskSelectionStrategy.java new file mode 100644 index 0000000000000..de0cc19090a4d --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/AbstractTaskSelectionStrategy.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.tasks.Task; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractTaskSelectionStrategy implements TaskSelectionStrategy { + + public abstract Comparator sortingCondition(); + + @Override + public List selectTasksForCancellation(List tasks, long limit, SandboxResourceType resourceType) { + if (limit < 0) { + throw new IllegalArgumentException("reduceBy has to be greater than zero"); + } + if(limit == 0) { + return Collections.emptyList(); + } + + List sortedTasks = tasks.stream() + .sorted(sortingCondition()) + .collect(Collectors.toList()); + + List selectedTasks = new ArrayList<>(); + long accumulated = 0; + + for (Task task : sortedTasks) { + selectedTasks.add(task); + accumulated += resourceType.getResourceUsage(task); + if (accumulated >= limit) { + break; + } + } + return selectedTasks; + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/DefaultTaskCancellation.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/DefaultTaskCancellation.java new file mode 100644 index 0000000000000..f5d21f8af5e4e --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/DefaultTaskCancellation.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.cluster.metadata.Sandbox; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.search.sandboxing.SandboxLevelResourceUsageView; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DefaultTaskCancellation extends AbstractTaskCancellation { + public DefaultTaskCancellation( + TaskSelectionStrategy cancellationStrategy, + Map sandboxLevelViews, + Set activeSandboxes + ) { + super(cancellationStrategy, sandboxLevelViews, activeSandboxes); + } + + /** + * // TODO + * This should cover 3 scenarios + * - if node not in duress + * - pick sandboxes in enforced mode only + * - if node in duress + * - pick sandboxes in enforced mode + * - tasks running in deleted sandboxes with tasks running + * - pick sandboxes in enforced mode + */ + public List getSandboxesToCancelFrom() { + final List sandboxesToCancelFrom = new ArrayList<>(); + + for (Sandbox sandbox : this.activeSandboxes) { + Map currentResourceUsage = getResourceUsage(sandbox.getId()); + + for (Sandbox.ResourceLimit resourceLimit : sandbox.getResourceLimits()) { + if (isBreachingThreshold(currentResourceUsage, resourceLimit)) { + sandboxesToCancelFrom.add(sandbox); + break; + } + } + } + + return sandboxesToCancelFrom; + } + + private boolean isBreachingThreshold(Map currentResourceUsage, Sandbox.ResourceLimit resourceLimit) { + return currentResourceUsage.get(resourceLimit.getResourceType()) > resourceLimit.getThreshold(); + } + + private Map getResourceUsage(String sandboxId) { + return sandboxLevelViews.get(sandboxId).getResourceUsageData(); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategy.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategy.java new file mode 100644 index 0000000000000..d97f1b26b5362 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategy.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.tasks.Task; + +import java.util.Comparator; + +public class LongestRunningTaskFirstStrategy extends AbstractTaskSelectionStrategy { + + @Override + public Comparator sortingCondition() { + return Comparator.comparingLong(Task::getStartTime).reversed(); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategy.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategy.java new file mode 100644 index 0000000000000..f4e1c7f4fe9bc --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategy.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.tasks.Task; + +import java.util.Comparator; + +public class ShortestRunningTaskFirstStrategy extends AbstractTaskSelectionStrategy { + + @Override + public Comparator sortingCondition() { + return Comparator.comparingLong(Task::getStartTime); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategy.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategy.java new file mode 100644 index 0000000000000..b5345be6dda8b --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategy.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.tasks.Task; + +import java.util.List; + +/** + * Interface for strategies to select tasks for cancellation. + * Implementations of this interface define how tasks are selected for cancellation based on resource usage. + */ +public interface TaskSelectionStrategy { + /** + * Determines which tasks should be cancelled based on the provided criteria. + * + * @param tasks List of tasks available for cancellation. + * @param limit The amount of tasks to select whose resources reach this limit + * @param resourceType The type of resource that needs to be reduced, guiding the selection process. + * + * @return List of tasks that should be cancelled. + */ + List selectTasksForCancellation(List tasks, long limit, SandboxResourceType resourceType); +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/cancellation/package-info.java b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/package-info.java new file mode 100644 index 0000000000000..7bda195ba18ff --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/cancellation/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for cancellation related abstracts + */ +package org.opensearch.search.sandboxing.cancellation; diff --git a/server/src/main/java/org/opensearch/search/sandboxing/module/SandboxModule.java b/server/src/main/java/org/opensearch/search/sandboxing/module/SandboxModule.java new file mode 100644 index 0000000000000..8da5f20ab022f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/module/SandboxModule.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.module; + +import org.opensearch.common.inject.AbstractModule; +import org.opensearch.search.sandboxing.tracker.SandboxUsageTracker; +import org.opensearch.search.sandboxing.tracker.SandboxResourceUsageTrackerService; + +/** + * Module class for resource usage limiting related artifacts + */ +public class SandboxModule extends AbstractModule { + + /** + * Default constructor + */ + public SandboxModule() {} + + @Override + protected void configure() { + bind(SandboxUsageTracker.class).to(SandboxResourceUsageTrackerService.class).asEagerSingleton(); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/module/package-info.java b/server/src/main/java/org/opensearch/search/sandboxing/module/package-info.java new file mode 100644 index 0000000000000..5be0923cc3c62 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/module/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Guice Module + */ + +package org.opensearch.search.sandboxing.module; diff --git a/server/src/main/java/org/opensearch/search/sandboxing/package-info.java b/server/src/main/java/org/opensearch/search/sandboxing/package-info.java new file mode 100644 index 0000000000000..3dfecf384df44 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query Sandboxing related artifacts + */ +package org.opensearch.search.sandboxing; diff --git a/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/CpuTimeResourceType.java b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/CpuTimeResourceType.java new file mode 100644 index 0000000000000..6c4dd7fbaa320 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/CpuTimeResourceType.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.resourcetype; + +import org.opensearch.tasks.Task; + +public class CpuTimeResourceType extends SandboxResourceType { + @Override + public long getResourceUsage(Task task) { + return task.getTotalResourceStats().getCpuTimeInNanos(); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/JvmMemoryResourceType.java b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/JvmMemoryResourceType.java new file mode 100644 index 0000000000000..91f6e73364b5f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/JvmMemoryResourceType.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.resourcetype; + +import org.opensearch.tasks.Task; + +public class JvmMemoryResourceType extends SandboxResourceType { + @Override + public long getResourceUsage(Task task) { + return task.getTotalResourceStats().getMemoryInBytes(); + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/SandboxResourceType.java b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/SandboxResourceType.java new file mode 100644 index 0000000000000..9b849718e6a11 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/SandboxResourceType.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.resourcetype; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.tasks.Task; + +@ExperimentalApi +public abstract class SandboxResourceType { + public abstract long getResourceUsage(Task task); + + public static SandboxResourceType fromString(String type) { + if (type.equalsIgnoreCase("JVM")) { + return new JvmMemoryResourceType(); + } else if (type.equalsIgnoreCase("CPU")) { + return new CpuTimeResourceType(); + } else { + throw new IllegalArgumentException("Unsupported resource type: " + type); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/package-info.java b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/package-info.java new file mode 100644 index 0000000000000..97d449aa49912 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/resourcetype/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.resourcetype; \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxResourceUsageTrackerService.java b/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxResourceUsageTrackerService.java new file mode 100644 index 0000000000000..84db827944c9f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxResourceUsageTrackerService.java @@ -0,0 +1,127 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.tracker; + +import org.opensearch.common.inject.Inject; +import org.opensearch.search.sandboxing.SandboxLevelResourceUsageView; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.search.sandboxing.SandboxTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; +import org.opensearch.tasks.TaskResourceTrackingService; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class tracks requests per Sandbox + */ +//@ExperimentalApi +public class SandboxResourceUsageTrackerService implements SandboxUsageTracker, TaskManager.TaskEventListeners { + + public static final List TRACKED_RESOURCES = List.of(SandboxResourceType.fromString("JVM")); + + private final TaskManager taskManager; + private final TaskResourceTrackingService taskResourceTrackingService; + + /** + * SandboxResourceTrackerService constructor + * + * @param taskManager + * @param taskResourceTrackingService + */ + @Inject + public SandboxResourceUsageTrackerService( + final TaskManager taskManager, + final TaskResourceTrackingService taskResourceTrackingService + ) { + this.taskManager = taskManager; + this.taskResourceTrackingService = taskResourceTrackingService; + } + + /** + * Constructs a map of SandboxLevelResourceUsageView instances for each sandbox. + * + * @return Map of sandbox views + */ + @Override + public Map constructSandboxLevelUsageViews() { + Map sandboxViews = new HashMap<>(); + + Map> tasksBySandbox = getTasksGroupedBySandbox(); + Map> sandboxResourceUsage = getResourceUsageOfSandboxes(tasksBySandbox); + + for(String sandboxId : tasksBySandbox.keySet()) { + SandboxLevelResourceUsageView sandboxLevelResourceUsageView = new SandboxLevelResourceUsageView( + sandboxId, + sandboxResourceUsage.get(sandboxId), + tasksBySandbox.get(sandboxId) + ); + sandboxViews.put(sandboxId, sandboxLevelResourceUsageView); + } + return sandboxViews; + } + + /** + * Groups tasks by their associated sandbox. + * + * @return Map of tasks grouped by sandbox + */ + private Map> getTasksGroupedBySandbox() { + return taskResourceTrackingService.getResourceAwareTasks() + .values() + .stream() + .filter(SandboxTask.class::isInstance) + .map(SandboxTask.class::cast) + .collect(Collectors.groupingBy( + SandboxTask::getSandboxId, + Collectors.mapping(task -> (Task) task, Collectors.toList()) + )); + } + + /** + * Calculates the resource usage of each sandbox. + * + * @param tasksBySandbox Map of tasks grouped by sandbox + * @return Map of resource usage for each sandbox + */ + private Map> getResourceUsageOfSandboxes(Map> tasksBySandbox) { + Map> resourceUsageOfSandboxes = new HashMap<>(); + + // Iterate over each sandbox entry + for (Map.Entry> sandboxEntry : tasksBySandbox.entrySet()) { + String sandboxId = sandboxEntry.getKey(); + List tasks = sandboxEntry.getValue(); + + // Prepare a usage map for the current sandbox, or retrieve the existing one + Map sandboxUsage = resourceUsageOfSandboxes.computeIfAbsent(sandboxId, k -> new HashMap<>()); + + // Accumulate resource usage for each task in the sandbox + for (Task task : tasks) { + for (SandboxResourceType resourceType : TRACKED_RESOURCES) { + long currentUsage = sandboxUsage.getOrDefault(resourceType, 0L); + long taskUsage = resourceType.getResourceUsage(task); + sandboxUsage.put(resourceType, currentUsage + taskUsage); + } + } + } + return resourceUsageOfSandboxes; + } + + /** + * Handles the completion of a task. + * + * @param task The completed task + */ + @Override + public void onTaskCompleted(Task task) { + } +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxUsageTracker.java b/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxUsageTracker.java new file mode 100644 index 0000000000000..b08e110e090f9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/tracker/SandboxUsageTracker.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.tracker; + +import org.opensearch.search.sandboxing.SandboxLevelResourceUsageView; + +import java.util.Map; + +/** + * This interface is mainly for tracking the resourceLimitGroup level resource usages + */ +public interface SandboxUsageTracker { + /** + * updates the current resource usage of resourceLimitGroups + */ + + Map constructSandboxLevelUsageViews(); +} diff --git a/server/src/main/java/org/opensearch/search/sandboxing/tracker/package-info.java b/server/src/main/java/org/opensearch/search/sandboxing/tracker/package-info.java new file mode 100644 index 0000000000000..fcf0c504d3752 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/sandboxing/tracker/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * ResourceLimitGroup resource tracking artifacts + */ +package org.opensearch.search.sandboxing.tracker; diff --git a/server/src/test/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategyStrategyTests.java b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategyStrategyTests.java new file mode 100644 index 0000000000000..8c19de92f1b9f --- /dev/null +++ b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/LongestRunningTaskFirstStrategyStrategyTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class LongestRunningTaskFirstStrategyStrategyTests extends OpenSearchTestCase { + public void testSortingCondition() { + Task task1 = mock(Task.class); + Task task2 = mock(Task.class); + Task task3 = mock(Task.class); + when(task1.getStartTime()).thenReturn(100L); + when(task2.getStartTime()).thenReturn(200L); + when(task3.getStartTime()).thenReturn(300L); + + List tasks = Arrays.asList(task1, task3, task2); + tasks.sort(new LongestRunningTaskFirstStrategy().sortingCondition()); + + assertEquals(Arrays.asList(task3, task2, task1), tasks); + } +} diff --git a/server/src/test/java/org/opensearch/search/sandboxing/cancellation/SandboxCancellationStrategyTestHelpers.java b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/SandboxCancellationStrategyTestHelpers.java new file mode 100644 index 0000000000000..d4a5fa0c6d3f1 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/SandboxCancellationStrategyTestHelpers.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.action.search.SearchAction; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.core.tasks.resourcetracker.ResourceStatsType; +import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; + +public class SandboxCancellationStrategyTestHelpers { + + public static List getListOfTasks(long totalMemory) { + List tasks = new ArrayList<>(); + + while (totalMemory > 0) { + long id = randomLong(); + final Task task = getRandomTask(id); + long initial_memory = randomLongBetween(1, 100); + + ResourceUsageMetric[] initialTaskResourceMetrics = new ResourceUsageMetric[]{ + new ResourceUsageMetric(ResourceStats.MEMORY, initial_memory) + }; + task.startThreadResourceTracking(id, ResourceStatsType.WORKER_STATS, initialTaskResourceMetrics); + + long memory = initial_memory + randomLongBetween(1, 10000); + + totalMemory -= memory - initial_memory; + + ResourceUsageMetric[] taskResourceMetrics = new ResourceUsageMetric[]{ + new ResourceUsageMetric(ResourceStats.MEMORY, memory), + }; + task.updateThreadResourceStats(id, ResourceStatsType.WORKER_STATS, taskResourceMetrics); + task.stopThreadResourceTracking(id, ResourceStatsType.WORKER_STATS); + tasks.add(task); + } + + return tasks; + } + + public static Task getRandomTask(long id) { + return new Task( + id, + "transport", + SearchAction.NAME, + "test description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + } +} diff --git a/server/src/test/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategyStrategyTests.java b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategyStrategyTests.java new file mode 100644 index 0000000000000..2422d41ccd2b5 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/ShortestRunningTaskFirstStrategyStrategyTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ShortestRunningTaskFirstStrategyStrategyTests extends OpenSearchTestCase { + public void testSortingCondition() { + Task task1 = mock(Task.class); + Task task2 = mock(Task.class); + Task task3 = mock(Task.class); + when(task1.getStartTime()).thenReturn(100L); + when(task2.getStartTime()).thenReturn(200L); + when(task3.getStartTime()).thenReturn(300L); + + List tasks = Arrays.asList(task1, task3, task2); + tasks.sort(new ShortestRunningTaskFirstStrategy().sortingCondition()); + + assertEquals(Arrays.asList(task1, task2, task3), tasks); + } +} diff --git a/server/src/test/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategyTests.java b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategyTests.java new file mode 100644 index 0000000000000..39634d9e026b0 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/sandboxing/cancellation/TaskSelectionStrategyTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.cancellation; + +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Comparator; +import java.util.List; + +public class TaskSelectionStrategyTests extends OpenSearchTestCase { + + private class TestTaskSelectionStrategy extends AbstractTaskSelectionStrategy { + @Override + public Comparator sortingCondition() { + return Comparator.comparingLong(Task::getStartTime).reversed(); + } + } + + public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGreaterThanZero() { + TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy(); + long threshold = 100L; + long reduceBy = 50L; + SandboxResourceType resourceType = SandboxResourceType.fromString("JVM"); + List tasks = SandboxCancellationStrategyTestHelpers.getListOfTasks(threshold); + + List selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType); + assertFalse(selectedTasks.isEmpty()); + assertTrue(tasksUsageMeetsThreshold(selectedTasks, reduceBy)); + } + + public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsLesserThanZero() { + TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy(); + long threshold = 100L; + long reduceBy = -50L; + SandboxResourceType resourceType = SandboxResourceType.fromString("JVM"); + List tasks = SandboxCancellationStrategyTestHelpers.getListOfTasks(threshold); + + try { + testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("reduceBy has to be greater than zero", e.getMessage()); + } + } + + public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsEqualToZero() { + TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy(); + long threshold = 100L; + long reduceBy = 0; + SandboxResourceType resourceType = SandboxResourceType.fromString("JVM"); + List tasks = SandboxCancellationStrategyTestHelpers.getListOfTasks(threshold); + + List selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType); + assertTrue(selectedTasks.isEmpty()); + } + + private boolean tasksUsageMeetsThreshold(List selectedTasks, long threshold) { + long memory = 0; + for(Task task : selectedTasks) { + memory += task.getTotalResourceUtilization(ResourceStats.MEMORY); + if(memory > threshold) { + return true; + } + } + return false; + } +} diff --git a/server/src/test/java/org/opensearch/search/sandboxing/tracking/SandboxResourceUsageTrackerServiceTests.java b/server/src/test/java/org/opensearch/search/sandboxing/tracking/SandboxResourceUsageTrackerServiceTests.java new file mode 100644 index 0000000000000..265461652133c --- /dev/null +++ b/server/src/test/java/org/opensearch/search/sandboxing/tracking/SandboxResourceUsageTrackerServiceTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.sandboxing.tracking; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.search.sandboxing.SandboxLevelResourceUsageView; +import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType; +import org.opensearch.search.sandboxing.SandboxTask; +import org.opensearch.search.sandboxing.tracker.SandboxResourceUsageTrackerService; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SandboxResourceUsageTrackerServiceTests extends OpenSearchTestCase { + TestThreadPool threadPool; + TaskManager taskManager; + TaskResourceTrackingService mockTaskResourceTrackingService; + SandboxResourceUsageTrackerService sandboxResourceUsageTrackerService; + @Before + public void setup() { + taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + threadPool = new TestThreadPool(getTestName()); + mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + sandboxResourceUsageTrackerService = new SandboxResourceUsageTrackerService( + taskManager, mockTaskResourceTrackingService); + } + + @After + public void cleanup() { + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } + + public void testConstructSandboxLevelViews_CreatesSandboxLevelUsageView_WhenTasksArePresent() { + List sandboxIds = List.of("sandbox1", "sandbox2", "sandbox3"); + + Map activeSearchShardTasks = createActiveSearchShardTasks(sandboxIds); + when(mockTaskResourceTrackingService.getResourceAwareTasks()).thenReturn(activeSearchShardTasks); + Map stringSandboxLevelResourceUsageViewMap = sandboxResourceUsageTrackerService.constructSandboxLevelUsageViews(); + + for(String sandboxId : sandboxIds) { + assertEquals(400, (long) stringSandboxLevelResourceUsageViewMap.get(sandboxId).getResourceUsageData().get(SandboxResourceType.fromString("JVM"))); + assertEquals(2, stringSandboxLevelResourceUsageViewMap.get(sandboxId).getActiveTasks().size()); + } + } + + public void testConstructSandboxLevelViews_CreatesSandboxLevelUsageView_WhenTasksAreNotPresent() { + Map stringSandboxLevelResourceUsageViewMap = sandboxResourceUsageTrackerService.constructSandboxLevelUsageViews(); + assertTrue(stringSandboxLevelResourceUsageViewMap.isEmpty()); + } + + public void testConstructSandboxLevelUsageViews_WithTasksHavingDifferentResourceUsage() { + Map activeSearchShardTasks = new HashMap<>(); + activeSearchShardTasks.put(1L, createMockTask(SearchShardTask.class, 100, 200, "sandbox1")); + activeSearchShardTasks.put(2L, createMockTask(SearchShardTask.class, 200, 400, "sandbox1")); + when(mockTaskResourceTrackingService.getResourceAwareTasks()).thenReturn(activeSearchShardTasks); + + Map sandboxViews = sandboxResourceUsageTrackerService.constructSandboxLevelUsageViews(); + + assertEquals(600, (long) sandboxViews.get("sandbox1").getResourceUsageData().get(SandboxResourceType.fromString("JVM"))); + assertEquals(2, sandboxViews.get("sandbox1").getActiveTasks().size()); + } + + private Map createActiveSearchShardTasks(List sandboxIds) { + Map activeSearchShardTasks = new HashMap<>(); + long task_id = 0; + for (String sandboxId : sandboxIds) { + for (int i = 0; i < 2; i++) { + activeSearchShardTasks.put(++task_id, createMockTask(SearchShardTask.class, 100, 200, sandboxId)); + } + } + return activeSearchShardTasks; + } + + + private T createMockTask(Class type, long cpuUsage, long heapUsage, String sandboxId) { + T task = mock(type); + if (task instanceof SearchTask || task instanceof SearchShardTask) { + when(((SandboxTask) task).getSandboxId()).thenReturn(sandboxId); + } + when(task.getTotalResourceStats()).thenReturn(new TaskResourceUsage(cpuUsage, heapUsage)); + when(task.getStartTimeNanos()).thenReturn((long) 0); + + AtomicBoolean isCancelled = new AtomicBoolean(false); + doAnswer(invocation -> { + isCancelled.set(true); + return null; + }).when(task).cancel(anyString()); + doAnswer(invocation -> isCancelled.get()).when(task).isCancelled(); + + return task; + } +}