Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Adding new Scheduler interface methods for scaling (#1321)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill Graham authored Sep 2, 2016
1 parent 025e409 commit b80ba2d
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 25 deletions.
1 change: 1 addition & 0 deletions heron/scheduler-core/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ common_deps_files = [
]

spi_deps_files = [
"//heron/api/src/java:api-java",
"//heron/spi/src/java:common-spi-java",
"//heron/spi/src/java:statemgr-spi-java",
"//heron/spi/src/java:uploader-spi-java",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
return requestSchedulerService(Command.KILL, killTopologyRequest.toByteArray());
}

@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
return requestSchedulerService(Command.UPDATE, updateTopologyRequest.toByteArray());
}

/**
* Send payload to target HTTP connection to request a service
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public interface ISchedulerClient {
* @return true if killed successfully
*/
boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest);

/**
* Update a topology on given UpdateTopologyRequest
*
* @param updateTopologyRequest info for update command
* @return true if updated successfully
*/
boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package com.twitter.heron.scheduler.client;

import java.util.logging.Logger;

import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.spi.common.Config;
Expand All @@ -25,8 +23,6 @@
* This class manages topology by invoking IScheduler's interface directly as a library.
*/
public class LibrarySchedulerClient implements ISchedulerClient {
private static final Logger LOG = Logger.getLogger(LibrarySchedulerClient.class.getName());

private final Config config;
private final Config runtime;
private final IScheduler scheduler;
Expand Down Expand Up @@ -65,6 +61,20 @@ public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
return ret;
}

@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
boolean ret = false;

try {
scheduler.initialize(config, runtime);
ret = scheduler.onUpdate(updateTopologyRequest);
} finally {
SysUtils.closeIgnoringExceptions(scheduler);
}

return ret;
}

// TODO(mfu): Use JAVA8's lambda feature providing a method for all commands in SchedulerUtils
// TODO(mfu): boolean invokeSchedulerAsLibrary(String commandName, Function invoker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class SchedulerServer {
// initialize the various URL end points
public static final String KILL_REQUEST_CONTEXT = "/kill";
public static final String RESTART_REQUEST_CONTEXT = "/restart";
public static final String UPDATE_REQUEST_CONTEXT = "/update";

private static final int SERVER_BACK_LOG = 0;

Expand All @@ -50,6 +51,9 @@ public SchedulerServer(Config runtime, IScheduler scheduler, int port)

this.schedulerServer.createContext(RESTART_REQUEST_CONTEXT,
new RestartRequestHandler(runtime, scheduler));

this.schedulerServer.createContext(UPDATE_REQUEST_CONTEXT,
new UpdateRequestHandler(runtime, scheduler));
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler.server;

import java.io.IOException;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.scheduler.IScheduler;
import com.twitter.heron.spi.utils.NetworkUtils;
import com.twitter.heron.spi.utils.SchedulerUtils;

class UpdateRequestHandler implements HttpHandler {
private IScheduler scheduler;

UpdateRequestHandler(Config runtime, IScheduler scheduler) {
this.scheduler = scheduler;
}

@Override
public void handle(HttpExchange exchange) throws IOException {

// read the http request payload
byte[] requestBody = NetworkUtils.readHttpRequestBody(exchange);

// prepare the request
Scheduler.UpdateTopologyRequest updateTopologyRequest =
Scheduler.UpdateTopologyRequest.newBuilder()
.mergeFrom(requestBody)
.build();

// update the topology
boolean isUpdateSuccessfully = scheduler.onUpdate(updateTopologyRequest);

// prepare the response
Scheduler.SchedulerResponse response =
SchedulerUtils.constructSchedulerResponse(isUpdateSuccessfully);

// send the response back
NetworkUtils.sendHttpResponse(exchange, response.toByteArray());
}
}
2 changes: 1 addition & 1 deletion heron/schedulers/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ api_deps_files = [
]

scheduler_deps_files = \
["//heron/scheduler-core/src/java:scheduler-java"] + \
heron_java_proto_files() + \
common_deps_files + \
spi_deps_files + \
api_deps_files

