diff --git a/heron/packing/src/java/com/twitter/heron/packing/roundrobin/ResourceCompliantRRPacking.java b/heron/packing/src/java/com/twitter/heron/packing/roundrobin/ResourceCompliantRRPacking.java index eef8bbe5b8a..2f969649826 100644 --- a/heron/packing/src/java/com/twitter/heron/packing/roundrobin/ResourceCompliantRRPacking.java +++ b/heron/packing/src/java/com/twitter/heron/packing/roundrobin/ResourceCompliantRRPacking.java @@ -91,23 +91,16 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking { private Resource defaultInstanceResources; private int numContainers; - private int numAdjustments; //ContainerId to examine next. It is set to 1 when the //algorithm restarts with a new number of containers private int containerId; - private void adjustNumContainers(int additionalContainers) { - increaseNumContainers(additionalContainers); - this.numAdjustments++; - } - private void increaseNumContainers(int additionalContainers) { this.numContainers += additionalContainers; } - private void resetState() { + private void resetToFirstContainer() { this.containerId = 1; - this.numAdjustments = 0; } private int nextContainerId(int afterId) { @@ -122,7 +115,7 @@ public void initialize(Config config, TopologyAPI.Topology inputTopology) { Context.instanceCpu(config), Context.instanceRam(config), Context.instanceDisk(config)); - resetState(); + resetToFirstContainer(); } private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan) { @@ -155,8 +148,7 @@ private PackingPlanBuilder newPackingPlanBuilder(PackingPlan existingPackingPlan @Override public PackingPlan pack() { - int adjustments = this.numAdjustments; - while (adjustments <= this.numAdjustments) { + while (true) { try { PackingPlanBuilder planBuilder = newPackingPlanBuilder(null); planBuilder.updateNumContainers(numContainers); @@ -169,13 +161,10 @@ public PackingPlan pack() { LOG.info(String.format( "%s Increasing the number of containers to %s and attempting to place again.", e.getMessage(), this.numContainers + 1)); - adjustNumContainers(1); - containerId = 1; - adjustments++; + increaseNumContainers(1); + resetToFirstContainer(); } } - throw new PackingException( - String.format("Could not pack all instances into %d containers ", this.numContainers)); } /** @@ -185,7 +174,7 @@ public PackingPlan pack() { */ public PackingPlan repack(PackingPlan currentPackingPlan, Map componentChanges) { this.numContainers = currentPackingPlan.getContainers().size(); - resetState(); + resetToFirstContainer(); int additionalContainers = computeNumAdditionalContainers(componentChanges, currentPackingPlan); increaseNumContainers(additionalContainers); @@ -193,8 +182,7 @@ public PackingPlan repack(PackingPlan currentPackingPlan, Map c "Allocated %s additional containers for repack bring the number of containers to %s.", additionalContainers, this.numContainers)); - int adjustments = 0; - while (adjustments <= this.numAdjustments) { + while (true) { try { PackingPlanBuilder planBuilder = newPackingPlanBuilder(currentPackingPlan); planBuilder.updateNumContainers(numContainers); @@ -205,16 +193,13 @@ public PackingPlan repack(PackingPlan currentPackingPlan, Map c } catch (ResourceExceededException e) { //Not enough containers. Adjust the number of containers. - adjustNumContainers(1); - containerId = 1; - adjustments++; + increaseNumContainers(1); + resetToFirstContainer(); LOG.info(String.format( "Increasing the number of containers to %s and attempting packing again.", this.numContainers)); } } - throw new PackingException( - String.format("Could not repack all instances into %d containers ", this.numContainers)); } @Override @@ -269,12 +254,12 @@ private PackingPlanBuilder getResourceCompliantRRAllocation( PackingUtils.getComponentsToScale(componentChanges, PackingUtils.ScalingDirection.UP); if (!componentsToScaleDown.isEmpty()) { - this.containerId = 1; + resetToFirstContainer(); removeInstancesFromContainers(planBuilder, componentsToScaleDown); } if (!componentsToScaleUp.isEmpty()) { - this.containerId = 1; + resetToFirstContainer(); int maxInstanceIndex = 0; for (PackingPlan.ContainerPlan containerPlan : currentPackingPlan.getContainers()) { for (PackingPlan.InstancePlan instancePlan : containerPlan.getInstances()) { @@ -310,16 +295,15 @@ private void assignInstancesToContainers(PackingPlanBuilder planBuilder, } /** - * Performs a RR placement. If the placement cannot be performed on the existing number of containers - * then it will request for an increase in the number of containers + * Attempts to place the instance the current containerId. * * @param planBuilder packing plan builder * @param instanceId the instance that needs to be placed in the container + * @throws ResourceExceededException if there is no room on the current container for the instance */ private void strictRRpolicy(PackingPlanBuilder planBuilder, InstanceId instanceId) throws ResourceExceededException { - planBuilder.addInstance(containerId, instanceId); - containerId = nextContainerId(containerId); + addInstance(planBuilder, instanceId, this.containerId); } /** @@ -335,15 +319,14 @@ private void flexibleRRpolicy(PackingPlanBuilder planBuilder, //If there is not enough space on containerId look at other containers in a RR fashion // starting from containerId. boolean containersChecked = false; - int currentContainer = containerId; + int currentContainer = this.containerId; while (!containersChecked) { try { - planBuilder.addInstance(currentContainer, instanceId); - containerId = nextContainerId(currentContainer); + addInstance(planBuilder, instanceId, currentContainer); return; } catch (ResourceExceededException e) { currentContainer = nextContainerId(currentContainer); - if (currentContainer == containerId) { + if (currentContainer == this.containerId) { containersChecked = true; } } @@ -355,6 +338,18 @@ private void flexibleRRpolicy(PackingPlanBuilder planBuilder, instanceId, numContainers)); } + /** + * Adds an instance to the container. If successful, increments this.containerId, else throws a + * ResourceExceededException + * @throws ResourceExceededException if the instance can't be placed in the container + */ + private void addInstance(PackingPlanBuilder planBuilder, + InstanceId instanceId, + int toContainerId) throws ResourceExceededException { + planBuilder.addInstance(toContainerId, instanceId); + this.containerId = nextContainerId(toContainerId); + } + /** * Removes instances from containers during scaling down * @@ -381,11 +376,11 @@ private void removeRRInstance(PackingPlanBuilder packingPlanBuilder, int currentContainer = this.containerId; while (!containersChecked) { if (packingPlanBuilder.removeInstance(currentContainer, component)) { - containerId = nextContainerId(currentContainer); + this.containerId = nextContainerId(currentContainer); return; } currentContainer = nextContainerId(currentContainer); - if (currentContainer == containerId) { + if (currentContainer == this.containerId) { containersChecked = true; } }