From 06876fa33ce6626b70022ce1ab86ed9466c48dfe Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Fri, 2 Sep 2016 17:16:39 -0700
Subject: [PATCH 1/6] Adding LocalSchduler scaling impl with
UpdateTopologyManager
---
.../scheduler/UpdateTopologyManager.java | 249 ++++++++++++++++++
heron/scheduler-core/tests/java/BUILD | 1 +
.../scheduler/UpdateTopologyManagerTest.java | 179 +++++++++++++
.../heron/scheduler/local/LocalScheduler.java | 70 ++++-
.../scheduler/local/LocalSchedulerTest.java | 102 ++++++-
.../heron/spi/utils/PackingTestUtils.java | 25 +-
6 files changed, 615 insertions(+), 11 deletions(-)
create mode 100644 heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
create mode 100644 heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
new file mode 100644
index 00000000000..828c5fae042
--- /dev/null
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -0,0 +1,249 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.scheduler;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.protobuf.Descriptors;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.proto.system.PackingPlans;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.packing.PackingPlan;
+import com.twitter.heron.spi.packing.PackingPlanProtoDeserializer;
+import com.twitter.heron.spi.scheduler.IScalable;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.utils.Runtime;
+
+/**
+ * Class that is able to update a topology. This includes changing the parallelism of
+ * topology components
+ */
+public class UpdateTopologyManager {
+ private static final Logger LOG = Logger.getLogger(UpdateTopologyManager.class.getName());
+
+ private Config runtime;
+ private Optional scalableScheduler;
+ private PackingPlanProtoDeserializer deserializer;
+
+ public UpdateTopologyManager(Config runtime, Optional scalableScheduler) {
+ this.runtime = runtime;
+ this.scalableScheduler = scalableScheduler;
+ this.deserializer = new PackingPlanProtoDeserializer();
+ }
+
+ /**
+ * Scales the topology out or in based on the proposedPackingPlan
+ *
+ * @param existingProtoPackingPlan the current plan. If this isn't what's found in the state manager,
+ * the update will fail
+ * @param proposedProtoPackingPlan packing plan to change the topology to
+ */
+ public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
+ final PackingPlans.PackingPlan proposedProtoPackingPlan)
+ throws ExecutionException, InterruptedException {
+ String topologyName = Runtime.topologyName(runtime);
+ PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
+ PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);
+
+ assertTrue(proposedPackingPlan.getContainers().size() > 0,
+ "proposed packing plan must have at least 1 container %s", proposedPackingPlan);
+
+ ContainerDelta containerDelta = new ContainerDelta(
+ existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
+ int newContainerCount = containerDelta.getContainersToAdd().size();
+ int removableContainerCount = containerDelta.getContainersToRemove().size();
+
+ String message = String.format("Topology change requires %s new containers and removing %s "
+ + "existing containers, but the scheduler does not support scaling, aborting. "
+ + "Existing packing plan: %s, proposed packing plan: %s",
+ newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
+ assertTrue(newContainerCount + removableContainerCount == 0 || scalableScheduler.isPresent(),
+ message);
+
+ SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
+
+ // assert found PackingPlan is same as existing PackingPlan.
+ validateCurrentPackingPlan(existingProtoPackingPlan, topologyName, stateManager);
+
+ // fetch the topology, which will need to be updated
+ TopologyAPI.Topology updatedTopology =
+ getUpdatedTopology(topologyName, proposedPackingPlan, stateManager);
+
+ // request new resources if necessary. Once containers are allocated we should make the changes
+ // to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
+ if (newContainerCount > 0) {
+ scalableScheduler.get().addContainers(containerDelta.getContainersToAdd());
+ }
+
+ // update parallelism in updatedTopology since TMaster checks that
+ // Sum(parallelism) == Sum(instances)
+ logFine("Deleted existing Topology: %s", stateManager.deleteTopology(topologyName));
+ logFine("Set new Topology: %s", stateManager.setTopology(updatedTopology, topologyName));
+
+ // update packing plan to trigger the scaling event
+ logFine("Deleted existing packing plan: %s", stateManager.deletePackingPlan(topologyName));
+ logFine("Set new PackingPlan: %s",
+ stateManager.setPackingPlan(proposedProtoPackingPlan, topologyName));
+
+ // delete the physical plan so TMaster doesn't try to re-establish it on start-up.
+ logFine("Deleted Physical Plan: %s", stateManager.deletePhysicalPlan(topologyName));
+
+ if (removableContainerCount > 0) {
+ scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
+ }
+ }
+
+ @VisibleForTesting
+ TopologyAPI.Topology getUpdatedTopology(String topologyName,
+ PackingPlan proposedPackingPlan,
+ SchedulerStateManagerAdaptor stateManager) {
+ TopologyAPI.Topology updatedTopology = stateManager.getTopology(topologyName);
+ Map proposedComponentCounts = proposedPackingPlan.getComponentCounts();
+ return mergeTopology(updatedTopology, proposedComponentCounts);
+ }
+
+ @VisibleForTesting
+ void validateCurrentPackingPlan(PackingPlans.PackingPlan existingProtoPackingPlan,
+ String topologyName,
+ SchedulerStateManagerAdaptor stateManager) {
+ PackingPlans.PackingPlan foundPackingPlan = stateManager.getPackingPlan(topologyName);
+ assertTrue(foundPackingPlan.equals(existingProtoPackingPlan),
+ "Existing packing plan received does not equal the packing plan found in the state "
+ + "manager. Not updating updatedTopology. Received: %s, Found: %s",
+ existingProtoPackingPlan, foundPackingPlan);
+ }
+
+ @VisibleForTesting
+ static class ContainerDelta {
+ private final Set containersToAdd;
+ private final Set containersToRemove;
+
+ @VisibleForTesting
+ ContainerDelta(Set currentContainers,
+ Set proposedContainers) {
+ Set toAdd = new HashSet<>();
+ for (PackingPlan.ContainerPlan proposedContainerPlan : proposedContainers) {
+ if (!currentContainers.contains(proposedContainerPlan)) {
+ toAdd.add(proposedContainerPlan);
+ }
+ }
+ this.containersToAdd = Collections.unmodifiableSet(toAdd);
+
+ Set toRemove = new HashSet<>();
+ for (PackingPlan.ContainerPlan curentContainerPlan : currentContainers) {
+ if (!proposedContainers.contains(curentContainerPlan)) {
+ toRemove.add(curentContainerPlan);
+ }
+ }
+ this.containersToRemove = Collections.unmodifiableSet(toRemove);
+ }
+
+ @VisibleForTesting
+ Set getContainersToRemove() {
+ return containersToRemove;
+ }
+
+ @VisibleForTesting
+ Set getContainersToAdd() {
+ return containersToAdd;
+ }
+ }
+
+ /**
+ * For each of the components with changed parallelism we need to update the Topology configs to
+ * represent the change.
+ */
+ @VisibleForTesting
+ static TopologyAPI.Topology mergeTopology(TopologyAPI.Topology topology,
+ Map proposedComponentCounts) {
+ TopologyAPI.Topology.Builder builder = TopologyAPI.Topology.newBuilder().mergeFrom(topology);
+ for (String componentName : proposedComponentCounts.keySet()) {
+ Integer parallelism = proposedComponentCounts.get(componentName);
+
+ boolean updated = false;
+ for (TopologyAPI.Bolt.Builder boltBuilder : builder.getBoltsBuilderList()) {
+ if (updateComponent(boltBuilder.getCompBuilder(), componentName, parallelism)) {
+ updated = true;
+ break;
+ }
+ }
+
+ if (!updated) {
+ for (TopologyAPI.Spout.Builder spoutBuilder : builder.getSpoutsBuilderList()) {
+ if (updateComponent(spoutBuilder.getCompBuilder(), componentName, parallelism)) {
+ break;
+ }
+ }
+ }
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Go through all fields in the component builder until one is found where "name=componentName".
+ * When found, go through the confs in the conf builder to find the parallelism field and update
+ * that.
+ *
+ * @return true if the component was found and updated, which might not always happen. For example
+ * if the component is a spout, it won't be found if a bolt builder is passed.
+ */
+ private static boolean updateComponent(TopologyAPI.Component.Builder compBuilder,
+ String componentName,
+ int parallelism) {
+ for (Map.Entry entry
+ : compBuilder.getAllFields().entrySet()) {
+ if (entry.getKey().getName().equals("name") && componentName.equals(entry.getValue())) {
+ TopologyAPI.Config.Builder confBuilder = compBuilder.getConfigBuilder();
+ boolean keyFound = false;
+ // no way to get a KeyValue builder with a get(key) so we have to iterate until found.
+ for (TopologyAPI.Config.KeyValue.Builder kvBuilder : confBuilder.getKvsBuilderList()) {
+ if (kvBuilder.getKey().equals(
+ com.twitter.heron.api.Config.TOPOLOGY_COMPONENT_PARALLELISM)) {
+ kvBuilder.setValue(Integer.toString(parallelism));
+ keyFound = true;
+ break;
+ }
+ }
+ if (!keyFound) {
+ TopologyAPI.Config.KeyValue.Builder kvBuilder =
+ TopologyAPI.Config.KeyValue.newBuilder();
+ kvBuilder.setKey(com.twitter.heron.api.Config.TOPOLOGY_COMPONENT_PARALLELISM);
+ kvBuilder.setValue(Integer.toString(parallelism));
+ confBuilder.addKvs(kvBuilder);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static void assertTrue(boolean condition, String message, Object... values) {
+ if (!condition) {
+ throw new RuntimeException("ERROR: " + String.format(message, values));
+ }
+ }
+
+ private static void logFine(String format, Object... values) {
+ LOG.fine(String.format(format, values));
+ }
+}
diff --git a/heron/scheduler-core/tests/java/BUILD b/heron/scheduler-core/tests/java/BUILD
index e6ce8a306f7..53ab195c8f3 100644
--- a/heron/scheduler-core/tests/java/BUILD
+++ b/heron/scheduler-core/tests/java/BUILD
@@ -41,6 +41,7 @@ java_tests(
"com.twitter.heron.scheduler.RuntimeManagerMainTest",
"com.twitter.heron.scheduler.SubmitterMainTest",
"com.twitter.heron.scheduler.SchedulerMainTest",
+ "com.twitter.heron.scheduler.UpdateTopologyManagerTest",
"com.twitter.heron.scheduler.server.SchedulerServerTest",
"com.twitter.heron.scheduler.client.LibrarySchedulerClientTest",
"com.twitter.heron.scheduler.client.HttpServiceSchedulerClientTest",
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
new file mode 100644
index 00000000000..2e438075601
--- /dev/null
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -0,0 +1,179 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.twitter.heron.scheduler;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.proto.system.PackingPlans;
+import com.twitter.heron.scheduler.UpdateTopologyManager.ContainerDelta;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Keys;
+import com.twitter.heron.spi.packing.PackingPlan;
+import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
+import com.twitter.heron.spi.packing.Resource;
+import com.twitter.heron.spi.scheduler.IScalable;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.utils.PackingTestUtils;
+import com.twitter.heron.spi.utils.TopologyTests;
+
+public class UpdateTopologyManagerTest {
+
+ private Set currentContainerPlan;
+ private Set proposedContainerPlan;
+ private Set expectedContainersToAdd;
+ private Set expectedContainersToRemove;
+
+ @Before
+ public void init() {
+ currentContainerPlan = buildContainerSet("current-1", "current-2", "current-3", "current-4");
+ proposedContainerPlan = buildContainerSet("current-1", "current-3", "new-1", "new-2");
+ expectedContainersToAdd = buildContainerSet("new-1", "new-2");
+ expectedContainersToRemove = buildContainerSet("current-2", "current-4");
+ }
+
+ @Test
+ public void testContainerDelta() {
+ ContainerDelta result = new ContainerDelta(currentContainerPlan, proposedContainerPlan);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(expectedContainersToAdd, result.getContainersToAdd());
+ Assert.assertEquals(expectedContainersToRemove, result.getContainersToRemove());
+ }
+
+ /**
+ * Test scalable scheduler invocation
+ */
+ @Test
+ public void requestsToAddAndRemoveContainers() throws Exception {
+ PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
+
+ PackingPlan currentPacking
+ = new PackingPlan("current", currentContainerPlan, new Resource(0.5, 1, 2));
+ PackingPlan proposedPacking
+ = new PackingPlan("proposed", proposedContainerPlan, new Resource(0.5, 1, 2));
+
+ PackingPlans.PackingPlan currentProtoPlan = serializer.toProto(currentPacking);
+ PackingPlans.PackingPlan proposedProtoPlan = serializer.toProto(proposedPacking);
+
+ SchedulerStateManagerAdaptor mockStateMgr = Mockito.mock(SchedulerStateManagerAdaptor.class);
+ Config mockRuntime = Mockito.mock(Config.class);
+ Mockito.when(mockRuntime.get(Keys.schedulerStateManagerAdaptor())).thenReturn(mockStateMgr);
+
+ IScalable mockScheduler = Mockito.mock(IScalable.class);
+
+ UpdateTopologyManager updateManager
+ = new UpdateTopologyManager(mockRuntime, Optional.of(mockScheduler));
+ UpdateTopologyManager spyUpdateManager = Mockito.spy(updateManager);
+
+ Mockito.doNothing().when(spyUpdateManager)
+ .validateCurrentPackingPlan(currentProtoPlan, null, mockStateMgr);
+ Mockito.doReturn(null).when(spyUpdateManager).
+ getUpdatedTopology(null, proposedPacking, mockStateMgr);
+
+ spyUpdateManager.updateTopology(currentProtoPlan, proposedProtoPlan);
+
+ Mockito.verify(mockScheduler).addContainers(expectedContainersToAdd);
+ Mockito.verify(mockScheduler).removeContainers(expectedContainersToRemove);
+ }
+
+ @Test
+ public void testUpdateTopology() {
+ Map bolts = new HashMap<>();
+ bolts.put("bolt1", 1);
+ bolts.put("bolt7", 7);
+
+ Map spouts = new HashMap<>();
+ spouts.put("spout3", 3);
+ spouts.put("spout5", 5);
+
+ TopologyAPI.Topology topology = TopologyTests.createTopology(
+ "test", new com.twitter.heron.api.Config(), spouts, bolts);
+
+ // assert that the initial config settings are as expected
+ assertParallelism(topology, spouts, bolts);
+
+ Map boltUpdates = new HashMap<>();
+ boltUpdates.put("bolt1", 3);
+ boltUpdates.put("bolt7", 2);
+
+ Map spoutUpdates = new HashMap<>();
+ spoutUpdates.put("spout3", 8);
+
+ Map updates = new HashMap<>();
+ updates.putAll(boltUpdates);
+ updates.putAll(spoutUpdates);
+
+ // assert that the updated topology config settings are as expected
+ topology = UpdateTopologyManager.mergeTopology(topology, updates);
+ bolts.putAll(boltUpdates);
+ spouts.putAll(spoutUpdates);
+
+ assertParallelism(topology, spouts, bolts);
+ }
+
+ private void assertParallelism(TopologyAPI.Topology topology,
+ Map expectedSouts,
+ Map expectedBolts) {
+ for (String boltName : expectedBolts.keySet()) {
+ String foundParallelism = null;
+ for (TopologyAPI.Bolt bolt : topology.getBoltsList()) {
+ foundParallelism = getParallelism(bolt.getComp(), boltName);
+ if (foundParallelism != null) {
+ break;
+ }
+ }
+ Assert.assertEquals(Integer.toString(expectedBolts.get(boltName)), foundParallelism);
+ }
+
+ for (String spoutName : expectedSouts.keySet()) {
+ String foundParallelism = null;
+ for (TopologyAPI.Spout spout : topology.getSpoutsList()) {
+ foundParallelism = getParallelism(spout.getComp(), spoutName);
+ if (foundParallelism != null) {
+ break;
+ }
+ }
+ Assert.assertEquals(Integer.toString(expectedSouts.get(spoutName)), foundParallelism);
+ }
+ }
+
+ private static String getParallelism(TopologyAPI.Component component, String componentName) {
+ if (component.getName().equals(componentName)) {
+ for (TopologyAPI.Config.KeyValue keyValue : component.getConfig().getKvsList()) {
+ if (keyValue.getKey().equals(com.twitter.heron.api.Config.TOPOLOGY_COMPONENT_PARALLELISM)) {
+ return keyValue.getValue();
+ }
+ }
+ }
+ return null;
+ }
+
+ private static Set buildContainerSet(String... containerIds) {
+ Set containerPlan = new HashSet<>();
+ for (String containerId : containerIds) {
+ containerPlan.add(PackingTestUtils.testContainerPlan(containerId));
+ }
+ return containerPlan;
+ }
+}
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index efb0068305d..5bcd215a63a 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -17,27 +17,33 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
+import com.twitter.heron.scheduler.UpdateTopologyManager;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
+import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.scheduler.IScheduler;
import com.twitter.heron.spi.utils.Runtime;
import com.twitter.heron.spi.utils.SchedulerUtils;
import com.twitter.heron.spi.utils.ShellUtils;
-public class LocalScheduler implements IScheduler {
+public class LocalScheduler implements IScheduler, IScalable {
private static final Logger LOG = Logger.getLogger(LocalScheduler.class.getName());
// executor service for monitoring all the containers
private final ExecutorService monitorService = Executors.newCachedThreadPool();
@@ -45,6 +51,7 @@ public class LocalScheduler implements IScheduler {
private final Map processToContainer = new ConcurrentHashMap<>();
private Config config;
private Config runtime;
+ private UpdateTopologyManager updateTopologyManager;
// has the topology been killed?
private volatile boolean isTopologyKilled = false;
@@ -52,6 +59,7 @@ public class LocalScheduler implements IScheduler {
public void initialize(Config mConfig, Config mRuntime) {
this.config = mConfig;
this.runtime = mRuntime;
+ this.updateTopologyManager = new UpdateTopologyManager(runtime, Optional.of(this));
}
@Override
@@ -66,6 +74,7 @@ public void close() {
/**
* Start executor process via running an async shell process
*/
+ @VisibleForTesting
protected Process startExecutorProcess(int container) {
return ShellUtils.runASyncProcess(true,
getExecutorCommand(container),
@@ -76,6 +85,7 @@ protected Process startExecutorProcess(int container) {
/**
* Start the executor for the given container
*/
+ @VisibleForTesting
protected void startExecutor(final int container) {
LOG.info("Starting a new executor for container: " + container);
@@ -93,6 +103,7 @@ protected void startExecutor(final int container) {
/**
* Start the monitor of a given executor
*/
+ @VisibleForTesting
protected void startExecutorMonitor(final int container, final Process containerExecutor) {
// add the container for monitoring
Runnable r = new Runnable() {
@@ -124,7 +135,7 @@ public void run() {
monitorService.submit(r);
}
- protected String[] getExecutorCommand(int container) {
+ private String[] getExecutorCommand(int container) {
List freePorts = new ArrayList<>(SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR);
for (int i = 0; i < SchedulerUtils.PORTS_REQUIRED_FOR_EXECUTOR; i++) {
freePorts.add(SysUtils.getFreePort());
@@ -236,10 +247,61 @@ public boolean onRestart(Scheduler.RestartTopologyRequest request) {
@Override
public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
- LOG.severe("Topology onUpdate not implemented by this scheduler.");
- return false;
+ try {
+ updateTopologyManager.updateTopology(
+ request.getCurrentPackingPlan(), request.getProposedPackingPlan());
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void addContainers(Set containers) {
+ synchronized (processToContainer) {
+ int activeContainerCount = processToContainer.size();
+
+ for (int i = 0; i < containers.size(); i++) {
+ // if number of active container is 2, then there is 1 TMaster container (id=0) and 1 worker
+ // (id = 1). Then the next container to be added will have id = 2, same as current container
+ // count
+ startExecutor(activeContainerCount + i);
+ }
+ }
}
+ @Override
+ public void removeContainers(Set containersToRemove) {
+ LOG.log(Level.INFO,
+ "Kill {0} of {1} containers",
+ new Object[]{containersToRemove.size(), processToContainer.size()});
+
+ synchronized (processToContainer) {
+ // Create a inverse map to be able to get process instance from container id
+ Map containerToProcessMap = new HashMap<>();
+ for (Map.Entry entry : processToContainer.entrySet()) {
+ containerToProcessMap.put(entry.getValue(), entry.getKey());
+ }
+
+ for (PackingPlan.ContainerPlan containerToRemove : containersToRemove) {
+ String containerId = containerToRemove.getId();
+ Process process = containerToProcessMap.get(Integer.valueOf(containerId));
+ if (process == null) {
+ LOG.log(Level.WARNING, "Container for id:{0} not found.", containerId);
+ continue;
+ }
+
+ // remove the process so that it is not monitored and relaunched
+ LOG.info("Killing executor for container: " + containerId);
+ processToContainer.remove(process);
+ process.destroy();
+ LOG.info("Killed executor for container: " + containerId);
+ }
+ }
+ }
+
+ @VisibleForTesting
boolean isTopologyKilled() {
return isTopologyKilled;
}
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index aec30176f73..0e9af07e9f3 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -14,6 +14,8 @@
package com.twitter.heron.scheduler.local;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -28,22 +30,25 @@
import com.twitter.heron.spi.common.ConfigKeys;
import com.twitter.heron.spi.common.Keys;
import com.twitter.heron.spi.packing.PackingPlan;
+import com.twitter.heron.spi.utils.PackingTestUtils;
public class LocalSchedulerTest {
private static final String TOPOLOGY_NAME = "testTopology";
- private static final long NUM_CONTAINER = 2;
+ private static final int NUM_CONTAINER = 2;
private static final int MAX_WAITING_SECOND = 10;
private static LocalScheduler scheduler;
+ private Config config;
+ private Config runtime;
@Before
public void before() throws Exception {
scheduler = Mockito.spy(LocalScheduler.class);
- Config config = Mockito.mock(Config.class);
+ config = Mockito.mock(Config.class);
Mockito.when(config.getStringValue(ConfigKeys.get("TOPOLOGY_NAME"))).thenReturn(TOPOLOGY_NAME);
- Config runtime = Mockito.mock(Config.class);
- Mockito.when(runtime.getLongValue(Keys.numContainers())).thenReturn(NUM_CONTAINER);
+ runtime = Mockito.mock(Config.class);
+ Mockito.when(runtime.getLongValue(Keys.numContainers())).thenReturn(NUM_CONTAINER * 1L);
scheduler.initialize(config, runtime);
}
@@ -84,6 +89,95 @@ public void testOnSchedule() throws Exception {
}
}
+ @Test
+ public void testAddContainer() throws Exception {
+ Mockito.when(runtime.getLongValue(Keys.numContainers())).thenReturn(2L);
+ scheduler.initialize(config, runtime);
+
+ //verify plan is deployed and containers are created
+ Mockito.doNothing().
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+ Process mockProcessTM = Mockito.mock(Process.class);
+ Mockito.doReturn(mockProcessTM).when(scheduler).startExecutorProcess(0);
+
+ Process mockProcessWorker1 = Mockito.mock(Process.class);
+ Mockito.doReturn(mockProcessWorker1).when(scheduler).startExecutorProcess(1);
+
+ PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
+ Assert.assertTrue(scheduler.onSchedule(packingPlan));
+
+ Mockito.verify(scheduler, Mockito.times(NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+
+ //now verify add container adds new container
+ Process mockProcessWorker2 = Mockito.mock(Process.class);
+ Mockito.doReturn(mockProcessWorker2).when(scheduler).startExecutorProcess(2);
+ Set containers = new HashSet<>();
+ Set spyContainers = Mockito.spy(containers);
+
+ Mockito.doReturn(1).when(spyContainers).size();
+ scheduler.addContainers(spyContainers);
+ Mockito.verify(scheduler).startExecutor(NUM_CONTAINER);
+
+ Process mockProcess = Mockito.mock(Process.class);
+ Mockito.doReturn(mockProcess).when(scheduler).startExecutorProcess(Mockito.anyInt());
+ Mockito.doReturn(2).when(spyContainers).size();
+ scheduler.addContainers(spyContainers);
+ Mockito.verify(scheduler).startExecutor(NUM_CONTAINER + 1);
+ Mockito.verify(scheduler).startExecutor(NUM_CONTAINER + 2);
+ }
+
+ /**
+ * Verify containers can be removed by Local Scheduler
+ */
+ @Test
+ public void testRemoveContainer() throws Exception {
+ final int LOCAL_NUM_CONTAINER = 6;
+
+ Mockito.when(runtime.getLongValue(Keys.numContainers())).thenReturn(LOCAL_NUM_CONTAINER * 1L);
+ scheduler.initialize(config, runtime);
+
+ //verify plan is deployed and containers are created
+ Mockito.doNothing().
+ when(scheduler).startExecutorMonitor(Mockito.anyInt(), Mockito.any(Process.class));
+
+ Process[] processes = new Process[LOCAL_NUM_CONTAINER];
+ Set existingContainers = new HashSet<>();
+ for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
+ processes[i] = Mockito.mock(Process.class);
+ Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
+ existingContainers.add(PackingTestUtils.testContainerPlan("" + i));
+ }
+
+ PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
+ Assert.assertTrue(scheduler.onSchedule(packingPlan));
+ Assert.assertEquals(LOCAL_NUM_CONTAINER, scheduler.getProcessToContainer().size());
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+
+ Set containersToRemove = new HashSet<>();
+ PackingPlan.ContainerPlan containerToRemove =
+ PackingTestUtils.testContainerPlan("" + (LOCAL_NUM_CONTAINER - 1));
+ containersToRemove.add(containerToRemove);
+ scheduler.removeContainers(containersToRemove);
+ existingContainers.remove(containerToRemove);
+ Assert.assertEquals(existingContainers.size(), scheduler.getProcessToContainer().size());
+ Mockito.verify(processes[LOCAL_NUM_CONTAINER - 1]).destroy();
+ // verify no new process restarts
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+
+ containersToRemove.clear();
+ containersToRemove.add(PackingTestUtils.testContainerPlan("1"));
+ containersToRemove.add(PackingTestUtils.testContainerPlan("2"));
+ scheduler.removeContainers(containersToRemove);
+ existingContainers.remove(PackingTestUtils.testContainerPlan("1"));
+ existingContainers.remove(PackingTestUtils.testContainerPlan("2"));
+
+ Assert.assertEquals(existingContainers.size(), scheduler.getProcessToContainer().size());
+ Mockito.verify(processes[1]).destroy();
+ Mockito.verify(processes[2]).destroy();
+ // verify no new process restarts
+ Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
+ }
+
@Test
public void testOnKill() throws Exception {
Process mockProcess = Mockito.mock(Process.class);
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
index e1f6ba4a243..90ccfe1b81f 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
@@ -14,7 +14,9 @@
package com.twitter.heron.spi.utils;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.proto.system.PackingPlans;
@@ -24,6 +26,7 @@
import com.twitter.heron.spi.packing.IPacking;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
+import com.twitter.heron.spi.packing.Resource;
/**
* Packing utilities for testing
@@ -33,7 +36,7 @@ public final class PackingTestUtils {
private PackingTestUtils() {
}
- public static PackingPlan createTestPackingPlan(String topologyName, IPacking packing) {
+ public static PackingPlan testPackingPlan(String topologyName, IPacking packing) {
Map spouts = new HashMap<>();
spouts.put("testSpout", 2);
@@ -56,10 +59,26 @@ public static PackingPlan createTestPackingPlan(String topologyName, IPacking pa
return packing.pack();
}
- public static PackingPlans.PackingPlan createTestProtoPackingPlan(
+ public static PackingPlans.PackingPlan testProtoPackingPlan(
String topologyName, IPacking packing) {
- PackingPlan plan = createTestPackingPlan(topologyName, packing);
+ PackingPlan plan = testPackingPlan(topologyName, packing);
PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
return serializer.toProto(plan);
}
+
+ public static PackingPlan.ContainerPlan testContainerPlan(String containerId) {
+ Resource resource = new Resource(7.5, 6, 9);
+ Set instancePlans = new HashSet<>();
+ for (int index : new Integer[]{0, 1}) {
+ String instanceId = "instance-" + index;
+ String componentName = "componentName-" + index;
+ instancePlans.add(testInstancePlan(instanceId, componentName));
+ }
+ return new PackingPlan.ContainerPlan(containerId, instancePlans, resource);
+ }
+
+ private static PackingPlan.InstancePlan testInstancePlan(String id, String componentName) {
+ Resource resource = new Resource(1.5, 2, 3);
+ return new PackingPlan.InstancePlan(id, componentName, resource);
+ }
}
From 9592f1e6159421e4566e6fcfc7c6a8ef0a738f20 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Tue, 6 Sep 2016 21:14:25 -0700
Subject: [PATCH 2/6] Update based on master changes to remove Resource and
change containerId to int
---
.../scheduler/UpdateTopologyManagerTest.java | 17 ++++++++---------
.../heron/scheduler/local/LocalScheduler.java | 4 ++--
.../scheduler/local/LocalSchedulerTest.java | 12 ++++++------
.../heron/spi/utils/PackingTestUtils.java | 2 +-
4 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
index 2e438075601..d79b7fbafd2 100644
--- a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -32,7 +32,6 @@
import com.twitter.heron.spi.common.Keys;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
-import com.twitter.heron.spi.packing.Resource;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import com.twitter.heron.spi.utils.PackingTestUtils;
@@ -47,10 +46,10 @@ public class UpdateTopologyManagerTest {
@Before
public void init() {
- currentContainerPlan = buildContainerSet("current-1", "current-2", "current-3", "current-4");
- proposedContainerPlan = buildContainerSet("current-1", "current-3", "new-1", "new-2");
- expectedContainersToAdd = buildContainerSet("new-1", "new-2");
- expectedContainersToRemove = buildContainerSet("current-2", "current-4");
+ currentContainerPlan = buildContainerSet(1, 2, 3, 4);
+ proposedContainerPlan = buildContainerSet(1, 3, 5, 6);
+ expectedContainersToAdd = buildContainerSet(5, 6);
+ expectedContainersToRemove = buildContainerSet(2, 4);
}
@Test
@@ -69,9 +68,9 @@ public void requestsToAddAndRemoveContainers() throws Exception {
PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
PackingPlan currentPacking
- = new PackingPlan("current", currentContainerPlan, new Resource(0.5, 1, 2));
+ = new PackingPlan("current", currentContainerPlan);
PackingPlan proposedPacking
- = new PackingPlan("proposed", proposedContainerPlan, new Resource(0.5, 1, 2));
+ = new PackingPlan("proposed", proposedContainerPlan);
PackingPlans.PackingPlan currentProtoPlan = serializer.toProto(currentPacking);
PackingPlans.PackingPlan proposedProtoPlan = serializer.toProto(proposedPacking);
@@ -169,9 +168,9 @@ private static String getParallelism(TopologyAPI.Component component, String com
return null;
}
- private static Set buildContainerSet(String... containerIds) {
+ private static Set buildContainerSet(int... containerIds) {
Set containerPlan = new HashSet<>();
- for (String containerId : containerIds) {
+ for (int containerId : containerIds) {
containerPlan.add(PackingTestUtils.testContainerPlan(containerId));
}
return containerPlan;
diff --git a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
index 5bcd215a63a..0408e50aee3 100644
--- a/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
+++ b/heron/schedulers/src/java/com/twitter/heron/scheduler/local/LocalScheduler.java
@@ -285,8 +285,8 @@ public void removeContainers(Set containersToRemove)
}
for (PackingPlan.ContainerPlan containerToRemove : containersToRemove) {
- String containerId = containerToRemove.getId();
- Process process = containerToProcessMap.get(Integer.valueOf(containerId));
+ int containerId = containerToRemove.getId();
+ Process process = containerToProcessMap.get(containerId);
if (process == null) {
LOG.log(Level.WARNING, "Container for id:{0} not found.", containerId);
continue;
diff --git a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
index 0e9af07e9f3..98c16368736 100644
--- a/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
+++ b/heron/schedulers/tests/java/com/twitter/heron/scheduler/local/LocalSchedulerTest.java
@@ -145,7 +145,7 @@ public void testRemoveContainer() throws Exception {
for (int i = 0; i < LOCAL_NUM_CONTAINER; i++) {
processes[i] = Mockito.mock(Process.class);
Mockito.doReturn(processes[i]).when(scheduler).startExecutorProcess(i);
- existingContainers.add(PackingTestUtils.testContainerPlan("" + i));
+ existingContainers.add(PackingTestUtils.testContainerPlan(i));
}
PackingPlan packingPlan = Mockito.mock(PackingPlan.class);
@@ -155,7 +155,7 @@ public void testRemoveContainer() throws Exception {
Set containersToRemove = new HashSet<>();
PackingPlan.ContainerPlan containerToRemove =
- PackingTestUtils.testContainerPlan("" + (LOCAL_NUM_CONTAINER - 1));
+ PackingTestUtils.testContainerPlan(LOCAL_NUM_CONTAINER - 1);
containersToRemove.add(containerToRemove);
scheduler.removeContainers(containersToRemove);
existingContainers.remove(containerToRemove);
@@ -165,11 +165,11 @@ public void testRemoveContainer() throws Exception {
Mockito.verify(scheduler, Mockito.times(LOCAL_NUM_CONTAINER)).startExecutor(Mockito.anyInt());
containersToRemove.clear();
- containersToRemove.add(PackingTestUtils.testContainerPlan("1"));
- containersToRemove.add(PackingTestUtils.testContainerPlan("2"));
+ containersToRemove.add(PackingTestUtils.testContainerPlan(1));
+ containersToRemove.add(PackingTestUtils.testContainerPlan(2));
scheduler.removeContainers(containersToRemove);
- existingContainers.remove(PackingTestUtils.testContainerPlan("1"));
- existingContainers.remove(PackingTestUtils.testContainerPlan("2"));
+ existingContainers.remove(PackingTestUtils.testContainerPlan(1));
+ existingContainers.remove(PackingTestUtils.testContainerPlan(2));
Assert.assertEquals(existingContainers.size(), scheduler.getProcessToContainer().size());
Mockito.verify(processes[1]).destroy();
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
index 90ccfe1b81f..7f16e37313b 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
@@ -66,7 +66,7 @@ public static PackingPlans.PackingPlan testProtoPackingPlan(
return serializer.toProto(plan);
}
- public static PackingPlan.ContainerPlan testContainerPlan(String containerId) {
+ public static PackingPlan.ContainerPlan testContainerPlan(int containerId) {
Resource resource = new Resource(7.5, 6, 9);
Set instancePlans = new HashSet<>();
for (int index : new Integer[]{0, 1}) {
From 203390ce7c25c1be6593c4e9f318dac5de0c1f45 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Wed, 7 Sep 2016 13:42:56 -0700
Subject: [PATCH 3/6] Change ContainerDelta logic to be based on container id,
not container equality
---
.../scheduler/UpdateTopologyManager.java | 30 ++++++++++++++++---
.../scheduler/UpdateTopologyManagerTest.java | 15 ++++++----
.../heron/spi/utils/PackingTestUtils.java | 7 ++++-
3 files changed, 41 insertions(+), 11 deletions(-)
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index 828c5fae042..dc14f04486c 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -132,6 +132,16 @@ void validateCurrentPackingPlan(PackingPlans.PackingPlan existingProtoPackingPla
existingProtoPackingPlan, foundPackingPlan);
}
+ /**
+ * Given both a current and proposed set of containers, determines the set of containers to be
+ * added and those to be removed. Whether to add or remove a container is determined by the id of
+ * the container. Proposed containers with an id not in the existing set are to be added, while
+ * current container ids not in the proposed set are to be removed.
+ *
+ * It is important to note that the container comparison is done by id only, and does not include
+ * the InstancePlans in the container, which for a given container might change in the proposed
+ * plan.
+ */
@VisibleForTesting
static class ContainerDelta {
private final Set containersToAdd;
@@ -140,18 +150,22 @@ static class ContainerDelta {
@VisibleForTesting
ContainerDelta(Set currentContainers,
Set proposedContainers) {
+
+ Set currentContainerIds = toIdSet(currentContainers);
+ Set proposedContainerIds = toIdSet(proposedContainers);
+
Set toAdd = new HashSet<>();
for (PackingPlan.ContainerPlan proposedContainerPlan : proposedContainers) {
- if (!currentContainers.contains(proposedContainerPlan)) {
+ if (!currentContainerIds.contains(proposedContainerPlan.getId())) {
toAdd.add(proposedContainerPlan);
}
}
this.containersToAdd = Collections.unmodifiableSet(toAdd);
Set toRemove = new HashSet<>();
- for (PackingPlan.ContainerPlan curentContainerPlan : currentContainers) {
- if (!proposedContainers.contains(curentContainerPlan)) {
- toRemove.add(curentContainerPlan);
+ for (PackingPlan.ContainerPlan currentContainerPlan : currentContainers) {
+ if (!proposedContainerIds.contains(currentContainerPlan.getId())) {
+ toRemove.add(currentContainerPlan);
}
}
this.containersToRemove = Collections.unmodifiableSet(toRemove);
@@ -237,6 +251,14 @@ private static boolean updateComponent(TopologyAPI.Component.Builder compBuilder
return false;
}
+ private static Set toIdSet(Set containers) {
+ Set currentContainerMap = new HashSet<>();
+ for (PackingPlan.ContainerPlan container : containers) {
+ currentContainerMap.add(container.getId());
+ }
+ return currentContainerMap;
+ }
+
private static void assertTrue(boolean condition, String message, Object... values) {
if (!condition) {
throw new RuntimeException("ERROR: " + String.format(message, values));
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
index d79b7fbafd2..1f2741581c7 100644
--- a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -46,10 +46,12 @@ public class UpdateTopologyManagerTest {
@Before
public void init() {
- currentContainerPlan = buildContainerSet(1, 2, 3, 4);
- proposedContainerPlan = buildContainerSet(1, 3, 5, 6);
- expectedContainersToAdd = buildContainerSet(5, 6);
- expectedContainersToRemove = buildContainerSet(2, 4);
+ Integer[] instanceIndexA = new Integer[] {37, 48, 59};
+ Integer[] instanceIndexB = new Integer[] {17, 22};
+ currentContainerPlan = buildContainerSet(new Integer[] {1, 2, 3, 4}, instanceIndexA);
+ proposedContainerPlan = buildContainerSet(new Integer[] {1, 3, 5, 6}, instanceIndexB);
+ expectedContainersToAdd = buildContainerSet(new Integer[] {5, 6}, instanceIndexB);
+ expectedContainersToRemove = buildContainerSet(new Integer[] {2, 4}, instanceIndexA);
}
@Test
@@ -168,10 +170,11 @@ private static String getParallelism(TopologyAPI.Component component, String com
return null;
}
- private static Set buildContainerSet(int... containerIds) {
+ private static Set buildContainerSet(Integer[] containerIds,
+ Integer[] instanceIndexes) {
Set containerPlan = new HashSet<>();
for (int containerId : containerIds) {
- containerPlan.add(PackingTestUtils.testContainerPlan(containerId));
+ containerPlan.add(PackingTestUtils.testContainerPlan(containerId, instanceIndexes));
}
return containerPlan;
}
diff --git a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
index 7f16e37313b..b4d05f3f530 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/utils/PackingTestUtils.java
@@ -67,9 +67,14 @@ public static PackingPlans.PackingPlan testProtoPackingPlan(
}
public static PackingPlan.ContainerPlan testContainerPlan(int containerId) {
+ return testContainerPlan(containerId, 0, 1);
+ }
+
+ public static PackingPlan.ContainerPlan testContainerPlan(int containerId,
+ Integer... instanceIndices) {
Resource resource = new Resource(7.5, 6, 9);
Set instancePlans = new HashSet<>();
- for (int index : new Integer[]{0, 1}) {
+ for (int index : instanceIndices) {
String instanceId = "instance-" + index;
String componentName = "componentName-" + index;
instancePlans.add(testInstancePlan(instanceId, componentName));
From 2bd2c370d81cc26180b5df8bb5523a02754cabe2 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Wed, 7 Sep 2016 14:22:30 -0700
Subject: [PATCH 4/6] fix bad javadoc html
---
.../java/com/twitter/heron/scheduler/UpdateTopologyManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index dc14f04486c..de05f79f0a1 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -137,7 +137,7 @@ void validateCurrentPackingPlan(PackingPlans.PackingPlan existingProtoPackingPla
* added and those to be removed. Whether to add or remove a container is determined by the id of
* the container. Proposed containers with an id not in the existing set are to be added, while
* current container ids not in the proposed set are to be removed.
- *
+ *
* It is important to note that the container comparison is done by id only, and does not include
* the InstancePlans in the container, which for a given container might change in the proposed
* plan.
From 53ecc272654967f1128814cf1147b37c766b6c72 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Wed, 7 Sep 2016 16:49:37 -0700
Subject: [PATCH 5/6] Remove check that existing packing plan hasn't changed
---
.../heron/scheduler/UpdateTopologyManager.java | 14 --------------
.../heron/scheduler/UpdateTopologyManagerTest.java | 2 --
2 files changed, 16 deletions(-)
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index de05f79f0a1..4f760b3ded1 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -81,9 +81,6 @@ public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPl
SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
- // assert found PackingPlan is same as existing PackingPlan.
- validateCurrentPackingPlan(existingProtoPackingPlan, topologyName, stateManager);
-
// fetch the topology, which will need to be updated
TopologyAPI.Topology updatedTopology =
getUpdatedTopology(topologyName, proposedPackingPlan, stateManager);
@@ -121,17 +118,6 @@ TopologyAPI.Topology getUpdatedTopology(String topologyName,
return mergeTopology(updatedTopology, proposedComponentCounts);
}
- @VisibleForTesting
- void validateCurrentPackingPlan(PackingPlans.PackingPlan existingProtoPackingPlan,
- String topologyName,
- SchedulerStateManagerAdaptor stateManager) {
- PackingPlans.PackingPlan foundPackingPlan = stateManager.getPackingPlan(topologyName);
- assertTrue(foundPackingPlan.equals(existingProtoPackingPlan),
- "Existing packing plan received does not equal the packing plan found in the state "
- + "manager. Not updating updatedTopology. Received: %s, Found: %s",
- existingProtoPackingPlan, foundPackingPlan);
- }
-
/**
* Given both a current and proposed set of containers, determines the set of containers to be
* added and those to be removed. Whether to add or remove a container is determined by the id of
diff --git a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
index 1f2741581c7..2a496a2ed77 100644
--- a/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
+++ b/heron/scheduler-core/tests/java/com/twitter/heron/scheduler/UpdateTopologyManagerTest.java
@@ -87,8 +87,6 @@ public void requestsToAddAndRemoveContainers() throws Exception {
= new UpdateTopologyManager(mockRuntime, Optional.of(mockScheduler));
UpdateTopologyManager spyUpdateManager = Mockito.spy(updateManager);
- Mockito.doNothing().when(spyUpdateManager)
- .validateCurrentPackingPlan(currentProtoPlan, null, mockStateMgr);
Mockito.doReturn(null).when(spyUpdateManager).
getUpdatedTopology(null, proposedPacking, mockStateMgr);
From 81038362c448ed4984c847f1a352f42560cda9a5 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Thu, 8 Sep 2016 11:41:37 -0700
Subject: [PATCH 6/6] Added comments to ContainerDelta about not supporting
container size changes
---
.../java/com/twitter/heron/scheduler/UpdateTopologyManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index 4f760b3ded1..c0b0b222d1c 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -126,7 +126,7 @@ TopologyAPI.Topology getUpdatedTopology(String topologyName,
*
* It is important to note that the container comparison is done by id only, and does not include
* the InstancePlans in the container, which for a given container might change in the proposed
- * plan.
+ * plan. Changing the size of a container is not supported and will be ignored.
*/
@VisibleForTesting
static class ContainerDelta {