Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: remove the remote/local concept, assume services are always remote #475

Merged
merged 1 commit into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions llama_deploy/services/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import asyncio
import warnings
from abc import ABC, abstractmethod
from typing import Any

Expand All @@ -24,7 +24,6 @@ class BaseService(MessageQueuePublisherMixin, ABC):
- A service has a processing loop, for continuous processing of messages.
- A service can process a message.
- A service can publish a message to another service.
- A service can be launched in-process.
- A service can be launched as a server.
- A service can be registered to the control plane.
- A service can be registered to the message queue.
Expand All @@ -39,6 +38,12 @@ def __init__(
self._service_name = name
self._control_plane_config = control_plane_config or ControlPlaneConfig()
self._control_plane_url = self._control_plane_config.url
if control_plane_url is not None:
warnings.warn(
"The control_plane_url parameter is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)

@property
def service_name(self) -> str:
Expand All @@ -51,7 +56,7 @@ def service_definition(self) -> ServiceDefinition:
...

@abstractmethod
def as_consumer(self, remote: bool = False) -> BaseMessageQueueConsumer:
def as_consumer(self) -> BaseMessageQueueConsumer:
"""Get the consumer for the message queue."""
...

Expand All @@ -65,11 +70,6 @@ async def process_message(self, message: QueueMessage) -> Any:
"""Process a message."""
...

@abstractmethod
async def launch_local(self) -> asyncio.Task:
"""Launch the service in-process."""
...

@abstractmethod
async def launch_server(self) -> None:
"""Launch the service as a server."""
Expand Down Expand Up @@ -133,13 +133,11 @@ async def update_session_state(
async def register_to_message_queue(self) -> StartConsumingCallable:
"""Register the service to the message queue."""
return await self.message_queue.register_consumer(
self.as_consumer(remote=True), topic=self.get_topic(self.service_name)
self.as_consumer(), topic=self.get_topic(self.service_name)
)

async def deregister_from_message_queue(self) -> None:
return await self.message_queue.deregister_consumer(
self.as_consumer(remote=True)
)
return await self.message_queue.deregister_consumer(self.as_consumer())

def get_topic(self, msg_type: str) -> str:
return f"{self._control_plane_config.topic_namespace}.{msg_type}"
26 changes: 4 additions & 22 deletions llama_deploy/services/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from llama_deploy.control_plane.server import CONTROL_PLANE_MESSAGE_TYPE
from llama_deploy.message_consumers.base import BaseMessageQueueConsumer
from llama_deploy.message_consumers.callable import CallableMessageConsumer
from llama_deploy.message_consumers.remote import RemoteMessageConsumer
from llama_deploy.message_publishers.publisher import PublishCallback
from llama_deploy.message_queues.base import AbstractMessageQueue
Expand Down Expand Up @@ -415,32 +414,15 @@ async def process_message(self, message: QueueMessage) -> None:
else:
raise ValueError(f"Unhandled action: {message.action}")

def as_consumer(self, remote: bool = False) -> BaseMessageQueueConsumer:
"""Get the consumer for the message queue.
def as_consumer(self) -> BaseMessageQueueConsumer:
"""Get the consumer for the message queue."""

Args:
remote (bool):
Whether the consumer is remote. Defaults to False.
If True, the consumer will be a RemoteMessageConsumer that uses the `process_message` endpoint.
"""
if remote:
return RemoteMessageConsumer(
id_=self.publisher_id,
url=f"{self.config.url}{self._app.url_path_for('process_message')}",
message_type=self.service_name,
)

return CallableMessageConsumer(
return RemoteMessageConsumer(
id_=self.publisher_id,
url=f"{self.config.url}{self._app.url_path_for('process_message')}",
message_type=self.service_name,
handler=self.process_message,
)

async def launch_local(self) -> asyncio.Task:
"""Launch the service in-process."""
logger.info(f"{self.service_name} launch_local")
return asyncio.create_task(self.processing_loop())

# ---- Server based methods ----

@asynccontextmanager
Expand Down
4 changes: 2 additions & 2 deletions tests/services/test_workflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ async def test_hitl_workflow_service(
service_name="test_workflow",
description="Test Workflow Service",
host="localhost",
port=8001,
port=8002,
),
)

# launch it
server_task = await workflow_service.launch_local()
server_task = asyncio.create_task(workflow_service.launch_server())

# process run task
task = TaskDefinition(
Expand Down
Loading