Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communicate task completion to Scheduler interface. Add two schedulers #253

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface Task : Resource {
*/
public val launchedAt: Instant?

/**
* Number of times the task was skipped by the scheduler because no host with enough resources was available.
*/
public var timesSkipped: Int

/**
* Request the task to be started.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.time.Duration;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -100,7 +100,7 @@ public final class ComputeService implements AutoCloseable {
/**
* The tasks that should be launched by the service.
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
private final LinkedHashSet<ServiceTask> taskQueue = new LinkedHashSet<>();

/**
* The active tasks in the system.
Expand Down Expand Up @@ -175,6 +175,8 @@ public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull Task
hv.provisionedCores -= flavor.getCoreCount();
hv.instanceCount--;
hv.availableMemory += flavor.getMemorySize();

scheduler.removeTask(task, hv);
} else {
LOGGER.error("Unknown host {}", host);
}
Expand Down Expand Up @@ -309,17 +311,17 @@ public void close() {
/**
* Enqueue the specified [task] to be scheduled onto a host.
*/
SchedulingRequest schedule(ServiceTask task) {
void schedule(ServiceTask task) {
LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid());

long now = clock.millis();
SchedulingRequest request = new SchedulingRequest(task, now);
task.submitTime = now;
task.isCancelled = false;

task.launchedAt = Instant.ofEpochMilli(now);
taskQueue.add(request);
taskQueue.add(task);
tasksPending++;
requestSchedulingCycle();
return request;
}

void delete(ServiceFlavor flavor) {
Expand Down Expand Up @@ -354,29 +356,28 @@ private void requestSchedulingCycle() {
*/
private void doSchedule() {
// reorder tasks
Iterator<ServiceTask> iterator = taskQueue.iterator();

while (!taskQueue.isEmpty()) {
SchedulingRequest request = taskQueue.peek();
while (iterator.hasNext()) {
ServiceTask task = iterator.next();

if (request.isCancelled) {
taskQueue.poll();
if (task.isCancelled) {
iterator.remove();
tasksPending--;
continue;
}

final ServiceTask task = request.task;
// Check if all dependencies are met
// otherwise continue

final ServiceFlavor flavor = task.getFlavor();
final HostView hv = scheduler.select(request.task);
final HostView hv = scheduler.select(iterator);

if (hv == null || !hv.getHost().canFit(task)) {
LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task);

if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) {
// Remove the incoming image
taskQueue.poll();
iterator.remove();
tasksPending--;
attemptsFailure++;

Expand All @@ -392,7 +393,7 @@ private void doSchedule() {
Host host = hv.getHost();

// Remove request from queue
taskQueue.poll();
iterator.remove();
tasksPending--;

LOGGER.info("Assigned task {} to host {}", task, host);
Expand All @@ -413,6 +414,7 @@ private void doSchedule() {
activeTasks.put(task, host);
} catch (Exception cause) {
LOGGER.error("Failed to deploy VM", cause);
scheduler.removeTask(task, hv);
attemptsError++;
}
}
Expand Down Expand Up @@ -601,19 +603,4 @@ public void rescheduleTask(@NotNull Task task, @NotNull SimWorkload workload) {
internalTask.start();
}
}

/**
 * Represents a request to schedule a {@link ServiceTask} onto one of the {@link Host}s.
 */
static class SchedulingRequest {
    // The task this request refers to.
    final ServiceTask task;

    // Submission timestamp exactly as supplied to the constructor.
    final long submitTime;

    // Cancellation flag; a freshly created request is not cancelled.
    boolean isCancelled = false;

    SchedulingRequest(ServiceTask task, long submitTime) {
        this.submitTime = submitTime;
        this.task = task;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class HostView {
long availableMemory;
int provisionedCores;

// Additional metadata to be used by scheduler
public int priorityIndex = -1;
public int listIndex = -1;

/**
* Construct a {@link HostView} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public final class ServiceTask implements Task {
private TaskState state = TaskState.TERMINATED;
Instant launchedAt = null;
Host host = null;
private ComputeService.SchedulingRequest request = null;

// Scheduling request related fields
long submitTime;
boolean isCancelled;
int timesSkipped;

ServiceTask(
ComputeService service,
Expand All @@ -75,6 +79,9 @@ public final class ServiceTask implements Task {
this.image = image;
this.labels = labels;
this.meta = meta;

this.submitTime = -1;
this.isCancelled = false;
}

@NotNull
Expand Down Expand Up @@ -153,8 +160,8 @@ public void start() {
default:
LOGGER.info("User requested to start task {}", uid);
setState(TaskState.PROVISIONING);
assert request == null : "Scheduling request already active";
request = service.schedule(this);
assert submitTime == -1 : "Scheduling request already active";
service.schedule(this);
break;
}
}
Expand Down Expand Up @@ -246,10 +253,16 @@ void setState(TaskState state) {
* Cancel the provisioning request if active.
*/
private void cancelProvisioningRequest() {
final ComputeService.SchedulingRequest request = this.request;
if (request != null) {
this.request = null;
request.isCancelled = true;
}
this.isCancelled = true;
}

/**
 * Returns the current value of the {@code timesSkipped} field of this task.
 */
@Override
public int getTimesSkipped() {
    return this.timesSkipped;
}

/**
 * Overwrites the {@code timesSkipped} field of this task with the given value.
 *
 * @param i The new value to store; it is stored as-is, without validation.
 */
@Override
public void setTimesSkipped(int i) {
    this.timesSkipped = i;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,20 @@ public interface ComputeScheduler {
public fun removeHost(host: HostView)

/**
* Select a host for the specified [task].
* Select a host for the specified [iter].
* We implicitly assume that the task has been scheduled onto the host.
*
* @param task The server to select a host for.
* @param iter Iterator over the tasks to select a host for.
* @return The host to schedule the task on or `null` if no host is available.
*/
public fun select(task: Task): HostView?
public fun select(iter: MutableIterator<Task>): HostView?

/**
* Inform the scheduler that a [task] has been removed from the [host].
* Could be due to completion or failure.
*/
public fun removeTask(
task: Task,
host: HostView,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public class FilterScheduler(
hosts.remove(host)
}

override fun select(task: Task): HostView? {
override fun select(iter: MutableIterator<Task>): HostView? {
val task = iter.next()
val hosts = hosts
val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } }

Expand Down Expand Up @@ -108,4 +109,11 @@ public class FilterScheduler(
else -> subset[random.nextInt(maxSize)]
}
}

/**
 * Notification that [task] has been removed from [host].
 *
 * This implementation deliberately does nothing. The previous body was
 * `TODO("Not yet implemented")`, which throws [NotImplementedError] on every
 * invocation and would therefore abort any caller that reports a task removal
 * to this scheduler.
 *
 * @param task The task that was removed.
 * @param host The host the task was removed from.
 */
override fun removeTask(
    task: Task,
    host: HostView,
) {
    // Intentionally empty.
    // NOTE(review): no per-task bookkeeping is visible in this scheduler, so there
    // appears to be nothing to release here; confirm against the full class.
}
}
Loading
Loading