Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Adding LocalScheduler scaling impl with UpdateTopologyManager #1333

Merged
merged 8 commits into from
Sep 8, 2016
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.scheduler;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.protobuf.Descriptors;

import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.proto.system.PackingPlans;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoDeserializer;
import com.twitter.heron.spi.scheduler.IScalable;
import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import com.twitter.heron.spi.utils.Runtime;

/**
* Class that is able to update a topology. This includes changing the parallelism of
* topology components
*/
public class UpdateTopologyManager {
private static final Logger LOG = Logger.getLogger(UpdateTopologyManager.class.getName());

private Config runtime;
private Optional<IScalable> scalableScheduler;
private PackingPlanProtoDeserializer deserializer;

public UpdateTopologyManager(Config runtime, Optional<IScalable> scalableScheduler) {
this.runtime = runtime;
this.scalableScheduler = scalableScheduler;
this.deserializer = new PackingPlanProtoDeserializer();
}

/**
* Scales the topology out or in based on the proposedPackingPlan
*
* @param existingProtoPackingPlan the current plan. If this isn't what's found in the state manager,
* the update will fail
* @param proposedProtoPackingPlan packing plan to change the topology to
*/
public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
final PackingPlans.PackingPlan proposedProtoPackingPlan)
throws ExecutionException, InterruptedException {
String topologyName = Runtime.topologyName(runtime);
PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);

assertTrue(proposedPackingPlan.getContainers().size() > 0,
"proposed packing plan must have at least 1 container %s", proposedPackingPlan);

