Skip to content

Commit

Permalink
Updated logs to provide exact information to user.
Browse files Browse the repository at this point in the history
Signed-off-by: Parth Mandaliya <parthx.mandaliya@intel.com>
  • Loading branch information
ParthM-GitHub committed Sep 28, 2023
1 parent 6a4ce82 commit 4a68b40
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
69 changes: 42 additions & 27 deletions openfl/experimental/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ def __init__(
if self.single_col_cert_common_name is not None:
self._log_big_warning()
else:
# FIXME: '' instead of None is just for protobuf compatibility.
# FIXME: "" instead of None is just for protobuf compatibility.
# Cleaner solution?
self.single_col_cert_common_name = ''
self.single_col_cert_common_name = ""

self.log_metric_callback = log_metric_callback
if log_metric_callback is not None:
self.log_metric = log_metric_callback
self.logger.info(f'Using custom log metric: {self.log_metric}')
self.logger.info(f"Using custom log metric: {self.log_metric}")

self.uuid = aggregator_uuid
self.federation_uuid = federation_uuid
Expand Down Expand Up @@ -99,6 +99,7 @@ def __init__(
self.__private_attrs_callable = private_attributes_callable
self.__private_attrs = {}
self.connected_collaborators = []
self.tasks_sent_to_collaborators = 0
self.collaborator_results_received = []

if self.__private_attrs_callable is not None:
Expand Down Expand Up @@ -141,10 +142,10 @@ def __delete_agg_attrs_from_clone(self, clone: Any, replace_str: str = None) ->
def _log_big_warning(self) -> None:
"""Warn user about single collaborator cert mode."""
self.logger.warning(
f'\n{the_dragon}\nYOU ARE RUNNING IN SINGLE COLLABORATOR CERT MODE! THIS IS'
f' NOT PROPER PKI AND '
f'SHOULD ONLY BE USED IN DEVELOPMENT SETTINGS!!!! YE HAVE BEEN'
f' WARNED!!!'
f"\n{the_dragon}\nYOU ARE RUNNING IN SINGLE COLLABORATOR CERT MODE! THIS IS"
f" NOT PROPER PKI AND "
f"SHOULD ONLY BE USED IN DEVELOPMENT SETTINGS!!!! YE HAVE BEEN"
f" WARNED!!!"
)

@staticmethod
Expand All @@ -169,7 +170,7 @@ def run_flow(self) -> None:
next_step = self.do_task(f_name)

if self.time_to_quit:
self.logger.info('Experiment Completed.')
self.logger.info("Experiment Completed.")
self.quit_job_sent_to = self.authorized_cols
break

Expand All @@ -181,11 +182,22 @@ def run_flow(self) -> None:
self.logger.info(f"Tasks will not be sent to {k}")

while not self.collaborator_task_results.is_set():
# Waiting for selected collaborators to send the results.
len_sel_collabs = len(self.selected_collaborators)
self.logger.info("Waiting for "
+ f"{self.collaborators_counter}/{len_sel_collabs}"
+ " collaborators to send results.")
len_connected_collabs = len(self.connected_collaborators)
if len_connected_collabs != len_sel_collabs:
# Waiting for collaborators to connect.
self.logger.info("Waiting for "
+ f"{len_connected_collabs}/{len_sel_collabs}"
+ " collaborators to connect...")
elif self.tasks_sent_to_collaborators != len_sel_collabs:
self.logger.info("Waiting for "
+ f"{self.tasks_sent_to_collaborators}/{len_sel_collabs}"
+ " to make requests for tasks...")
else:
# Waiting for selected collaborators to send the results.
self.logger.info("Waiting for "
+ f"{self.collaborators_counter}/{len_sel_collabs}"
+ " collaborators to send results...")
time.sleep(Aggregator._get_sleep_time())

self.collaborator_task_results.clear()
Expand Down Expand Up @@ -246,18 +258,18 @@ def get_tasks(self, collaborator_name: str) -> Tuple:
self.connected_collaborators.append(collaborator_name)

self.logger.debug(
f'Aggregator GetTasks function reached from collaborator {collaborator_name}...'
f"Aggregator GetTasks function reached from collaborator {collaborator_name}..."
)

# If queue of requesting collaborator is empty
while self.__collaborator_tasks_queue[collaborator_name].qsize() == 0:
# If it is time to then inform the collaborator
if self.time_to_quit:
self.logger.info(
f'Sending signal to collaborator {collaborator_name} to shutdown...')
# FIXME: 0, and '' instead of None is just for protobuf compatibility.
f"Sending signal to collaborator {collaborator_name} to shutdown...")
# FIXME: 0, and "" instead of None is just for protobuf compatibility.
# Cleaner solution?
return 0, '', None, Aggregator._get_sleep_time(), self.time_to_quit
return 0, "", None, Aggregator._get_sleep_time(), self.time_to_quit

# If not time to quit then sleep for 10 seconds
time.sleep(Aggregator._get_sleep_time())
Expand All @@ -266,9 +278,9 @@ def get_tasks(self, collaborator_name: str) -> Tuple:
next_step, clone = self.__collaborator_tasks_queue[
collaborator_name].get()

self.logger.info(
f'Sending tasks to collaborator {collaborator_name} for round {self.current_round}'
)
self.tasks_sent_to_collaborators += 1
self.logger.info("Sending tasks to collaborator"
+ f" {collaborator_name} for round {self.current_round}...")
return self.current_round, next_step, pickle.dumps(clone), 0, self.time_to_quit

def do_task(self, f_name: str) -> Any:
Expand Down Expand Up @@ -379,13 +391,13 @@ def send_task_results(self, collab_name: str, round_number: int, next_step: str,
# Log a warning if collaborator is sending results for old round
if round_number is not self.current_round:
self.logger.warning(
f'Collaborator {collab_name} is reporting results'
f' for the wrong round: {round_number}. Ignoring...'
f"Collaborator {collab_name} is reporting results"
f" for the wrong round: {round_number}. Ignoring..."
)
else:
self.logger.info(
f'Collaborator {collab_name} is sending task results '
f'for round {round_number}'
f"Collaborator {collab_name} sent task results"
f" for round {round_number}."
)
# Unpickle the clone (FLSpec object)
clone = pickle.loads(clone_bytes)
Expand All @@ -399,6 +411,9 @@ def send_task_results(self, collab_name: str, round_number: int, next_step: str,
self.collaborators_counter = 0
# Set the event to inform aggregator to resume the flow execution
self.collaborator_task_results.set()
# Empty tasks_sent_to_collaborators list for next time.
if self.tasks_sent_to_collaborators == len(self.selected_collaborators):
self.tasks_sent_to_collaborators = 0

def valid_collaborator_cn_and_id(self, cert_common_name: str,
collaborator_common_name: str) -> bool:
Expand All @@ -415,9 +430,9 @@ def valid_collaborator_cn_and_id(self, cert_common_name: str,
"""
# if self.test_mode_whitelist is None, then the common_name must
# match collaborator_common_name and be in authorized_cols
# FIXME: '' instead of None is just for protobuf compatibility.
# FIXME: "" instead of None is just for protobuf compatibility.
# Cleaner solution?
if self.single_col_cert_common_name == '':
if self.single_col_cert_common_name == "":
return (cert_common_name == collaborator_common_name
and collaborator_common_name in self.authorized_cols)
# otherwise, common_name must be in whitelist and
Expand All @@ -431,7 +446,7 @@ def all_quit_jobs_sent(self) -> bool:
return set(self.quit_job_sent_to) == set(self.authorized_cols)


the_dragon = '''
the_dragon = """
,@@.@@+@@##@,@@@@.`@@#@+ *@@@@ #@##@ `@@#@# @@@@@ @@ @@@@` #@@@ :@@ `@#`@@@#.@
@@ #@ ,@ +. @@.@* #@ :` @+*@ .@`+. @@ *@::@`@@ @@# @@ #`;@`.@@ @@@`@`#@* +:@`
Expand Down Expand Up @@ -501,4 +516,4 @@ def all_quit_jobs_sent(self) -> bool:
`* @# +.
@@@
# `@
, '''
, """
10 changes: 5 additions & 5 deletions openfl/experimental/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ def run(self) -> None:
elif sleep_time > 0:
time.sleep(sleep_time)
else:
self.logger.info(f'Received the following tasks: {next_step}.')
self.logger.info(f"Received the following tasks: {next_step}.")
f_name, ctx = self.do_task(next_step, clone)
self.send_task_results(f_name, ctx)

self.logger.info('End of Federation reached. Exiting...')
self.logger.info("End of Federation reached. Exiting...")

def send_task_results(self, next_step: str, clone: Any) -> None:
"""
Expand All @@ -157,8 +157,8 @@ def send_task_results(self, next_step: str, clone: Any) -> None:
Returns:
None
"""
self.logger.info(f'Round {self.round_number}, '
f'collaborator {self.name} is sending results.')
self.logger.info(f"Round {self.round_number},"
f" collaborator {self.name} is sending results.")
self.client.send_task_results(
self.name, self.round_number,
next_step, pickle.dumps(clone)
Expand All @@ -177,7 +177,7 @@ def get_tasks(self) -> Tuple:
sleep_time (int): Sleep for given seconds if not ready yet
time_to_quit (bool): True if end of reached
"""
self.logger.info('Waiting for tasks...')
self.logger.info("Waiting for tasks...")
temp = self.client.get_tasks(self.name)
self.round_number, next_step, clone_bytes, sleep_time, time_to_quit = temp

Expand Down

0 comments on commit 4a68b40

Please sign in to comment.