Skip to content

Commit

Permalink
Update tests to be in sync with removal of RotatingMap for internal o…
Browse files Browse the repository at this point in the history
…ffer storage.
  • Loading branch information
Jessica Hartog committed Jun 6, 2017
1 parent d21d929 commit e2f960b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 81 deletions.
3 changes: 1 addition & 2 deletions storm/src/test/storm/mesos/MesosCommonTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import storm.mesos.resources.ReservationType;
import storm.mesos.resources.ResourceType;
import storm.mesos.util.MesosCommon;
import storm.mesos.util.RotatingMap;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -338,7 +337,7 @@ public void testExecutorMem() throws Exception {

@Test
public void aggregatedOffersPerNode() {
RotatingMap<Protos.OfferID, Protos.Offer> r = new RotatingMap(2);
Map<Protos.OfferID, Protos.Offer> r = new HashMap<Protos.OfferID, Protos.Offer>();
Protos.Offer offer = TestUtils.buildOffer("0-1", "h1", 0, 0);
r.put(offer.getId(), offer);
offer = TestUtils.buildOffer("0-2", "h1", 10, 1000);
Expand Down
61 changes: 30 additions & 31 deletions storm/src/test/storm/mesos/MesosNimbusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import storm.mesos.resources.AggregatedOffers;
import storm.mesos.resources.ResourceType;
import storm.mesos.util.MesosCommon;
import storm.mesos.util.RotatingMap;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -47,7 +46,7 @@ public class MesosNimbusTest {
public static final Integer DEFAULT_BUCKET_COUNT = 2;
public static final String FRAMEWORK_ROLE = "staas";

private RotatingMap<OfferID, Offer> rotatingMap = null;
private Map<OfferID, Offer> map = null;
Map<String, Collection<WorkerSlot>> slotsForTopologiesNeedingAssignments = null;
MesosNimbus mesosNimbus = null;
Map<String, String> mesosStormConf;
Expand Down Expand Up @@ -199,7 +198,7 @@ private Map<String, List<Protos.TaskInfo>> getTopologyIDtoTaskInfoMap(List<Proto

@Before
public void initialize() {
rotatingMap = new RotatingMap<>(DEFAULT_BUCKET_COUNT);
map = new HashMap<OfferID, Offer>();
slotsForTopologiesNeedingAssignments = new HashMap<>();

mesosStormConf = new HashMap<>();
Expand All @@ -222,7 +221,7 @@ public void testGetTasksToLaunchWhenNoTopologiesNeedAssignments() {

Topologies topologies = new Topologies(topologyDetailsMap);

Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
Map<String, Collection<WorkerSlot>> workerSlotsMap = new HashMap<>();

Map<String,List<Protos.TaskInfo>> tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
Expand All @@ -244,9 +243,9 @@ public void testGetTasksToLaunchForOneTopologyWithOneOffer() {

// One offer with sufficient resources
Offer offer = TestUtils.buildOfferWithPorts("O-1", "h1", 24, 40000, 3100, 3200);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
Map<String, Collection<WorkerSlot>> workerSlotsMap = new HashMap<>();
Collection<WorkerSlot> workerSlots = new ArrayList<>();
workerSlots.add(new WorkerSlot("h1", 3100));
Expand All @@ -259,9 +258,9 @@ public void testGetTasksToLaunchForOneTopologyWithOneOffer() {

// One offer with sufficient resources spread across reserved and unreserved resources
offer = TestUtils.buildOfferWithReservationAndPorts("O-1", "h1", 0.75, 750, 0.75, 850, 3100, 3101);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
workerSlotsMap = new HashMap<>();
workerSlots = new ArrayList<>();
workerSlots.add(new WorkerSlot("h1", 3100));
Expand All @@ -279,9 +278,9 @@ public void testGetTasksToLaunchForOneTopologyWithOneOffer() {

// One offer with only reserved resources
offer = TestUtils.buildOfferWithReservationAndPorts("O-1", "h1", 0, 0, 1.5, 1600, 3100, 3101);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
workerSlotsMap = new HashMap<>();
workerSlots = new ArrayList<>();
workerSlots.add(new WorkerSlot("h1", 3100));
Expand All @@ -298,36 +297,36 @@ public void testGetTasksToLaunchForOneTopologyWithOneOffer() {

// Offer with Insufficient cpu
offer = TestUtils.buildOfferWithPorts("O-1", "h1", 0, 40000, 3100, 3200);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);

tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
assertTrue(tasksToLaunch.isEmpty());

// Offer with Insufficient Mem for both executor and worker combined
offer = TestUtils.buildOfferWithPorts("O-1", "h1", 24, 900, 3100, 3200);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);

tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
assertTrue(tasksToLaunch.isEmpty());

// Offer with Insufficient Mem for executor
offer = TestUtils.buildOfferWithPorts("O-1", "h1", 24, 1400, 3100, 3200);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);

tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
assertTrue(tasksToLaunch.isEmpty());

// One offer with Insufficient ports
offer = TestUtils.buildOffer("O-1", "h1", 24, 4000);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);

tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
assertTrue(tasksToLaunch.isEmpty());
Expand All @@ -345,13 +344,13 @@ public void testGetTasksToLaunchForOneTopologyWithMultipleOffersOnSameHost() {
Topologies topologies = new Topologies(topologyDetailsMap);

Offer offer = TestUtils.buildOffer("O-1", "h1", 0, 40000);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOffer("O-2", "h1", 24, 0);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOfferWithPorts("O-3", "h1", 0, 0, 3100, 3200);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
Map<String, Collection<WorkerSlot>> workerSlotsMap = new HashMap<>();
Collection<WorkerSlot> workerSlots = new ArrayList<>();
workerSlots.add(new WorkerSlot("h1", 3100));
Expand All @@ -363,7 +362,7 @@ public void testGetTasksToLaunchForOneTopologyWithMultipleOffersOnSameHost() {
assertTrue(hasResources("*", taskInfoList.get(0), MesosCommon.DEFAULT_WORKER_CPU, MesosCommon.DEFAULT_WORKER_MEM_MB, 3100l));
assertTrue(hasCorrectExecutorResources(taskInfoList));

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
workerSlots.add(new WorkerSlot("h1", 3101));
workerSlots.add(new WorkerSlot("h1", 3102));
tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
Expand All @@ -388,7 +387,7 @@ public void testGetTasksToLaunchForOneTopologyWithMultipleOffersOnSameHost() {
topologyDetailsMap.put("t2", t2);
topologies = new Topologies(topologyDetailsMap);

aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);

tasksToLaunch = mesosNimbus.getTasksToLaunch(topologies, workerSlotsMap, aggregatedOffersPerNode);
Map<String, List<Protos.TaskInfo>> topologyIDtoTaskInfoMap = getTopologyIDtoTaskInfoMap(tasksToLaunch.get("h1"));
Expand Down Expand Up @@ -422,20 +421,20 @@ public void testGetTasksToLaunchForOneTopologyWithMultipleOffersAcrossMultipleHo
Topologies topologies = new Topologies(topologyDetailsMap);

Offer offer = TestUtils.buildOffer("O-H1-1", "h1", 0, 4000);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOffer("O-H1-2", "h1", 3.21, 0);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOfferWithPorts("O-H1-3", "h1", 0, 0, 3100, 3102);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

offer = TestUtils.buildOffer("O-H2-1", "h2", 0, 4000);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOffer("O-H2-2", "h2", 3.21, 0);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);
offer = TestUtils.buildOfferWithPorts("O-H2-3", "h2", 0, 0, 3100, 3102);
rotatingMap.put(offer.getId(), offer);
map.put(offer.getId(), offer);

Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(rotatingMap);
Map<String, AggregatedOffers> aggregatedOffersPerNode = MesosCommon.getAggregatedOffersPerNode(map);
Map<String, Collection<WorkerSlot>> workerSlotsMap = new HashMap<>();
Map<String, List<Protos.TaskInfo>> tasksToLaunch = new HashMap<>();

Expand Down
Loading

0 comments on commit e2f960b

Please sign in to comment.