Skip to content

Commit

Permalink
#558: completed implementation fixing remaining shared blocks bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed Jan 14, 2025
1 parent 4daf8a1 commit d024aa9
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 113 deletions.
4 changes: 3 additions & 1 deletion config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ work_model:
alpha: 1.0
beta: 0.0
gamma: 0.0
upper_bounds:
max_memory_usage: 45.0

# Specify algorithm
brute_force_optimization: true
Expand All @@ -37,6 +39,7 @@ write_JSON:
suffix: json
communications: true
offline_lb_compatible: true
lb_iterations: true
visualization:
x_ranks: 2
y_ranks: 2
Expand All @@ -48,4 +51,3 @@ visualization:
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file
lb_iterations: true
8 changes: 6 additions & 2 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def __configure(self, *config_path):
:returns: The configuration as a dictionary
"""

# merge configurations
# Merge configurations
config = self.__merge_configurations(*config_path)

# Change logger (with parameters from the configuration)
Expand Down Expand Up @@ -325,6 +325,7 @@ def __configure(self, *config_path):
self.__parameters.output_file_stem,
self.__parameters.json_params) if self.__parameters.json_params else None

# Return configuration
return config

def __resolve_config_path(self, config_path) -> str:
Expand Down Expand Up @@ -385,7 +386,7 @@ def __print_statistics(self, phase: Phase, phase_name: str, work_model: WorkMode
self.__logger)
lbstats.print_function_statistics(
phase.get_nodes(),
lambda x: x.get_max_memory_usage(phase),
lambda x: x.get_max_memory_usage(),
f"{phase_name} node maximum memory usage",
self.__logger)
if r_shared_mem_stats.get_maximum():
Expand Down Expand Up @@ -565,6 +566,9 @@ def run(self, cfg=None, cfg_dir=None):
a_min_max = []

# Instantiate runtime
if self.__parameters.ranks_per_node > 1 and (
wmp := self.__parameters.work_model.get("parameters")):
wmp["node_bounds"] = True
runtime = Runtime(
phases,
self.__parameters.work_model,
Expand Down
13 changes: 8 additions & 5 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,17 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
# No communications for this phase
self.__communications_dict.setdefault(phase_id, {rank_id: {}})

# Instantiate rank for current phase

# Create phase rank
phase_rank = Rank(self.__logger, rank_id)
phase_rank.set_metadata(self.__metadata[rank_id])

# Create node when required
rank_node = None
if self.ranks_per_node > 1:
rank_node = self.__nodes[rank_id // self.ranks_per_node]
rank_node.add_rank_id(rank_id)
phase_rank = Rank(self.__logger, rank_id, node=rank_node)
phase_rank.set_metadata(self.__metadata[rank_id])
rank_node.add_rank(phase_rank)
phase_rank.set_node(rank_node)

# Initialize storage for shared blocks information
rank_blocks, task_user_defined = {}, {}
Expand Down Expand Up @@ -356,7 +360,6 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
# Assign block to objects attached to it
for o in objects:
o.set_shared_block(block)
phase_rank.set_shared_blocks(shared_blocks)

# Returned rank and communicators per phase
return phase_rank, rank_comm
Expand Down
11 changes: 8 additions & 3 deletions src/lbaf/Model/lbsAffineCombinationWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

from .lbsWorkModelBase import WorkModelBase
from .lbsRank import Rank
from .lbsNode import Node

Check warning on line 48 in src/lbaf/Model/lbsAffineCombinationWorkModel.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Node imported from lbsNode (unused-import)


class AffineCombinationWorkModel(WorkModelBase):
Expand All @@ -63,14 +64,16 @@ def __init__(self, parameters, lgr: Logger):
self.__beta = parameters.get("beta", 0.0)
self.__gamma = parameters.get("gamma", 0.0)
self.__upper_bounds = parameters.get("upper_bounds", {})
self.__node_bounds = parameters.get("node_bounds", False)

# Call superclass init
super().__init__(parameters)
self.__logger.info(
f"Instantiated work model with alpha={self.__alpha}, beta={self.__beta}, gamma={self.__gamma}")
"Instantiated work model with: "
f"alpha={self.__alpha}, beta={self.__beta}, gamma={self.__gamma}")
for k, v in self.__upper_bounds.items():
self.__logger.info(
f"Upper bound for rank {k}: {v}")
f"Upper bound for {'node' if self.__node_bounds else 'rank'} {k}: {v}")

def get_alpha(self):
"""Get the alpha parameter."""
Expand All @@ -96,7 +99,9 @@ def compute(self, rank: Rank):
"""
# Check whether strict bounds are satisfied
for k, v in self.__upper_bounds.items():
if getattr(rank, f"get_{k}")() > v:
if getattr(
rank.get_node() if self.__node_bounds else rank,
f"get_{k}")() > v:
return math.inf