ContainerDelta containerDelta = new ContainerDelta(
existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
int newContainerCount = containerDelta.getContainersToAdd().size();
int removableContainerCount = containerDelta.getContainersToRemove().size();

String message = String.format("Topology change requires %s new containers and removing %s "
+ "existing containers, but the scheduler does not support scaling, aborting. "
+ "Existing packing plan: %s, proposed packing plan: %s",
newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
assertTrue(newContainerCount + removableContainerCount == 0 || scalableScheduler.isPresent(),
message);

SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);

// assert found PackingPlan is same as existing PackingPlan.
validateCurrentPackingPlan(existingProtoPackingPlan, topologyName, stateManager);

// fetch the topology, which will need to be updated
TopologyAPI.Topology updatedTopology =
getUpdatedTopology(topologyName, proposedPackingPlan, stateManager);

// request new resources if necessary. Once containers are allocated we should make the changes
// to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
if (newContainerCount > 0) {
scalableScheduler.get().addContainers(containerDelta.getContainersToAdd());
}

// update parallelism in updatedTopology since TMaster checks that
// Sum(parallelism) == Sum(instances)
logFine("Deleted existing Topology: %s", stateManager.deleteTopology(topologyName));
logFine("Set new Topology: %s", stateManager.setTopology(updatedTopology, topologyName));

// update packing plan to trigger the scaling event
logFine("Deleted existing packing plan: %s", stateManager.deletePackingPlan(topologyName));
logFine("Set new PackingPlan: %s",
stateManager.setPackingPlan(proposedProtoPackingPlan, topologyName));

// delete the physical plan so TMaster doesn't try to re-establish it on start-up.
logFine("Deleted Physical Plan: %s", stateManager.deletePhysicalPlan(topologyName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past we discussed about issues when multiple components were updating a PackingPlan (Launcher while submitting a topology and Scheduler while updating the topology). In this case, PhysicalPlan is also getting updated my multiple components; RuntimeManager during kill, TMaster during submit and UpdateManager during update. Is it a concern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically the launcher (or the TMaster in the case of physical plan) initially writes object and RuntimeManager deletes them on kill. That pattern is somewhat standard.

Where we're deviating is with the Scheduler updating PackingPlan and Topology and deleting PhysicalPlan (TMaster will re-create it). I think this is ok for this iteration, but we might want to consider messaging the TMaster to do these operations as a future enhancement.


if (removableContainerCount > 0) {
scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
}
}

@VisibleForTesting
TopologyAPI.Topology getUpdatedTopology(String topologyName,
PackingPlan proposedPackingPlan,
SchedulerStateManagerAdaptor stateManager) {
TopologyAPI.Topology updatedTopology = stateManager.getTopology(topologyName);
Map<String, Integer> proposedComponentCounts = proposedPackingPlan.getComponentCounts();
return mergeTopology(updatedTopology, proposedComponentCounts);
}

@VisibleForTesting
void validateCurrentPackingPlan(PackingPlans.PackingPlan existingProtoPackingPlan,
String topologyName,
SchedulerStateManagerAdaptor stateManager) {
PackingPlans.PackingPlan foundPackingPlan = stateManager.getPackingPlan(topologyName);
assertTrue(foundPackingPlan.equals(existingProtoPackingPlan),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, in what cases will the two packing plans differ? And if that happens what is the user expected to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will differ if they somehow get our of sync, like during a race when multiple agents are trying to update packing at the same time. This is a poor version of checking though. I'd like to keep this in place for now, but follow up with real transnational support.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By multiple agents, do you mean concurrent update requests?
In my opinion this check should be removed. Jut in case scheduler plan is not in sync with persisted plan, scheduler would be able to persist the new desired plan. This check enforces topology restart.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. My thought was that if they're not in sync, something is happening that we're unable to reason so unexpected results could occur if we continue. I could be convinced to remove it though, just sharing my thinking when implementing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the check and filed #1353.

"Existing packing plan received does not equal the packing plan found in the state "
+ "manager. Not updating updatedTopology. Received: %s, Found: %s",
existingProtoPackingPlan, foundPackingPlan);
}

@VisibleForTesting
static class ContainerDelta {
private final Set<PackingPlan.ContainerPlan> containersToAdd;
private final Set<PackingPlan.ContainerPlan> containersToRemove;

@VisibleForTesting
ContainerDelta(Set<PackingPlan.ContainerPlan> currentContainers,
Set<PackingPlan.ContainerPlan> proposedContainers) {
Set<PackingPlan.ContainerPlan> toAdd = new HashSet<>();
for (PackingPlan.ContainerPlan proposedContainerPlan : proposedContainers) {
if (!currentContainers.contains(proposedContainerPlan)) {
toAdd.add(proposedContainerPlan);
}
}
this.containersToAdd = Collections.unmodifiableSet(toAdd);

Set<PackingPlan.ContainerPlan> toRemove = new HashSet<>();
for (PackingPlan.ContainerPlan curentContainerPlan : currentContainers) {
if (!proposedContainers.contains(curentContainerPlan)) {
toRemove.add(curentContainerPlan);
}
}
this.containersToRemove = Collections.unmodifiableSet(toRemove);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here. Do we remove a container if its instances do not exactly match the previous ones? What happens if the current container contains instances 1,2,3 and the proposed container contains 1,2,4? Do we remove this container and re-allocate it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @avflor.
My impression is that scheduler will allocate new containers for executorIds not present in the current plan and remove container's for executorIds not present in new packing plan. Moreover this logic can result in adding an existing container and then removing it in the updateTopology method. In ContainerDelta() method, comparison should be based on container ID only and not based on container instances.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch. Updating now to use container ids.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

@VisibleForTesting
Set<PackingPlan.ContainerPlan> getContainersToRemove() {
return containersToRemove;
}

@VisibleForTesting
Set<PackingPlan.ContainerPlan> getContainersToAdd() {
return containersToAdd;
}
}

/**
* For each of the components with changed parallelism we need to update the Topology configs to
* represent the change.
*/
@VisibleForTesting
static TopologyAPI.Topology mergeTopology(TopologyAPI.Topology topology,
Map<String, Integer> proposedComponentCounts) {
TopologyAPI.Topology.Builder builder = TopologyAPI.Topology.newBuilder().mergeFrom(topology);
for (String componentName : proposedComponentCounts.keySet()) {
Integer parallelism = proposedComponentCounts.get(componentName);

boolean updated = false;
for (TopologyAPI.Bolt.Builder boltBuilder : builder.getBoltsBuilderList()) {
if (updateComponent(boltBuilder.getCompBuilder(), componentName, parallelism)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateComponent can return false. Should merge operation fail if that happens? false case should not happen so I think the case could be treated as an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateComponent will return false under normal circumstances (see the updateComponent javadocs). Because in the protobuf world Bolt and Spout are both Components, but without inheritance, we have to try to update each to see if we have a match. We use true/false so if we get a match on bolt we don't try again on spout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the point. This looks fine now. Just wondering if there is a way to simplify the logic here? For e.g. by deserializing the topology protobuf object and then updating it?

Copy link
Contributor Author

@billonahill billonahill Sep 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the protobuf Topology object is indeed clunky, due to the protobuf api, but I thing it's safer and easier (and more efficient) than going through a deserialization/serialization process, and the risk of ending up with a different Topology object than we started with. It's fairly large with nested config hanging from all components.

updated = true;
break;
}
}

if (!updated) {
for (TopologyAPI.Spout.Builder spoutBuilder : builder.getSpoutsBuilderList()) {
if (updateComponent(spoutBuilder.getCompBuilder(), componentName, parallelism)) {
break;
}
}
}
}

return builder.build();
}

/**
* Go through all fields in the component builder until one is found where "name=componentName".
* When found, go through the confs in the conf builder to find the parallelism field and update
* that.
*
* @return true if the component was found and updated, which might not always happen. For example
* if the component is a spout, it won't be found if a bolt builder is passed.
*/
private static boolean updateComponent(TopologyAPI.Component.Builder compBuilder,
String componentName,
int parallelism) {
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry
: compBuilder.getAllFields().entrySet()) {
if (entry.getKey().getName().equals("name") && componentName.equals(entry.getValue())) {
TopologyAPI.Config.Builder confBuilder = compBuilder.getConfigBuilder();
boolean keyFound = false;
// no way to get a KeyValue builder with a get(key) so we have to iterate until found.
for (TopologyAPI.Config.KeyValue.Builder kvBuilder : confBuilder.getKvsBuilderList()) {
if (kvBuilder.getKey().equals(
com.twitter.heron.api.Config.TOPOLOGY_COMPONENT_PARALLELISM)) {
kvBuilder.setValue(Integer.toString(parallelism));
keyFound = true;
break;
}
}
if (!keyFound) {
TopologyAPI.Config.KeyValue.Builder kvBuilder =
TopologyAPI.Config.KeyValue.newBuilder();
kvBuilder.setKey(com.twitter.heron.api.Config.TOPOLOGY_COMPONENT_PARALLELISM);
kvBuilder.setValue(Integer.toString(parallelism));
confBuilder.addKvs(kvBuilder);
}
return true;
}
}
return false;
}

private static void assertTrue(boolean condition, String message, Object... values) {
if (!condition) {
throw new RuntimeException("ERROR: " + String.format(message, values));
}
}

private static void logFine(String format, Object... values) {
LOG.fine(String.format(format, values));
}
}
1 change: 1 addition & 0 deletions heron/scheduler-core/tests/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ java_tests(
"com.twitter.heron.scheduler.RuntimeManagerMainTest",
"com.twitter.heron.scheduler.SubmitterMainTest",
"com.twitter.heron.scheduler.SchedulerMainTest",
"com.twitter.heron.scheduler.UpdateTopologyManagerTest",
"com.twitter.heron.scheduler.server.SchedulerServerTest",
"com.twitter.heron.scheduler.client.LibrarySchedulerClientTest",
"com.twitter.heron.scheduler.client.HttpServiceSchedulerClientTest",
Expand Down
Loading