Skip to content

Commit

Permalink
Uri service_name replaces client name for EventClientManager subscrip…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
Hackerman342 committed Apr 24, 2024
1 parent 9798528 commit 2eec175
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions py/farm_ng/core/event_client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
EventServiceConfigList,
SubscribeRequest,
)
from farm_ng.core.uri import get_service_name
from farm_ng.core.uri_pb2 import Uri

__all__ = ["EventClientSubscriptionManager"]
Expand Down Expand Up @@ -86,37 +87,38 @@ def _initialize_clients(
clients.update({config.name: EventClient(config)})
return clients

def _try_register_topic(self, service_name: str, uri_path: str) -> bool:
def _try_register_topic(self, client_name: str, uri: Uri) -> bool:
"""Attempts to register a topic for a service.
Args:
service_name: The name of the service.
uri_path: The path of the topic.
client_name: The name of the client to register the topic for.
uri: The uri to register.
Returns:
True if the topic was registered successfully, False otherwise.
"""
if service_name not in self._clients:
self.logger.warning("Service %s not found", service_name)
if client_name not in self._clients:
self.logger.warning("Service %s not found", client_name)
return False

# make the key for the subscription map
service_path: str = f"{service_name}{uri_path}"
service_name = get_service_name(uri)
topic_name: str = f"{service_name}{uri.path}"

if service_path in self._subscriptions:
# self.logger.warning("Topic %s already registered", service_path)
if topic_name in self._subscriptions:
# self.logger.warning("Topic %s already registered", topic_name)
return False

subscribe_request = SubscribeRequest(
uri=Uri(
path=uri_path,
path=uri.path,
query=f"service_name={service_name}",
),
every_n=1,
)

self.logger.info("Registering topic %s", service_path)
self._subscriptions[service_path] = subscribe_request
self.logger.info("Registering topic %s for client %s", topic_name, client_name)
self._subscriptions[topic_name] = subscribe_request

return True

Expand All @@ -133,7 +135,7 @@ async def update_subscriptions_for_client(self, client: EventClient) -> None:
# register the topics
uri: Uri
for uri in uris:
self._try_register_topic(client.config.name, uri.path)
self._try_register_topic(client.config.name, uri)

await asyncio.sleep(1.0)

Expand Down

0 comments on commit 2eec175

Please sign in to comment.