Skip to content

Commit

Permalink
#566: fixed missing communications
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed Dec 18, 2024
1 parent 512ea78 commit d3988dd
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ def run(self, cfg=None, cfg_dir=None):

ranks_json_str = []
for i in range(len(rank_phases.items())):
print(i)
ranks_json_str.append(self.__json_writer._json_serializer((i, rank_phases[i])))

vttv_params = {
Expand Down
14 changes: 14 additions & 0 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):
else:
self._logger.info("No proposed object transfers")

tph = self._rebalanced_phase
print("AFTER EXECUTE IN REBALANCED PHASE:", tph.get_id(), tph)
for r in tph.get_ranks():
print(r.get_id(), [o for o in r.get_objects()])
print([id(o) for o in tph.get_objects()])
print([o.get_rank_id() for o in tph.get_objects()])
tpi = p_id
tph = phases[p_id]
print("AFTER EXECUTE IN PHASE:", tpi, tph)
for r in tph.get_ranks():
print(r.get_id(), [o for o in r.get_objects()])
print([id(o) for o in tph.get_objects()])
print([o.get_rank_id() for o in tph.get_objects()])
# Report iteration statistics
self._logger.info(
f"Iteration {i + 1} completed ({n_ignored} skipped ranks) in {time.time() - start_time:.3f} seconds")
Expand Down Expand Up @@ -296,3 +309,4 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):

# Report final mapping in debug mode
self._report_final_mapping(self._logger)

16 changes: 16 additions & 0 deletions src/lbaf/Execution/lbsRuntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ def get_work_model(self):
def execute(self, p_id: int, phase_increment: int=0, lb_iterations=False):

Check warning on line 124 in src/lbaf/Execution/lbsRuntime.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused argument 'lb_iterations' (unused-argument)
"""Execute runtime for single phase with given ID or multiple phases in selected range."""
# Execute load balancing algorithm
tpi = 0
tph = self.__phases[tpi]
print("PHASE:", tpi, tph)
for r in tph.get_ranks():
print(r.get_id(), [o for o in r.get_objects()])
print([o.get_id() for o in tph.get_objects()])
print([o.get_rank_id() for o in tph.get_objects()])
self.__logger.info(
f"Executing {type(self.__algorithm).__name__} for "
+ ("all phases" if p_id < 0 else f"phase {p_id}"))
Expand All @@ -133,6 +140,15 @@ def execute(self, p_id: int, phase_increment: int=0, lb_iterations=False):
self.__statistics,
self.__a_min_max)

tpi = 0
tph = self.__phases[tpi]
print("PHASE:", tpi, tph)
for r in tph.get_ranks():
print(r.get_id(), [o for o in r.get_objects()])
print([o.get_id() for o in tph.get_objects()])
print([o.get_rank_id() for o in tph.get_objects()])
#1/0

# Retrieve possibly null rebalanced phase and return it
if (lbp := self.__algorithm.get_rebalanced_phase()):
# Retain lb iterations with initial phase when it is replaced
Expand Down
75 changes: 42 additions & 33 deletions src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,80 +151,88 @@ def __create_task_data(self, rank: Rank):
key=lambda x: x.get("entity").get(
"id", x.get("entity").get("seq_id")))

def __find_object_rank(self, phase: Phase, object: Object):

Check warning on line 154 in src/lbaf/IO/lbsVTDataWriter.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Redefining built-in 'object' (redefined-builtin)

Check notice on line 154 in src/lbaf/IO/lbsVTDataWriter.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Either all return statements in a function should return an expression, or none of them should. (inconsistent-return-statements)
"""Determine which rank owns the object."""
for r in phase.get_ranks():
if object in r.get_objects():
return r

# If this point is reached the object could not be found
self.__logger.error(
f"Object id {object} cannot be located in any rank of phase {phase.get_id()}")

Check notice on line 163 in src/lbaf/IO/lbsVTDataWriter.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Trailing whitespace (trailing-whitespace)
def __get_communications(self, phase: Phase, rank: Rank):

Check notice on line 164 in src/lbaf/IO/lbsVTDataWriter.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many local variables (19/15) (too-many-locals)

Check notice on line 164 in src/lbaf/IO/lbsVTDataWriter.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many branches (15/12) (too-many-branches)
"""Create communication entries to be outputted to JSON."""

# Get initial communications (if any) for current phase
phase_communications_dict = phase.get_communications()

# Add empty entries for ranks with no initial communication
if rank.get_id() not in phase_communications_dict:
phase_communications_dict[rank.get_id()] = {}

# Get original communications on current rank
initial_on_rank_communications = phase_communications_dict[rank.get_id()]
r_id = rank.get_id()
phase_communications_dict.setdefault(r_id, {})
initial_on_rank_communications = phase_communications_dict[r_id]

# Get all objects on current rank
rank_objects = rank.get_object_ids()

# Initialize final communications
communications = []

# Ensure all objects are on the correct rank
if initial_on_rank_communications:
for comm_dict in initial_on_rank_communications:
for comm_entry in initial_on_rank_communications:
missing_ref = None

