Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-751 | Clients should have unique IDs #638

Merged
merged 6 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fedn/genprot.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
echo "Generating protocol"
python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. fedn/network/grpc/*.proto
python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. network/grpc/*.proto
echo "DONE"
5 changes: 3 additions & 2 deletions fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por

return jsonify(payload)

def add_client(self, client_id, preferred_combiner, remote_addr):
def add_client(self, client_id, preferred_combiner, remote_addr, name):
"""Add a client to the network.

:param client_id: The client id to add.
Expand Down Expand Up @@ -600,7 +600,8 @@ def add_client(self, client_id, preferred_combiner, remote_addr):
)

client_config = {
"name": client_id,
"client_id": client_id,
"name": name,
"combiner_preferred": preferred_combiner,
"combiner": combiner.name,
"ip": remote_addr,
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ def add_client(self, client):
:type client: dict
:return: None
"""
if self.get_client(client["name"]):
if self.get_client(client["client_id"]):
return

logger.info("adding client {}".format(client["name"]))
logger.info("adding client {}".format(client["client_id"]))
self.statestore.set_client(client)

def get_client(self, name):
Expand Down
7 changes: 5 additions & 2 deletions fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def __init__(self, config):
set_log_level_from_string(config.get("verbosity", "INFO"))
set_log_stream(config.get("logfile", None))

self.id = config["client_id"] or str(uuid.uuid4())

self.connector = ConnectorClient(
host=config["discover_host"],
port=config["discover_port"],
Expand All @@ -72,7 +74,7 @@ def __init__(self, config):
force_ssl=config["force_ssl"],
verify=config["verify"],
combiner=config["preferred_combiner"],
id=config["client_id"],
id=self.id,
)

# Validate client name
Expand Down Expand Up @@ -420,6 +422,7 @@ def _listen_to_task_stream(self):
r = fedn.ClientAvailableMessage()
r.sender.name = self.name
r.sender.role = fedn.WORKER
r.sender.client_id = self.id
Wrede marked this conversation as resolved.
Show resolved Hide resolved
# Add client to metadata
self._add_grpc_metadata("client", self.name)

Expand Down Expand Up @@ -762,7 +765,7 @@ def _send_heartbeat(self, update_frequency=2.0):
:rtype: None
"""
while True:
heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER))
heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id))
try:
self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self._missed_heartbeat = 0
Expand Down
2 changes: 1 addition & 1 deletion fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def assign(self):
"""
try:
retval = None
payload = {"client_id": self.name, "preferred_combiner": self.preferred_combiner}
payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner}
retval = requests.post(
self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client",
json=payload,
Expand Down
24 changes: 12 additions & 12 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, config):
# Set the status to offline for previous clients.
previous_clients = self.statestore.clients.find({"combiner": config["name"]})
for client in previous_clients:
self.statestore.set_client({"name": client["name"], "status": "offline"})
self.statestore.set_client({"name": client["name"], "status": "offline", "client_id": client["client_id"]})

self.modelservice = ModelService()

Expand Down Expand Up @@ -244,7 +244,7 @@ def _send_request_type(self, request_type, session_id, model_id, config=None, cl

request.sender.name = self.id
request.sender.role = fedn.COMBINER
request.receiver.name = client
request.receiver.client_id = client
request.receiver.role = fedn.WORKER
# Set the request data, not used in validation
if request_type == fedn.StatusType.INFERENCE:
Expand Down Expand Up @@ -290,9 +290,9 @@ def __join_client(self, client):
:param client: the client to add
:type client: :class:`fedn.network.grpc.fedn_pb2.Client`
"""
if client.name not in self.clients.keys():
if client.client_id not in self.clients.keys():
# The status is set to offline by default, and will be updated once _list_active_clients is called.
self.clients[client.name] = {"lastseen": datetime.now(), "status": "offline"}
self.clients[client.client_id] = {"last_seen": datetime.now(), "status": "offline"}

def _subscribe_client_to_queue(self, client, queue_name):
"""Subscribe a client to the queue.
Expand All @@ -303,8 +303,8 @@ def _subscribe_client_to_queue(self, client, queue_name):
:type queue_name: str
"""
self.__join_client(client)
if queue_name not in self.clients[client.name].keys():
self.clients[client.name][queue_name] = queue.Queue()
if queue_name not in self.clients[client.client_id].keys():
self.clients[client.client_id][queue_name] = queue.Queue()

def __get_queue(self, client, queue_name):
"""Get the queue for a client.
Expand All @@ -319,7 +319,7 @@ def __get_queue(self, client, queue_name):
:raises KeyError: if the queue does not exist
"""
try:
return self.clients[client.name][queue_name]
return self.clients[client.client_id][queue_name]
except KeyError:
raise

Expand Down Expand Up @@ -354,7 +354,7 @@ def _list_active_clients(self, channel):
for client in self._list_subscribed_clients(channel):
status = self.clients[client]["status"]
now = datetime.now()
then = self.clients[client]["lastseen"]
then = self.clients[client]["last_seen"]
if (now - then) < timedelta(seconds=10):
clients["active_clients"].append(client)
# If client has changed status, update statestore
Expand Down Expand Up @@ -600,7 +600,7 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context):
# Update the clients dict with the last seen timestamp.
client = heartbeat.sender
self.__join_client(client)
self.clients[client.name]["lastseen"] = datetime.now()
self.clients[client.client_id]["last_seen"] = datetime.now()

response = fedn.Response()
response.sender.name = heartbeat.sender.name
Expand Down Expand Up @@ -636,15 +636,15 @@ def TaskStream(self, response, context):
self._send_status(status)

# Set client status to online
self.clients[client.name]["status"] = "online"
self.statestore.set_client({"name": client.name, "status": "online"})
self.clients[client.client_id]["status"] = "online"
self.statestore.set_client({"name": client.name, "status": "online", "client_id": client.client_id, "last_seen": datetime.now()})

# Keep track of the time context has been active
start_time = time.time()
while context.is_active():
# Check if the context has been active for more than 10 seconds
if time.time() - start_time > 10:
self.clients[client.name]["lastseen"] = datetime.now()
self.clients[client.client_id]["last_seen"] = datetime.now()
# Reset the start time
start_time = time.time()
try:
Expand Down
1 change: 1 addition & 0 deletions fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum Role {
message Client {
Role role = 1;
string name = 2;
string client_id = 3;
}

message ReassignRequest {
Expand Down
Loading
Loading