Skip to content

Commit

Permalink
initial code for the sandbox resource tracking and cancellation frame…
Browse files Browse the repository at this point in the history
…work

Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed May 30, 2024
1 parent 4700be3 commit 225c31a
Show file tree
Hide file tree
Showing 29 changed files with 1,246 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.sandboxing.SandboxTask;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;

Expand All @@ -50,9 +51,10 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask, SandboxTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;
private String sandboxId;

public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, () -> "");
Expand Down Expand Up @@ -84,4 +86,14 @@ public boolean supportsResourceTracking() {
public boolean shouldCancelChildrenOnCancellation() {
return false;
}

@Override
public void setSandboxId(String sandboxId) {
this.sandboxId = sandboxId;
}

@Override
public String getSandboxId() {
return "sandboxId";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.sandboxing.SandboxTask;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;

Expand All @@ -49,10 +50,11 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
public class SearchTask extends CancellableTask implements SearchBackpressureTask, SandboxTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private String sandboxId;

public SearchTask(
long id,
Expand Down Expand Up @@ -106,4 +108,13 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public void setSandboxId(String sandboxId) {
this.sandboxId = sandboxId;
}

@Override
public String getSandboxId() {
return "sandboxId";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ public boolean isSegmentReplicationEnabled(String indexName) {
.orElse(false);
}

/**
public Map<String, Sandbox> sandboxes() {
// stub
return Collections.emptyMap();
}

/**
* Context of the XContent.
*
* @opensearch.api
Expand Down
87 changes: 87 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Sandbox.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType;

import java.util.Collections;
import java.util.List;

@ExperimentalApi
public class Sandbox {
//TODO Kaushal should have implemented hashcode and equals
private SandboxMode mode;

public SandboxMode getMode() {
return mode;
}

public ResourceLimit getResourceLimitFor(SandboxResourceType resourceType) {
return null;
}

public String getName() {
return "";
}

public String getId() {
return "";
}

public List<ResourceLimit> getResourceLimits() {
return Collections.emptyList();
}

@ExperimentalApi
public class ResourceLimit {
public Long getThresholdInLong() {
return 0L;
}

public SandboxResourceType getResourceType() {
return null;
}

public Long getThreshold() {
return 0L;
}
}

@ExperimentalApi
public enum SandboxMode {
SOFT("soft"),
ENFORCED("enforced"),
MONITOR("monitor");

private final String name;

SandboxMode(String mode) {
this.name = mode;
}

public String getName() {
return name;
}

public static SandboxMode fromName(String s) {
switch (s) {
case "soft":
return SOFT;
case "enforced":
return ENFORCED;
case "monitor":
return MONITOR;
default:
throw new IllegalArgumentException("Invalid value for SandboxMode: " + s);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.sandboxing;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.sandboxing.resourcetype.SandboxResourceType;
import org.opensearch.tasks.Task;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@ExperimentalApi
public class SandboxLevelResourceUsageView {

private final String sandboxId;
private final Map<SandboxResourceType, Long> resourceUsage;
private final List<Task> activeTasks;

public SandboxLevelResourceUsageView(String sandboxId) {
this.sandboxId = sandboxId;
this.resourceUsage = new HashMap<>();
this.activeTasks = new ArrayList<>();
}

public SandboxLevelResourceUsageView(String sandboxId, Map<SandboxResourceType, Long> resourceUsage, List<Task> activeTasks) {
this.sandboxId = sandboxId;
this.resourceUsage = resourceUsage;
this.activeTasks = activeTasks;
}

public Map<SandboxResourceType, Long> getResourceUsageData() {
return resourceUsage;
}

public List<Task> getActiveTasks() {
return activeTasks;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SandboxLevelResourceUsageView that = (SandboxLevelResourceUsageView) o;
return Objects.equals(sandboxId, that.sandboxId);
}

@Override
public int hashCode() {
return Objects.hashCode(sandboxId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.sandboxing;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.Sandbox;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.search.sandboxing.cancellation.DefaultTaskCancellation;
import org.opensearch.search.sandboxing.cancellation.LongestRunningTaskFirstStrategy;
import org.opensearch.search.sandboxing.tracker.SandboxUsageTracker;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Main service which will run periodically to track and cancel resource constraint violating tasks in sandboxes
*/
public class SandboxService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(SandboxService.class);

private final SandboxUsageTracker sandboxUsageTracker;
private volatile Scheduler.Cancellable scheduledFuture;
private final ThreadPool threadPool;
private final ClusterService clusterService;

/**
* Guice managed constructor
*
* @param sandboxUsageTracker
* @param sandboxPruner
* @param sandboxServiceSettings
* @param threadPool
*/
@Inject
public SandboxService(
SandboxUsageTracker sandboxUsageTracker,
ClusterService clusterService,
ThreadPool threadPool
) {
this.sandboxUsageTracker = sandboxUsageTracker;
this.sandboxServiceSettings = sandboxServiceSettings;
this.sandboxPruner = sandboxPruner;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

/**
* run at regular interval
*/
private void doRun() {
Map<String, SandboxLevelResourceUsageView> sandboxLevelResourceUsageViews = sandboxUsageTracker.constructSandboxLevelUsageViews();
Set<Sandbox> activeSandboxes = getActiveSandboxes();
DefaultTaskCancellation taskCancellation = new DefaultTaskCancellation(
new LongestRunningTaskFirstStrategy(),
sandboxLevelResourceUsageViews,
activeSandboxes
);
taskCancellation.cancelTasks();
// TODO Prune the sandboxes
}

private Set<Sandbox> getActiveSandboxes() {
return new HashSet<>(clusterService.state().metadata().sandboxes().values());
}

/**
* {@link AbstractLifecycleComponent} lifecycle method
*/
@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
} catch (Exception e) {
logger.debug("Exception occurred in Query Sandbox service", e);
}
}, sandboxServiceSettings.getRunIntervalMillis(), ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
}

@Override
protected void doClose() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.sandboxing;

/**
* This interface can be implemented by tasks which will be tracked and monitored using {@link org.opensearch.cluster.metadata.ResourceLimitGroup}
*/
public interface SandboxTask {
void setSandboxId(String sandboxId);

String getSandboxId();
}
Loading

0 comments on commit 225c31a

Please sign in to comment.