#501: add memory constraints to problem
cwschilly committed Feb 27, 2024
1 parent 32c404b commit 0a2238c
Showing 3 changed files with 151 additions and 61 deletions.
1 change: 1 addition & 0 deletions config/work-stealing.yaml
@@ -23,6 +23,7 @@ algorithm:
discretion_interval: 0.010
steal_time: 0.2
num_experiments: 10
max_memory_usage: 5.0e+9

# Specify output
output_dir: ../output
207 changes: 146 additions & 61 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
@@ -1,3 +1,4 @@
import sys
import simpy
import random
from logging import Logger
@@ -44,10 +45,14 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):
self.rank = self.algorithm.ranks[self.rank_id]
self.pending_steal_request = False
self.other_ranks = [r_id for r_id in self.algorithm.ranks.keys() if r_id != self.rank_id]
self.running = True

# Initialize memory
self.rank_memory = 0.

# Initialize current cluster (if a rank is currently working through a cluster)
self.current_cluster = None
self.new_clusters = False

# Output initial information
self.__logger.info(f" Rank {self.rank_id} Initial Info: work={self.__get_total_work()}, n_tasks={self.rank.get_number_of_objects()}, n_clusters={self.rank.get_number_of_shared_blocks()}")
@@ -61,16 +66,20 @@ def run(self):
if self.current_cluster is not None:

# Execute all tasks on the cluster
self.rank_memory += self.current_cluster[0].get_shared_block().get_size()
for task in self.current_cluster:
yield self.env.process(self.__simulate_task(task))

# After each task, check if there is a steal request (to prevent hang ups) -- TODO: is this necessary
# After each task, check if there is a steal request (to prevent hang ups)
if self.algorithm.do_stealing:
self.__check_for_steal_requests()

# Once all tasks are executed, reset the current cluster
self.current_cluster = None

# TODO: how long?

yield self.env.timeout(self.algorithm.steal_time)

# Check if there is anything in queue
elif self.algorithm.has_work_in_deque(self.rank, including_steals=True):

@@ -79,44 +88,99 @@ def run(self):

# If item is a cluster, move to self.current_cluster (will be executed next)
if isinstance(item, list):
self.current_cluster = item

# Make sure there is memory on the rank to execute the cluster
if self.algorithm.rank_has_memory_for_cluster(self.rank, item):
self.current_cluster = item

# Otherwise, move to self.algorithm.memory_intensive_clusters so other ranks can "steal"
else:
if self.algorithm.do_stealing:
print(f"Rank {self.rank_id} (memory: {self.rank_memory}) moving cluster ({item[0].get_shared_block().get_size()}) to memory intensive clusters")
self.algorithm.memory_intensive_clusters.append(item)

# Then reset all other workers' new_clusters flag
for r_id in self.other_ranks:
self.algorithm.workers[r_id].new_clusters = True

# If steals are off, exit here
else:
sys.exit(f"Rank {self.rank_id} could not execute all tasks within the memory limit ({self.algorithm.max_memory_usage})")

yield self.env.timeout(0.01)

# If item is a StealRequest, look for clusters to give up
elif isinstance(item, StealRequest):
self.algorithm.respond_to_steal_request(item)
yield self.env.timeout(self.algorithm.steal_time)

# Catch any errors
# Catch any other datatypes
else:
self.__logger.error(f"Received some other datatype: {type(item)}")

# If no work is available, try to request a steal from a random rank
# If no work is available, try to request a steal (either from a random rank or the memory intensive bank)
elif self.algorithm.do_stealing and not self.pending_steal_request:
target_rank_id = random.choice(self.other_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]

if self.algorithm.has_stealable_cluster(target_rank):
self.algorithm.iterate_attempted_steals()
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
idx = self.algorithm.get_index_of_first_non_steal_request(target_rank)
self.algorithm.rank_queues[target_rank_id].insert(idx, steal_request)
self.pending_steal_request = True
yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here (see line 81)

# Check for new memory_intensive clusters
num_intensive_clusters = len(self.algorithm.memory_intensive_clusters)
if num_intensive_clusters > 0 and self.new_clusters:
idx = 0
for i in range(num_intensive_clusters):
cluster = self.algorithm.memory_intensive_clusters.pop(i)
if self.algorithm.rank_has_memory_for_cluster(self.rank, cluster):
self.current_cluster = cluster
break
else:
self.algorithm.memory_intensive_clusters.insert(i, cluster)
idx += 1

# Set new_clusters flag to false if we looped through all available clusters
self.new_clusters = False if idx == num_intensive_clusters else True

yield self.env.timeout(self.algorithm.steal_time) # TODO: is this right?

# If no work is publicly available, steal from a random rank
else:
target_rank_id = random.choice(self.other_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_stealable_cluster(target_rank, requesting_rank):
self.algorithm.iterate_attempted_steals()
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
idx = self.algorithm.get_index_of_first_non_steal_request(target_rank)
self.algorithm.rank_queues[target_rank_id].insert(idx, steal_request)
self.pending_steal_request = True
yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here? (see line 99)

else:
yield self.env.timeout(0.01)

else:
# this rank is awaiting the fulfillment of a steal request
# and can not proceed until it gets a response
yield self.env.timeout(0.01) # need to yield here -- but for how long?
yield self.env.timeout(0.01) # TODO: need to yield here -- but for how long?

def __continue_condition(self):
"""Continue if the rank has clusters in its queue. If stealing is on, also continue if any other ranks have stealable clusters."""
"""Continue if the rank has clusters in its queue. If stealing is on, also continue if any stealable clusters are available."""
# Determine continuing condition
if self.algorithm.do_stealing:
condition = self.__has_work() or (self.algorithm.any_ranks_have_stealable_work())
available_work = self.__has_work() or self.algorithm.any_ranks_have_stealable_work(self.rank)
else:
condition = self.__has_work()
return condition
available_work = self.__has_work()
continue_condition = self.__has_memory() and available_work

# Check if all ranks are done
if not continue_condition:
self.running = False
print(f"Rank {self.rank_id} is done running (memory {self.rank_memory}).")
any_worker_still_running = any(worker.running for worker in self.algorithm.workers)

# Throw error if all ranks are done but there is still work
if not any_worker_still_running and len(self.algorithm.memory_intensive_clusters) > 0:
sys.exit("lbsWorkStealingAlgorithm ran out of rank memory to execute all clusters. Consider increasing the max_memory_usage parameter.")

# Otherwise, return the continue condition
return continue_condition

def __get_total_work(self):
"""Returns the total work on the rank."""
@@ -131,6 +195,10 @@ def __has_work(self):
"""Returns True if the rank has a cluster either in its queue or in its current_cluster member."""
return self.algorithm.has_work_in_deque(self.rank) or self.current_cluster is not None

def __has_memory(self):
"""Returns True if the rank is still below the maximum memory threshold."""
return self.rank_memory < self.algorithm.max_memory_usage

def __check_for_steal_requests(self):
"""Checks next item in queue; if it's a steal request, responds accordingly."""
rank_queue = self.algorithm.rank_queues[self.rank_id]
@@ -142,14 +210,15 @@ def __check_for_steal_requests(self):
def __simulate_task(self, task: Object):
"""Simulates the execution of a task."""
self.algorithm.increment_task_count()
self.rank_memory += task.get_size()
num_steal_reqs = []

# queue_sizes = []
# for i in range(self.algorithm.num_ranks):
# queue_sizes.append(sum(isinstance(elm, list) for elm in self.algorithm.rank_queues[i]))
# num_steal_reqs.append(sum(isinstance(elm, StealRequest) for elm in self.algorithm.rank_queues[i]))
task_time = task.get_load()
# self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task_time}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes} steal requests = {num_steal_reqs}")
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task_time}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task_time}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}); memory: {self.rank_memory}/{self.algorithm.max_memory_usage}")
yield self.env.timeout(task_time)


@@ -180,10 +249,11 @@ def __init__(
super(WorkStealingAlgorithm, self).__init__(
work_model, parameters, lgr, rank_qoi, object_qoi)

# Initialize the discretion interval
self.__discretion_interval = parameters.get("discretion_interval")
# Initialize the configuration parameters
self.__discretion_interval = parameters.get("discretion_interval") # TODO: still necessary?

self.steal_time = parameters.get("steal_time", 0.1)
self.do_stealing = parameters.get("do_stealing", True)
self.max_memory_usage = parameters.get("max_memory_usage", 8.0e+9)

# Initialize logger
self.__logger = lgr
@@ -192,7 +262,6 @@ def __init__(
self.ranks = {}
self.num_ranks = 0
self.rank_queues = {}
self.workers = []

# Initialize task and steal counts
self.__task_count = 0
@@ -210,7 +279,7 @@ def __build_rank_clusters(self, rank: Rank) -> dict:
# Iterate over all migratable objects on rank
clusters = {}
for o in rank.get_migratable_objects():
# Retrieve shared block ID and skip object without one
# Retrieve shared block ID
sb = o.get_shared_block()
# Add current object to its block ID cluster
clusters.setdefault(sb.get_id(), []).append(o)
@@ -236,8 +305,44 @@ def __reset(self):
self.ranks = {rank.get_id(): rank for rank in ranks_list}
self.num_ranks = len(self.ranks)
self.__initialize_rank_queues()
self.memory_intensive_clusters = []

self.workers = []


def calculate_cluster_memory_footprint(self, cluster):
"""Returns the total memory footprint of a given cluster."""
cluster_size = cluster[0].get_shared_block().get_size()
for task in cluster:
cluster_size += task.get_size()
return cluster_size

def rank_has_memory_for_cluster(self, rank, cluster):
"""Returns True if the rank has enough memory to execute the cluster."""
rank_memory = self.workers[rank.get_id()].rank_memory
cluster_memory = self.calculate_cluster_memory_footprint(cluster)
return rank_memory + cluster_memory < self.max_memory_usage

def has_work_in_deque(self, rank, including_steals=False):
"""Determines if a given rank's deque has a cluster (or StealRequest, if including_steals is True)."""
if including_steals:
return any(isinstance(item, (list, StealRequest)) for item in self.rank_queues[rank.get_id()])
else:
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])

def get_task_count(self):
"""Returns number of tasks that have been simulated."""
return self.__task_count

def increment_task_count(self):
"""Increments the number of tasks that have been simulated."""
self.__task_count += 1

def get_total_task_count(self):
"""Returns the total number of tasks that need to be simualted."""
return self.__total_task_count

def get_index_of_first_non_steal_request(self, rank):
"""Returns the index of the first non-StealRequest in a rank's deque."""
rank_queue = self.rank_queues[rank.get_id()]
for i in range(len(rank_queue)):
if isinstance(rank_queue[i], StealRequest):
@@ -249,25 +354,25 @@ def iterate_attempted_steals(self):
"""Increases number of attempted steals by one."""
self.__attempted_steal_count += 1

def has_stealable_cluster(self, rank):
"""Asserts that a given rank has a stealable cluster at the back of its deque."""
stealable = False
rank_queue = self.rank_queues[rank.get_id()]
def has_stealable_cluster(self, target_rank, requesting_rank):
"""Asserts that a given target_rank has a stealable cluster at the back of its deque.
Also checks that the requesting rank has enough memory for the steal."""
rank_queue = self.rank_queues[target_rank.get_id()]

# Make sure rank has a cluster at the back of its queue
if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list):
stealable = True
# Make sure target_rank has a cluster at the back of its queue
has_cluster = self.has_work_in_deque(target_rank) and isinstance(rank_queue[-1], list)

return stealable
# Make sure the requesting rank has enough memory to execute the cluster
return has_cluster and self.rank_has_memory_for_cluster(requesting_rank, rank_queue[-1])

def respond_to_steal_request(self, steal_request: StealRequest):
'''Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.'''
"""Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue."""
# Get both ranks
r_requesting = steal_request.get_requesting_rank()
r_target = steal_request.get_target_rank()

# Double check that r_target still has a cluster to steal
if self.has_stealable_cluster(r_target):
if self.has_stealable_cluster(r_target, r_requesting):

# Perform steal
cluster = self.rank_queues[r_target.get_id()].pop()
@@ -276,33 +381,14 @@ def respond_to_steal_request(self, steal_request: StealRequest):
self.__steal_count += 1

else:
self.__logger.info(f" Ignoring steal request from {r_requesting.get_id()} ({r_target.get_id()} has no stealable clusters) ")
self.__logger.info(f" Rank {r_target.get_id()} has no stealable clusters for {r_requesting.get_id()}")

# set false as we are responding, either by putting work or doing nothing
self.workers[r_requesting.get_id()].pending_steal_request = False

def get_task_count(self):
"""Returns number of tasks that have been simulated."""
return self.__task_count

def increment_task_count(self):
"""Increments the number of tasks that have been simulated."""
self.__task_count += 1

def get_total_task_count(self):
"""Returns the total number of tasks that need to be simualted."""
return self.__total_task_count

def has_work_in_deque(self, rank, including_steals=False):
"""Determines if a given rank's deque has a cluster (or StealRequest, if including_steals is True)."""
if including_steals:
return any(isinstance(item, (list, StealRequest)) for item in self.rank_queues[rank.get_id()])
else:
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])

def any_ranks_have_stealable_work(self):
"""Determines if any rank has a cluster in its deque that can be stolen."""
return any(self.has_stealable_cluster(r) for r in self.ranks.values())
def any_ranks_have_stealable_work(self, requesting_rank):
"""Determines if any rank has a cluster in its deque that can be stolen by requesting_rank."""
return any(self.has_stealable_cluster(r, requesting_rank) for r in self.ranks.values())

def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max):
"""Performs the simulation and returns the average time to complete all tasks."""
@@ -323,7 +409,6 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict

# Set up problem
random.seed()
self.workers = []

# Create simpy environment
env = simpy.Environment()
@@ -341,4 +426,4 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
experiment_times.append(end_time)

# Report average time for all experiments
self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times)}")
self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times):0.2f}")
4 changes: 4 additions & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
@@ -191,6 +191,10 @@ def __init__(self, config_to_validate: dict, logger: Logger):
float,
lambda x: x>=0.0,
error="Should be of type 'float' and >= 0.0"),
Optional("max_memory_usage"): And(
float,
lambda x: x>=0.0,
error="Should be of type 'float' and >= 0.0"),
Optional("num_experiments"): And(
int,
lambda x: x > 0,
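As a rough illustration, the new validator entry can be exercised on its own roughly as follows, assuming the schema package (which the And/Optional calls above suggest); the standalone Schema here is only a sketch, not the project's full validator.

from schema import And, Optional, Schema, SchemaError

# Standalone mirror of the entry added above; only max_memory_usage is modeled.
algorithm_schema = Schema({
    Optional("max_memory_usage"): And(
        float,
        lambda x: x >= 0.0,
        error="Should be of type 'float' and >= 0.0"),
}, ignore_extra_keys=True)

algorithm_schema.validate({"max_memory_usage": 5.0e+9})   # accepted
try:
    algorithm_schema.validate({"max_memory_usage": -1.0})  # rejected: negative value
except SchemaError as exc:
    print(exc)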