yarn_deps_files = \
scheduler_deps_files + [
"//heron/scheduler-core/src/java:scheduler-java",
"//third_party/java:yarn",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,10 @@ public boolean onKill(Scheduler.KillTopologyRequest request) {
public boolean onRestart(Scheduler.RestartTopologyRequest request) {
return true;
}

@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
return false;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public boolean onRestart(Scheduler.RestartTopologyRequest request) {
return controller.restartJob(containerId);
}

@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

/**
* Encode the JVM options
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.annotations.VisibleForTesting;

import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.spi.common.Config;
Expand Down Expand Up @@ -106,6 +108,9 @@ public void run() {
if (isTopologyKilled) {
LOG.info("Topology is killed. Not to start new executors.");
return;
} else if (!processToContainer.containsKey(containerExecutor)) {
LOG.log(Level.INFO, "Container {0} is killed. No need to relaunch.", container);
return;
}
LOG.log(Level.INFO, "Trying to restart container {0}", container);
// restart the container
Expand Down Expand Up @@ -141,9 +146,11 @@ public boolean onSchedule(PackingPlan packing) {
LOG.info("Starting to deploy topology: " + LocalContext.topologyName(config));
LOG.info("# of containers: " + numContainers);

// for each container, run its own executor
for (int i = 0; i < numContainers; i++) {
startExecutor(i);
synchronized (processToContainer) {
// for each container, run its own executor
for (int i = 0; i < numContainers; i++) {
startExecutor(i);
}
}

LOG.info("Executor for each container have been started.");
Expand All @@ -169,20 +176,22 @@ public boolean onKill(Scheduler.KillTopologyRequest request) {
// set the flag that the topology being killed
isTopologyKilled = true;

// destroy/kill the process for each container
for (Process p : processToContainer.keySet()) {
synchronized (processToContainer) {
// destroy/kill the process for each container
for (Process p : processToContainer.keySet()) {

// get the container index for the process
int index = processToContainer.get(p);
LOG.info("Killing executor for container: " + index);
// get the container index for the process
int index = processToContainer.get(p);
LOG.info("Killing executor for container: " + index);

// destroy the process
p.destroy();
LOG.info("Killed executor for container: " + index);
}
// destroy the process
p.destroy();
LOG.info("Killed executor for container: " + index);
}

// clear the mapping between process and container ids
processToContainer.clear();
// clear the mapping between process and container ids
processToContainer.clear();
}

return true;
}
Expand Down Expand Up @@ -225,17 +234,23 @@ public boolean onRestart(Scheduler.RestartTopologyRequest request) {
return true;
}

public boolean isTopologyKilled() {
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

boolean isTopologyKilled() {
return isTopologyKilled;
}

// This method shall be used only for unit test
protected ExecutorService getMonitorService() {
@VisibleForTesting
ExecutorService getMonitorService() {
return monitorService;
}

// This method shall be used only for unit test
protected Map<Process, Integer> getProcessToContainer() {
@VisibleForTesting
Map<Process, Integer> getProcessToContainer() {
return processToContainer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public boolean onSchedule(PackingPlan packing) {
return controller.submitTopology(topologyConf);
}

@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

@Override
public List<String> getJobLinks() {
List<String> jobLinks = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ public boolean onRestart(Scheduler.RestartTopologyRequest request) {
return mesosFramework.restartJob(containerId);
}

@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

protected MesosFramework getMesosFramework() {
return new MesosFramework(config, runtime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public boolean onRestart(Scheduler.RestartTopologyRequest request) {
return true;
}

@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

protected String getJobIdFilePath() {
return new File(workingDirectory, SlurmContext.jobIdFile(config)).getPath();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.twitter.heron.proto.scheduler.Scheduler.KillTopologyRequest;
import com.twitter.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
import com.twitter.heron.proto.scheduler.Scheduler.UpdateTopologyRequest;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.scheduler.IScheduler;
Expand Down Expand Up @@ -81,6 +82,12 @@ public boolean onRestart(RestartTopologyRequest request) {
}
}

@Override
public boolean onUpdate(UpdateTopologyRequest request) {
LOG.severe("Topology onUpdate not implemented by this scheduler.");
return false;
}

@Override
public void close() {
HeronMasterDriverProvider.getInstance().killTopology();
Expand Down
1 change: 1 addition & 0 deletions heron/spi/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ scheduler_deps_files = \
heron_java_proto_files() + [
":common-spi-java",
":packing-spi-java",
"//heron/api/src/java:classification",
"//heron/common/src/java:config-java",
"@com_google_guava_guava//jar",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum Command {
KILL,
ACTIVATE,
DEACTIVATE,
UPDATE,
RESTART;

public static Command makeCommand(String commandString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface IRepacking extends AutoCloseable {

/**
* Initialize the packing algorithm with the static config and the associated topology
* @param config topology config
* @param topology topology to repack
*/
void initialize(Config config, TopologyAPI.Topology topology);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@

package com.twitter.heron.spi.scheduler;

import com.twitter.heron.classification.InterfaceAudience;
import com.twitter.heron.classification.InterfaceStability;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;

/**
* Launches scheduler. heron-cli will create Launcher object using default no argument constructor.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface ILauncher extends AutoCloseable {
/**
* Initialize Launcher with Config, Uploader and topology. These object
Expand Down
Loading

0 comments on commit b80ba2d

Please sign in to comment.