Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Change AuroraController to an interface #1672

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ class AuroraCLIController implements AuroraController {
}

@Override
public boolean createJob(Map<String, String> bindings) {
public boolean createJob(Map<AuroraField, String> bindings) {
List<String> auroraCmd =
new ArrayList<>(Arrays.asList("aurora", "job", "create", "--wait-until", "RUNNING"));

for (Map.Entry<String, String> binding : bindings.entrySet()) {
for (AuroraField field : bindings.keySet()) {
auroraCmd.add("--bind");
auroraCmd.add(String.format("%s=%s", binding.getKey(), binding.getValue()));
auroraCmd.add(String.format("%s=%s", field, bindings.get(field)));
}

auroraCmd.add(jobSpec);
Expand All @@ -82,9 +82,9 @@ public boolean killJob() {

// Restart an aurora job
@Override
public boolean restartJob(int containerId) {
public boolean restart(Integer containerId) {
List<String> auroraCmd = new ArrayList<>(Arrays.asList("aurora", "job", "restart"));
if (containerId != -1) {
if (containerId != null && containerId != -1) {
auroraCmd.add(String.format("%s/%s", jobSpec, Integer.toString(containerId)));
} else {
auroraCmd.add(jobSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@
*/
public interface AuroraController {

boolean createJob(Map<String, String> auroraProperties);
boolean createJob(Map<AuroraField, String> auroraProperties);
boolean killJob();
boolean restartJob(int containerId);

/**
* Restarts a given container, or the entire job if containerId is null
Copy link
Contributor

@objmagic objmagic Jan 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when containerId is -1, the entire job will also be restarted according to here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although not the preferred approach I kept that for backward compatibility, but there's no need to do that since the code will all change in lock step. I've made the change to match behavior to docs.

* @param containerId ID of container to restart, or entire job if null
*/
boolean restart(Integer containerId);

void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove);
void addContainers(Integer count);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler.aurora;

/**
* Field names passed to aurora controllers during job creation
*/
public enum AuroraField {
CLUSTER,
COMPONENT_JVM_OPTS_IN_BASE64,
COMPONENT_RAMMAP,
CORE_PACKAGE_URI,
CPUS_PER_CONTAINER,
DISK_PER_CONTAINER,
ENVIRON,
HERON_SANDBOX_JAVA_HOME,
INSTANCE_JVM_OPTS_IN_BASE64,
/**
* Previous flag updated to use IS_PRODUCTION instead
* @deprecated remove in future release once config/src/yaml/heron.aurora is updated
*/
@Deprecated
ISPRODUCTION,
IS_PRODUCTION,
NUM_CONTAINERS,
RAM_PER_CONTAINER,
ROLE,
SANDBOX_EXECUTOR_BINARY,
SANDBOX_INSTANCE_CLASSPATH,
SANDBOX_METRICSMGR_CLASSPATH,
SANDBOX_METRICS_YAML,
SANDBOX_PYTHON_INSTANCE_BINARY,
SANDBOX_SCHEDULER_CLASSPATH,
SANDBOX_SHELL_BINARY,
SANDBOX_STMGR_BINARY,
SANDBOX_SYSTEM_YAML,
SANDBOX_TMASTER_BINARY,
STATEMGR_CONNECTION_STRING,
STATEMGR_ROOT_PATH,
TOPOLOGY_BINARY_FILE,
TOPOLOGY_CLASSPATH,
TOPOLOGY_DEFINITION_FILE,
TOPOLOGY_ID,
TOPOLOGY_NAME,
TOPOLOGY_PACKAGE_TYPE,
TOPOLOGY_PACKAGE_URI
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public boolean onSchedule(PackingPlan packing) {
SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtime), updatedPackingPlan,
Runtime.schedulerStateManagerAdaptor(runtime));

Map<String, String> auroraProperties = createAuroraProperties(updatedPackingPlan);
Map<AuroraField, String> auroraProperties = createAuroraProperties(updatedPackingPlan);

return controller.createJob(auroraProperties);
}
Expand All @@ -122,8 +122,11 @@ public boolean onKill(Scheduler.KillTopologyRequest request) {

@Override
public boolean onRestart(Scheduler.RestartTopologyRequest request) {
int containerId = request.getContainerIndex();
return controller.restartJob(containerId);
Integer containerId = null;
if (request.getContainerIndex() != -1) {
containerId = request.getContainerIndex();
}
return controller.restart(containerId);
}

@Override
Expand Down Expand Up @@ -160,77 +163,82 @@ protected String formatJavaOpts(String javaOpts) {
return String.format("\"%s\"", javaOptsBase64.replace("=", "&equals;"));
}

protected Map<String, String> createAuroraProperties(PackingPlan packing) {
Map<String, String> auroraProperties = new HashMap<>();
@SuppressWarnings("deprecation") // remove once we remove ISPRODUCTION usage below
protected Map<AuroraField, String> createAuroraProperties(PackingPlan packing) {
Map<AuroraField, String> auroraProperties = new HashMap<>();

TopologyAPI.Topology topology = Runtime.topology(runtime);
Resource containerResource = packing.getContainers().iterator().next().getRequiredResource();

auroraProperties.put("SANDBOX_EXECUTOR_BINARY", Context.executorSandboxBinary(config));
auroraProperties.put("TOPOLOGY_NAME", topology.getName());
auroraProperties.put("TOPOLOGY_ID", topology.getId());
auroraProperties.put("TOPOLOGY_DEFINITION_FILE",
auroraProperties.put(AuroraField.SANDBOX_EXECUTOR_BINARY,
Context.executorSandboxBinary(config));
auroraProperties.put(AuroraField.TOPOLOGY_NAME, topology.getName());
auroraProperties.put(AuroraField.TOPOLOGY_ID, topology.getId());
auroraProperties.put(AuroraField.TOPOLOGY_DEFINITION_FILE,
FileUtils.getBaseName(Context.topologyDefinitionFile(config)));
auroraProperties.put("STATEMGR_CONNECTION_STRING",
auroraProperties.put(AuroraField.STATEMGR_CONNECTION_STRING,
Context.stateManagerConnectionString(config));
auroraProperties.put("STATEMGR_ROOT_PATH", Context.stateManagerRootPath(config));
auroraProperties.put("SANDBOX_TMASTER_BINARY", Context.tmasterSandboxBinary(config));
auroraProperties.put("SANDBOX_STMGR_BINARY", Context.stmgrSandboxBinary(config));
auroraProperties.put("SANDBOX_METRICSMGR_CLASSPATH",
auroraProperties.put(AuroraField.STATEMGR_ROOT_PATH, Context.stateManagerRootPath(config));
auroraProperties.put(AuroraField.SANDBOX_TMASTER_BINARY, Context.tmasterSandboxBinary(config));
auroraProperties.put(AuroraField.SANDBOX_STMGR_BINARY, Context.stmgrSandboxBinary(config));
auroraProperties.put(AuroraField.SANDBOX_METRICSMGR_CLASSPATH,
Context.metricsManagerSandboxClassPath(config));
auroraProperties.put("INSTANCE_JVM_OPTS_IN_BASE64",
auroraProperties.put(AuroraField.INSTANCE_JVM_OPTS_IN_BASE64,
formatJavaOpts(TopologyUtils.getInstanceJvmOptions(topology)));
auroraProperties.put("TOPOLOGY_CLASSPATH",
auroraProperties.put(AuroraField.TOPOLOGY_CLASSPATH,
TopologyUtils.makeClassPath(topology, Context.topologyBinaryFile(config)));

auroraProperties.put("SANDBOX_SYSTEM_YAML", Context.systemConfigSandboxFile(config));
auroraProperties.put("COMPONENT_RAMMAP", Runtime.componentRamMap(runtime));
auroraProperties.put("COMPONENT_JVM_OPTS_IN_BASE64",
auroraProperties.put(AuroraField.SANDBOX_SYSTEM_YAML, Context.systemConfigSandboxFile(config));
auroraProperties.put(AuroraField.COMPONENT_RAMMAP, Runtime.componentRamMap(runtime));
auroraProperties.put(AuroraField.COMPONENT_JVM_OPTS_IN_BASE64,
formatJavaOpts(TopologyUtils.getComponentJvmOptions(topology)));
auroraProperties.put("TOPOLOGY_PACKAGE_TYPE",
auroraProperties.put(AuroraField.TOPOLOGY_PACKAGE_TYPE,
Context.topologyPackageType(config).name().toLowerCase());
auroraProperties.put("TOPOLOGY_BINARY_FILE",
auroraProperties.put(AuroraField.TOPOLOGY_BINARY_FILE,
FileUtils.getBaseName(Context.topologyBinaryFile(config)));
auroraProperties.put("HERON_SANDBOX_JAVA_HOME", Context.javaSandboxHome(config));
auroraProperties.put(AuroraField.HERON_SANDBOX_JAVA_HOME, Context.javaSandboxHome(config));

auroraProperties.put("SANDBOX_SHELL_BINARY", Context.shellSandboxBinary(config));
auroraProperties.put("SANDBOX_PYTHON_INSTANCE_BINARY",
auroraProperties.put(AuroraField.SANDBOX_SHELL_BINARY, Context.shellSandboxBinary(config));
auroraProperties.put(AuroraField.SANDBOX_PYTHON_INSTANCE_BINARY,
Context.pythonInstanceSandboxBinary(config));

auroraProperties.put("CPUS_PER_CONTAINER", Double.toString(containerResource.getCpu()));
auroraProperties.put("DISK_PER_CONTAINER",
auroraProperties.put(AuroraField.CPUS_PER_CONTAINER,
Double.toString(containerResource.getCpu()));
auroraProperties.put(AuroraField.DISK_PER_CONTAINER,
Long.toString(containerResource.getDisk().asBytes()));
auroraProperties.put("RAM_PER_CONTAINER",
auroraProperties.put(AuroraField.RAM_PER_CONTAINER,
Long.toString(containerResource.getRam().asBytes()));

auroraProperties.put("NUM_CONTAINERS", (1 + TopologyUtils.getNumContainers(topology)) + "");
auroraProperties.put(AuroraField.NUM_CONTAINERS,
Integer.toString(1 + TopologyUtils.getNumContainers(topology)));

auroraProperties.put(AuroraField.CLUSTER, Context.cluster(config));
auroraProperties.put(AuroraField.ENVIRON, Context.environ(config));
auroraProperties.put(AuroraField.ROLE, Context.role(config));

auroraProperties.put("CLUSTER", Context.cluster(config));
auroraProperties.put("ENVIRON", Context.environ(config));
auroraProperties.put("ROLE", Context.role(config));
auroraProperties.put("ISPRODUCTION", isProduction() + "");
// TODO (nlu): currently enforce environment to be "prod" for a Production job
String isProduction = Boolean.toString("prod".equals(Context.environ(config)));
// TODO: remove this and suppress above once we cut a release and update the aurora config file
auroraProperties.put(AuroraField.ISPRODUCTION, isProduction);
auroraProperties.put(AuroraField.IS_PRODUCTION, isProduction);

auroraProperties.put("SANDBOX_INSTANCE_CLASSPATH", Context.instanceSandboxClassPath(config));
auroraProperties.put("SANDBOX_METRICS_YAML", Context.metricsSinksSandboxFile(config));
auroraProperties.put(AuroraField.SANDBOX_INSTANCE_CLASSPATH,
Context.instanceSandboxClassPath(config));
auroraProperties.put(AuroraField.SANDBOX_METRICS_YAML, Context.metricsSinksSandboxFile(config));

String completeSchedulerClassPath = new StringBuilder()
.append(Context.schedulerSandboxClassPath(config)).append(":")
.append(Context.packingSandboxClassPath(config)).append(":")
.append(Context.stateManagerSandboxClassPath(config))
.toString();
auroraProperties.put("SANDBOX_SCHEDULER_CLASSPATH", completeSchedulerClassPath);
auroraProperties.put(AuroraField.SANDBOX_SCHEDULER_CLASSPATH, completeSchedulerClassPath);

String heronCoreReleasePkgURI = Context.corePackageUri(config);
String topologyPkgURI = Runtime.topologyPackageUri(runtime).toString();

auroraProperties.put("CORE_PACKAGE_URI", heronCoreReleasePkgURI);
auroraProperties.put("TOPOLOGY_PACKAGE_URI", topologyPkgURI);
auroraProperties.put(AuroraField.CORE_PACKAGE_URI, heronCoreReleasePkgURI);
auroraProperties.put(AuroraField.TOPOLOGY_PACKAGE_URI, topologyPkgURI);

return auroraProperties;
}

protected boolean isProduction() {
// TODO (nlu): currently enforce environment to be "prod" for a Production job
return "prod".equals(Context.environ(config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void after() throws Exception {

@Test
public void testCreateJob() throws Exception {
Map<String, String> bindings = new HashMap<>();
Map<AuroraField, String> bindings = new HashMap<>();
List<String> expectedCommand = asList("aurora job create --wait-until RUNNING %s %s %s",
JOB_SPEC, AURORA_FILENAME, VERBOSE_CONFIG);

Expand Down Expand Up @@ -108,12 +108,12 @@ public void testRestartJob() throws Exception {

// Failed
Mockito.doReturn(false).when(controller).runProcess(Matchers.anyListOf(String.class));
Assert.assertFalse(controller.restartJob(containerId));
Assert.assertFalse(controller.restart(containerId));
Mockito.verify(controller).runProcess(Mockito.eq(expectedCommand));

// Happy path
Mockito.doReturn(true).when(controller).runProcess(Matchers.anyListOf(String.class));
Assert.assertTrue(controller.restartJob(containerId));
Assert.assertTrue(controller.restart(containerId));
Mockito.verify(controller, Mockito.times(2)).runProcess(expectedCommand);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,24 +112,24 @@ public void testOnSchedule() throws Exception {

// Failed to create job via controller
Mockito.doReturn(false).when(controller)
.createJob(Matchers.anyMapOf(String.class, String.class));
.createJob(Matchers.anyMapOf(AuroraField.class, String.class));
Mockito.doReturn(true).when(stateManager)
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));

Assert.assertFalse(scheduler.onSchedule(validPlan));

Mockito.verify(controller)
.createJob(Matchers.anyMapOf(String.class, String.class));
.createJob(Matchers.anyMapOf(AuroraField.class, String.class));
Mockito.verify(stateManager)
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));

// Happy path
Mockito.doReturn(true).when(controller)
.createJob(Matchers.anyMapOf(String.class, String.class));
.createJob(Matchers.anyMapOf(AuroraField.class, String.class));
Assert.assertTrue(scheduler.onSchedule(validPlan));

Mockito.verify(controller, Mockito.times(2))
.createJob(Matchers.anyMapOf(String.class, String.class));
.createJob(Matchers.anyMapOf(AuroraField.class, String.class));
Mockito.verify(stateManager, Mockito.times(2))
.updatePackingPlan(any(PackingPlans.PackingPlan.class), eq(TOPOLOGY_NAME));
}
Expand Down Expand Up @@ -166,15 +166,15 @@ public void testOnRestart() throws Exception {

// Failed to kill job via controller
Mockito.doReturn(false).when(
controller).restartJob(containerToRestart);
controller).restart(containerToRestart);
Assert.assertFalse(scheduler.onRestart(restartTopologyRequest));
Mockito.verify(controller).restartJob(containerToRestart);
Mockito.verify(controller).restart(containerToRestart);

// Happy path
Mockito.doReturn(true).when(
controller).restartJob(containerToRestart);
controller).restart(containerToRestart);
Assert.assertTrue(scheduler.onRestart(restartTopologyRequest));
Mockito.verify(controller, Mockito.times(2)).restartJob(containerToRestart);
Mockito.verify(controller, Mockito.times(2)).restart(containerToRestart);
}

@Test
Expand Down