Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add delayed container start #131

Merged
merged 10 commits into from
Nov 4, 2024
5 changes: 4 additions & 1 deletion mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def _reserve_aid(self, suggested_aid=None):
return suggested_aid
else:
logger.warning(
"The suggested aid could not be reserved, either it is not available or it is not allowed (pattern agentX);%s",
"The suggested aid could not be reserved, either it is not available or it is not allowed (pattern %sX);%s",
AGENT_PATTERN_NAME_PRE,
suggested_aid,
)

Expand Down Expand Up @@ -298,6 +299,8 @@ async def start(self):
# task that processes the inbox.
self._check_inbox_task: asyncio.Task = asyncio.create_task(self._check_inbox())

await self._container_process_manager.start()

"""Start the container. It totally depends on the implementation for what is actually happening."""
for agent in self._agents.values():
agent._do_start()
Expand Down
38 changes: 35 additions & 3 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ async def start_agent_loop():

for agent in container._agents.values():
agent._do_start()

container.running = True
container.on_ready()

while not terminate_event.is_set():
await asyncio.sleep(WAIT_STEP)
await container.shutdown()
Expand Down Expand Up @@ -294,7 +297,7 @@ async def _execute_dispatch_event(self, event_pipe: AioDuplex):
except Exception:
logger.exception("A dispatched coroutine has failed!")
except EOFError:
# other side disconnected -> task not necessry anymore
# other side disconnected -> task not necessary anymore
pass
except Exception:
logger.exception("The Dispatch Event Loop has failed!")
Expand Down Expand Up @@ -363,6 +366,8 @@ def __init__(
self._container = container
self._mp_enabled = False
self._ctx = get_context("spawn")
self._agent_process_init_list = []
self._started = False

def _init_mp(self):
# For agent multiprocessing support
Expand Down Expand Up @@ -444,6 +449,21 @@ def _find_sp_queue(self, aid):
raise ValueError(f"The aid '{aid}' does not exist in any subprocess.")

def create_agent_process(self, agent_creator, container, mirror_container_creator):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the design, it makes sense that delayed starting agent processes is an additional feature. I do not feel the prior approach was bad. Being able to start the process at will before starting the container makes sense as it is closer to the normal agent lifecycle. So I would suggest to introduce a new method for delayed agent process start (or a flag?). This also would prevent breaking the API.

# dill must be dumped on creation, otherwise the later variable state would be stored
agent_creator = dill.dumps(agent_creator)
mirror_container_creator = dill.dumps(mirror_container_creator)
if not self._started:
self._agent_process_init_list.append(
(agent_creator, container, mirror_container_creator)
)
else:
self.create_internal_agent_process(
maurerle marked this conversation as resolved.
Show resolved Hide resolved
agent_creator, container, mirror_container_creator
)

def create_internal_agent_process(
self, agent_creator, container, mirror_container_creator
):
if not self._active:
self._init_mp()
self._active = True
Expand All @@ -461,8 +481,8 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato
clock=container.clock,
kwargs=container._kwargs,
),
dill.dumps(agent_creator),
dill.dumps(mirror_container_creator),
agent_creator,
mirror_container_creator,
to_pipe_message,
self._main_queue,
to_pipe,
Expand Down Expand Up @@ -510,6 +530,18 @@ def handle_message_in_sp(self, message, receiver_id, priority, meta):
else:
sp_queue_of_agent.put_nowait((priority, message, meta))

