Skip to content

Commit

Permalink
Revert "Revert "update events for different device id.""
Browse files Browse the repository at this point in the history
This reverts commit 2a8706f.
  • Loading branch information
chaoyanghe committed Apr 25, 2022
1 parent bd92436 commit 658efce
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def finish(self):
self.communication_manager.finish()

def send_model_to_server(self, receive_id, weights, local_sample_num):
self.event_sdk.log_event_started("comm_c2s")
self.event_sdk.log_event_started("comm_c2s", event_edge_id=0)

message = Message(MyMessage.MSG_TYPE_C2S_SEND_MODEL_TO_SERVER, self.client_real_id, receive_id)
message.add_params(MyMessage.MSG_ARG_KEY_MODEL_PARAMS, weights)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def handle_message_client_status_update(self, msg_params):
self.send_init_msg()

def handle_message_receive_model_from_client(self, msg_params):
self.event_sdk.log_event_ended("comm_c2s")
self.event_sdk.log_event_ended("comm_c2s", event_edge_id=0)
self.event_sdk.log_event_ended("wait")

sender_id = msg_params.get(MyMessage.MSG_ARG_KEY_SENDER)
Expand Down
32 changes: 26 additions & 6 deletions fedml_api/distributed/fedavg_cross_silo/FedEventSDK.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,36 @@ def __init__(self, args):
self.com_manager = MqttS3StatusManager(
args.mqtt_config_path, args.s3_config_path, topic=args.run_id)

def log_event_started(self, event_name, event_value=None):
event_topic, event_msg = self.__build_event_mqtt_msg(self.args.run_id, self.edge_id,
def log_event_started(self, event_name, event_value=None, event_edge_id=None):
if event_value is None:
event_value_passed = ""
else:
event_value_passed = event_value

if event_edge_id is not None:
edge_id = event_edge_id
else:
edge_id = self.edge_id

event_topic, event_msg = self.__build_event_mqtt_msg(self.args.run_id, edge_id,
FedEventSDK.EVENT_TYPE_STARTED,
event_name, event_value)
event_name, event_value_passed)
self.com_manager.send_message_json(event_topic, json.dumps(event_msg))

def log_event_ended(self, event_name, event_value=None):
event_topic, event_msg = self.__build_event_mqtt_msg(self.args.run_id, self.edge_id,
def log_event_ended(self, event_name, event_value=None, event_edge_id=None):
if event_value is None:
event_value_passed = ""
else:
event_value_passed = event_value

if event_edge_id is not None:
edge_id = event_edge_id
else:
edge_id = self.edge_id

event_topic, event_msg = self.__build_event_mqtt_msg(self.args.run_id, edge_id,
FedEventSDK.EVENT_TYPE_ENDED,
event_name, event_value)
event_name, event_value_passed)
self.com_manager.send_message_json(event_topic, json.dumps(event_msg))

@staticmethod
Expand Down

0 comments on commit 658efce

Please sign in to comment.