# Copy object information to the communication node
sender_obj: Object = [o for o in phase.get_objects() if
o.get_id() is not None and o.get_id() == comm_dict["from"].get("id") or
o.get_seq_id() is not None and o.get_seq_id() == comm_dict["from"].get("seq_id")]
o.get_id() is not None and o.get_id() == comm_entry["from"].get("id") or
o.get_seq_id() is not None and o.get_seq_id() == comm_entry["from"].get("seq_id")]
if len(sender_obj) == 1:
# Retrieve communications with single sender
sender_obj = sender_obj[0]
sender_rank_id = self.__find_object_rank(phase, sender_obj).get_id()
from_rank: Rank = [
r for r in phase.get_ranks() if r.get_id() == sender_obj.get_rank_id()][0]
comm_dict["from"]["home"] = sender_obj.get_rank_id()
comm_dict["from"]["migratable"] = from_rank.is_migratable(sender_obj)
r for r in phase.get_ranks() if r.get_id() == sender_rank_id][0]
comm_entry["from"]["home"] = sender_rank_id
comm_entry["from"]["migratable"] = from_rank.is_migratable(sender_obj)
for k, v in sender_obj.get_unused_params().items():
comm_dict["from"][k] = v
comm_dict["from"] = dict(sorted(comm_dict["from"].items()))
comm_entry["from"][k] = v
comm_entry["from"] = dict(sorted(comm_entry["from"].items()))
else:
# Other cases are not supported
missing_ref = comm_dict["from"].get("id", comm_dict["from"].get("seq_id"))
missing_ref = comm_entry["from"].get("id", comm_entry["from"].get("seq_id"))
self.__logger.error(
f"Invalid object id ({missing_ref}) in communication {json.dumps(comm_dict)}")
f"Invalid object id ({missing_ref}) in communication {json.dumps(comm_entry)}")

receiver_obj: Object = [o for o in phase.get_objects() if
o.get_id() is not None and o.get_id() == comm_dict["to"].get("id") or
o.get_seq_id() is not None and o.get_seq_id() == comm_dict["to"].get("seq_id")]
o.get_id() is not None and o.get_id() == comm_entry["to"].get("id") or
o.get_seq_id() is not None and o.get_seq_id() == comm_entry["to"].get("seq_id")]
if len(receiver_obj) == 1:
# Retrieve communications with single receiver
receiver_obj = receiver_obj[0]
comm_dict["to"]["home"] = receiver_obj.get_rank_id()
receiver_rank_id = self.__find_object_rank(phase, receiver_obj).get_id()
comm_entry["to"]["home"] = receiver_rank_id
to_rank: Rank = [
r for r in phase.get_ranks() if receiver_obj in r.get_objects()][0]
comm_dict["to"]["migratable"] = to_rank.is_migratable(receiver_obj)
comm_entry["to"]["migratable"] = to_rank.is_migratable(receiver_obj)
for k, v in receiver_obj.get_unused_params().items():
comm_dict["to"][k] = v
comm_dict["to"] = dict(sorted(comm_dict["to"].items()))
comm_entry["to"][k] = v
comm_entry["to"] = dict(sorted(comm_entry["to"].items()))
else:
# Other cases are not supported
missing_ref = comm_dict["to"].get("id", comm_dict["to"].get("seq_id"))
missing_ref = comm_entry["to"].get("id", comm_entry["to"].get("seq_id"))
self.__logger.error(
f"Invalid object id ({missing_ref}) in communication {json.dumps(comm_dict)}")
f"Invalid object id ({missing_ref}) in communication {json.dumps(comm_entry)}")

if missing_ref is not None:
# Keep communication with invalid entity references for the moment.
# We might remove these communications in the future in the reader work to fix invalid input.
communications.append(comm_dict)
elif ("migratable" in comm_dict["from"].keys() and
not comm_dict["from"]["migratable"]): # object is sentinel
communications.append(comm_dict)
elif comm_dict["from"].get("id", comm_dict["from"].get("seq_id")) in rank_objects:
communications.append(comm_dict)
communications.append(comm_entry)
elif ("migratable" in comm_entry["from"].keys() and
not comm_entry["from"]["migratable"]): # object is sentinel
communications.append(comm_entry)
elif comm_entry["from"].get("id", comm_entry["from"].get("seq_id")) in rank_objects:
communications.append(comm_entry)
else:
self.__moved_comms.append(comm_dict)
self.__moved_comms.append(comm_entry)

# Loop through any moved objects to find the correct rank
if self.__moved_comms:
Expand All @@ -240,7 +248,6 @@ def _json_serializer(self, rank_phases_double) -> str:
# Unpack received double
r_id, r_phases = rank_phases_double
current_rank = None

# Get current rank
for p_id, rank in r_phases.items():
if rank.get_id() == r_id:
Expand Down Expand Up @@ -314,6 +321,8 @@ def _json_writer(self, rank_phases_double) -> str:
"""Write one JSON per rank for list of phase instances."""
# Unpack received double
r_id = rank_phases_double[0]
if r_id:
return

# Create file name for current rank
file_name = f"{self.__file_stem}.{r_id}.{self.__extension}"
Expand Down

0 comments on commit d3988dd

Please sign in to comment.