async def start(self):
if not self._started:
self._started = True
for (
agent_creator,
container,
mirror_container_creator,
) in self._agent_process_init_list:
await self.create_internal_agent_process(
agent_creator, container, mirror_container_creator
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be using asyncio.gather instead to decrease runtime

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gather did not work somehow, as the ProcessHandle is unhashable..
Using a poor men's version of gather now instead


async def shutdown(self):
if self._active:
# send a signal to all sub processes to terminate their message feed in's
Expand Down
6 changes: 5 additions & 1 deletion mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@
elif isinstance(protocol_addr, tuple | list) and len(protocol_addr) == 2:
protocol_addr = tuple(protocol_addr)
else:
logger.warning("Address for sending message is not valid;%s", protocol_addr)
logger.warning(

Check warning on line 226 in mango/container/tcp.py

View check run for this annotation

Codecov / codecov/patch

mango/container/tcp.py#L226

Added line #L226 was not covered by tests
"Receiver ProtocolAddress sending message from %s to %s is not valid",
sender_id,
receiver_addr,
)
return False

meta = {}
Expand Down
7 changes: 5 additions & 2 deletions mango/util/termination_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def unfinished_task_count(container: Container):
async def tasks_complete_or_sleeping(container: Container, except_sources=["no_wait"]):
sleeping_tasks = []
task_list = []
await container.inbox.join()
# is None for containers in MirrorContainerProcessManager
if container.inbox is not None:
await container.inbox.join()
# python does not have do while pattern
for agent in container._agents.values():
await agent.inbox.join()
Expand All @@ -33,7 +35,8 @@ async def tasks_complete_or_sleeping(container: Container, except_sources=["no_w
while len(task_list) > len(sleeping_tasks):
# sleep needed so that asyncio tasks of this time step are correctly awaken.
# await asyncio.sleep(0)
await container.inbox.join()
if container.inbox is not None:
await container.inbox.join()
for scheduled_task, task, _, _ in task_list:
await asyncio.wait(
[scheduled_task._is_sleeping, scheduled_task._is_done],
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/test_message_roundtrip_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ async def test_mp_simple_ping_pong_multi_container_tcp():
container_2 = create_tcp_container(
addr=repl_addr,
)
await container_1.as_agent_process(
container_1.as_agent_process(
agent_creator=lambda c: c.register(PingPongAgent(), suggested_aid=aid1)
)
await container_2.as_agent_process(
container_2.as_agent_process(
agent_creator=lambda c: c.register(PingPongAgent(), suggested_aid=aid2)
)
agent = container_1.register(PingPongAgent())
Expand Down
105 changes: 95 additions & 10 deletions tests/unit_tests/container/test_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@

import pytest

from mango import Agent, AgentAddress, activate, addr, create_tcp_container, sender_addr
from mango import (
Agent,
AgentAddress,
activate,
addr,
create_ec_container,
create_tcp_container,
sender_addr,
)


class MyAgent(Agent):
test_counter: int = 0
current_task: object
i_am_ready = False

def on_ready(self):
self.i_am_ready = True

def handle_message(self, content, meta):
self.test_counter += 1
Expand All @@ -16,7 +28,7 @@ def handle_message(self, content, meta):
if self.test_counter == 1:
# send back pong, providing your own details
self.current_task = self.schedule_instant_message(
content="pong", receiver_addr=sender_addr(meta)
content=self.i_am_ready, receiver_addr=sender_addr(meta)
)


Expand Down Expand Up @@ -63,7 +75,7 @@ async def test_agent_processes_ping_pong(num_sp_agents, num_sp):
# GIVEN
c = create_tcp_container(addr=("127.0.0.1", 15589), copy_internal_messages=False)
for i in range(num_sp):
await c.as_agent_process(
c.as_agent_process(
agent_creator=lambda container: [
container.register(MyAgent(), suggested_aid=f"process_agent{i},{j}")
for j in range(num_sp_agents)
Expand All @@ -80,7 +92,9 @@ async def test_agent_processes_ping_pong(num_sp_agents, num_sp):
receiver_addr=addr(c.addr, f"process_agent{i},{j}"),
)
while agent.test_counter != num_sp_agents * num_sp:
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)

assert agent.i_am_ready is True

assert agent.test_counter == num_sp_agents * num_sp

Expand All @@ -91,7 +105,7 @@ async def test_agent_processes_ping_pong_p_to_p():
addr = ("127.0.0.1", 5829)
aid_main_agent = "main_agent"
c = create_tcp_container(addr=addr, copy_internal_messages=False)
await c.as_agent_process(
c.as_agent_process(
agent_creator=lambda container: container.register(
P2PTestAgent(aid_main_agent), suggested_aid="process_agent1"
)
Expand All @@ -108,10 +122,10 @@ def agent_init(c):
return agent

async with activate(c) as c:
await c.as_agent_process(agent_creator=agent_init)
c.as_agent_process(agent_creator=agent_init)

while main_agent.test_counter != 1:
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)

assert main_agent.test_counter == 1

Expand All @@ -133,7 +147,42 @@ async def agent_creator(container):
await p2pta.send_message(content="pong", receiver_addr=target_addr)

async with activate(c) as c:
await c.as_agent_process(agent_creator=agent_creator)
c.as_agent_process(agent_creator=agent_creator)

# WHEN
def agent_init(c):
agent = c.register(MyAgent(), suggested_aid="process_agent2")
agent.schedule_instant_message(
"Message To Process Agent", AgentAddress(addr, "process_agent1")
)
return agent

c.as_agent_process(agent_creator=agent_init)

while main_agent.test_counter != 2:
await asyncio.sleep(0.01)

assert main_agent.test_counter == 2


@pytest.mark.asyncio
async def test_async_agent_processes_ping_pong_p_to_p_external():
# GIVEN
addr = ("127.0.0.1", 5811)
aid_main_agent = "main_agent"
c = create_ec_container(addr=addr, copy_internal_messages=False)
main_agent = c.register(P2PMainAgent(), suggested_aid=aid_main_agent)

target_addr = main_agent.addr

async def agent_creator(container):
p2pta = container.register(
P2PTestAgent(aid_main_agent), suggested_aid="process_agent1"
)
await p2pta.send_message(content="pong", receiver_addr=target_addr)

async with activate(c) as c:
c.as_agent_process(agent_creator=agent_creator)

# WHEN
def agent_init(c):
Expand All @@ -143,13 +192,49 @@ def agent_init(c):
)
return agent

await c.as_agent_process(agent_creator=agent_init)
c.as_agent_process(agent_creator=agent_init)

while main_agent.test_counter != 2:
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)

assert main_agent.test_counter == 2


def test_sync_setup_agent_processes():
# GIVEN
c = create_tcp_container(addr=("127.0.0.1", 15589), copy_internal_messages=False)
c.as_agent_process(
agent_creator=lambda container: [
container.register(MyAgent(), suggested_aid="process_agent0")
]
)
agent = c.register(MyAgent())


@pytest.mark.asyncio
async def test_ready_agent_processes():
# GIVEN
c = create_tcp_container(addr=("127.0.0.1", 15589), copy_internal_messages=False)
c.as_agent_process(
agent_creator=lambda container: [
container.register(MyAgent(), suggested_aid="process_agent0")
]
)
agent = c.register(MyAgent())

def handle_message(content, meta):
agent.other_agent_is_ready = content

agent.handle_message = handle_message

async with activate(c) as c:
await agent.send_message(
"Message To Process Agent",
receiver_addr=addr(c.addr, "process_agent0"),
)
await asyncio.sleep(0.01)
assert agent.other_agent_is_ready is True


if __name__ == "__main__":
asyncio.run(test_agent_processes_ping_pong(5, 5))