# Return combination of load and volumes
Expand Down
4 changes: 2 additions & 2 deletions src/lbaf/Model/lbsBlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def __init__(

def __repr__(self):
return (
f"Block id: {self.__index}, home id: {self.__home_id}, "
f"size: {self.__size}, object ids: {self.__attached_object_ids}"
f"<Block id: {self.__index}, home id: {self.__home_id}, "
f"size: {self.__size}, object ids: {self.__attached_object_ids}>"
)

def get_id(self) -> int:
Expand Down
34 changes: 17 additions & 17 deletions src/lbaf/Model/lbsNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@
#@HEADER
#

import copy
import math
import functools
import operator
import importlib
from logging import Logger
from typing import Set

from .lbsRank import Rank

class Node:
"""A class representing a node to which a set of ranks are assigned."""

Expand All @@ -62,23 +59,26 @@ def __init__(

# Member variables passed by constructor
self.__index = n_id
self.__rank_ids = set()
self.__ranks: Set[Rank] = set()

def __repr__(self):
"""Custom print."""
return f"<Node id: {self.__index}, {len(self.__ranks)} ranks>"

def get_id(self) -> int:
"""Return node ID."""
return self.__index

def get_rank_ids(self) -> Set[int]:
return self.__rank_ids

def get_max_memory_usage(self, phase) -> float:
"""Combine all memory usages for each rank to get the node memory usage."""
module = importlib.import_module("lbaf.Model.lbsPhase")
return 0.0 + sum(
r.get_max_memory_usage() for r in phase.get_node_ranks(self.__index))
def get_ranks(self) -> Set[int]:
return self.__ranks

def add_rank_id(self, r_id: int):
self.__rank_ids.add(r_id)
def add_rank(self, rank):
self.__ranks.add(rank)

def get_number_of_ranks(self) -> int:
return len(self.__rank_ids)
return len(self.__ranks)

def get_max_memory_usage(self) -> float:
"""Sum all memory usages for each rank to get the node memory usage."""
return 0.0 + sum(
r.get_max_memory_usage() for r in self.__ranks)
6 changes: 3 additions & 3 deletions src/lbaf/Model/lbsObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(
f"size: incorrect type {type(size)} or value: {size}")
self.__size = float(size)

# Rank to which object is currently assigned if defined
# ID of rank to which object is currently assigned if defined
if not(r_id is None or isinstance(r_id, int)) or isinstance(r_id, bool):
raise TypeError(
f"r_id: incorrect type {type(r_id)}")
Expand Down Expand Up @@ -156,7 +156,7 @@ def __init__(
raise TypeError(f"subphases: {subphases} is of type {type(subphases)} but must be <class 'list'>")

def __repr__(self):
return f"Object id: {self.get_id()}, load: {self.__load}"
return f"<Object id: {self.get_id()}, load: {self.__load}>"

@entity_property
def get_id(self) -> int:
Expand Down Expand Up @@ -331,4 +331,4 @@ def get_qois(self, qoi_type=None):
return self.__get_qois_of_type(qoi_type)
qois = self.__get_qois_of_type("qoi")
qois.update(self.__get_qois_of_type("entity_property"))
return qois
return qois
35 changes: 21 additions & 14 deletions src/lbaf/Model/lbsPhase.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from .lbsObject import Object
from .lbsObjectCommunicator import ObjectCommunicator
from .lbsRank import Rank
from .lbsNode import Node


class Phase:
Expand Down Expand Up @@ -129,7 +130,6 @@ def get_number_of_ranks(self):

def set_ranks(self, ranks: Set[Rank]):
""" Set list of ranks for this phase."""
self.__ranks = ranks

def get_ranks(self):
"""Retrieve all ranks belonging to phase."""
Expand All @@ -149,13 +149,26 @@ def get_node_ranks(self, node_id: int):

def copy_ranks(self, phase: Self):
"""Copy ranks from one phase to self."""

# Minimally copy all nodes of phase when they exist
new_nodes: Dict[Node] = {
n.get_id(): Node(self.__logger, n.get_id())
for n in phase.get_nodes()}

# Copy all ranks of phase
new_ranks: Set[Rank] = set()
self.__ranks = new_ranks
for r in phase.get_ranks():
# Minimally instantiate rank and copy
new_r = Rank(self.__logger)
new_r.copy(r)
new_ranks.add(new_r)
self.set_ranks(new_ranks)

# Copy node when rank is attached to one
if (r_node := r.get_node()) is not None:
new_r_node = new_nodes[r_node.get_id()]
new_r.set_node(new_r_node)
new_r_node.add_rank(new_r)

def get_rank_ids(self):
"""Retrieve IDs of ranks belonging to phase."""
Expand Down Expand Up @@ -557,8 +570,12 @@ def populate_from_specification(self, spec: PhaseSpecification, multiple_sharing
# Find shared block and create if not already created in memory
b: Block = None
if not shared_id in shared_blocks:
b = Block(b_id=shared_id, h_id=shared_block_spec["home_rank"], size=shared_block_spec["size"],
o_ids=shared_block_spec["tasks"])
b = Block(
b_id=shared_id,
h_id=shared_block_spec["home_rank"],
size=shared_block_spec["size"],
o_ids=shared_block_spec["tasks"])

# Index the shared block for next loops checks
shared_blocks[shared_id] = b
else:
Expand Down Expand Up @@ -675,16 +692,6 @@ def transfer_object(self, r_src: Rank, o: Object, r_dst: Rank):
self.__logger.debug(
f"Removing object {o_id} attachment to block {b_id} on rank {r_src.get_id()}")

# Perform sanity check
if b_id not in r_src.get_shared_ids():
self.__logger.error(
f"block {b_id} not present in {r_src.get_shared_ids()}")
raise SystemExit(1)

if not block.detach_object_id(o_id):
# Delete shared block if no tied object left on rank
r_src.delete_shared_block(block)

# Replicate or update block on destination rank
if not (b_dst := r_dst.get_shared_block_with_id(b_id)):
# Replicate block when not present on destination rank
Expand Down
Loading

0 comments on commit d024aa9

Please sign in to comment.