-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Agent Communication Channels #1839
Comments
Concept 1 (Agent exclusive communication)Use a messaging system that assumes unordered messages. Unlimited messages may be too much, it is unclear how to trigger the start of the communication. class CooperativeAgent(Agent):
def __init__(self, message_interface):
self._message_interface = message_interface
self._state_lock = Lock()
self.awake = False
message_interface.register("agents_broadcast", on_agents_message_callback)
def on_agents_message_callback(self, msg):
"""Keep communicating until messages are done."""
with self._state_lock:
if msg.id == "__all__" and msg.wake:
self.awake = True
if self.self.open_to_communicate:
return
if stop_condition(msg):
self.awake = False
self._message_interface.send("agents_broadcast", Msg(..., id=self.id, closed=True))
reply_msg = self.gen_msg(msg)
self._message_interface.send("agents_broadcast", reply_msg)
agents: List[CooperativeAgent]
while not terms["__all__"]:
## Trigger start of agent communication with a `wake` message
message_interface.send("agents_broadcast", Msg(..., id="__all__" wake=True))
## block until all agents are done communicating?
wait_on_agents(agents)
for i in range(num_agents):
actions[AGENT_ID+i] = agents[AGENT_ID+i].act(obs[AGENT_ID+i])
obs, rewards, terms, truncs, infos = env.step(actions) Concept 2 (Gym wrapper Action->Observation message)Use a wrapper to pass messages between agents. This has the downside of a 1 step delay for passing information. class MessageAction(NamedTuple):
action: Any
msg: Any
class MessageObservation(NamedTuple):
observation: Observation
msgs: Dict[str, Any]
class MessagePasser(gymnasium.Wrapper):
def __init__(self, env: gymnasium.Env):
super.__init__(env)
# update action_space and observation_space
self.action_space =
def step(self, actions):
actions = {}
msgs = {}
for a_id, ma in actions.items():
msgs[a_id] = ma.msg
actions[a_id] = ma.action
obs, rewards, terms, truncs, infos = self.env.step(actions)
obs_with_msgs = {a_id: MessageObservation(ob, msgs) for a_id, ob in obs.items()}
return obs_with_msgs, rewards, terms, truncs, infos
def reset(self,*, seed, options):
...
# same standard interface
actions = {}
for i in range(num_agents):
actions[AGENT_ID+i] = agents[AGENT_ID+i].act(obs[AGENT_ID+i])
obs, rewards, terms, truncs, infos = env.step(actions) Concept 3 (Agent&Wrapper)Take concept 1 and 2 and combine so that the communication occurs inside the wrapper. agents = {a_id: Transmitter(Receiver(agent, on_receive), on_transmit) for a_id, agent in agents.items()}
env = MessagePasser(env, agents)
obs, rew, term, trunc, info = env.step({"transmit": False, "receive": True}) # communication and actions occur internally
# Actions are returned in `info` Concept 4 (Sensor based)In this concept messages are passed out of SMARTS about the vehicles. class Sensitivity(Enum):
LOW = 0
STANDARD = 1
HIGH = 2
class V2XTransmitter(NamedTuple):
"""A configuration utility to set up agents to transmit messages."""
# TODO MTA: move this to agent interface.
bands: Sequence[int]
range: float
mission: bool
position: bool
heading: bool
breaking: bool
throttle: bool
steering: bool
class V2XReceiver(NamedTuple):
"""A configuratoin utility to set up agent to receive messages."""
# TODO MTA: move this to agent interface.
bands: Sequence[int]
sensitivity: Sensitivity = Sensitivity.HIGH
# This is where I see a problem.
## SMARTS manages sensors but the agent must be able to send info.
## Specifically, the agent lives outside SMARTS and the sensors within.
class MessageTransmitterSensor():
def __call__(self, sim_state):
raise NotImplementedError
class MessageReceiverSensor():
def __call__(self, sim_state):
raise NotImplementedError
# Otherwise, the sensor interface is good
## It means there are no extra cases needed to modify the action/observation space.
class AgentInterface():
...
v2x_receiver = V2XReceiver(bands=[4,5,8], sensitivity=Sensitivity.STANDARD)
v2x_transmitter = V2XTransmitter(bands=[4,8], range=200, mission=True, position=True, heading=True) Concept 5 (Hybrid Message)In this concept SMARTS handles some of the message and the agent has some space to insert its own specific message. class V2XTransmitter(NamedTuple):
"""A configuration utility to set up agents to transmit messages."""
# TODO MTA: move this to agent interface.
bands: Sequence[int]
range: float
...
custom_message_blob_bytes: int # custom agent message
class AgentInterface:
...
v2x_receiver = V2XReceiver(bands=[4,5,8], sensitivity=Sensitivity.STANDARD)
v2x_transmitter = V2XTransmitter(bands=[4,8], ..., custom_message_blob_bytes=10000)
class Agent
# env interface is then same as normal except action and observation space
# > env.action_space[@0]
# Tuple(base_action, message)
# > env.observation_space[@0] |
Some thoughts on simulating communication.
|
From
Do you have any suggestions for papers?
I overlooked this, it is a good approach probably. I have some concern over how the sensor response is programmed. We generally do not want to inject user code into SMARTS, and would need to work from a remote (roughly data only.) It would not be breaking but if the response must go through the action that would be a very different behaviour than we currently have.
That was the reason for mocking up the wrapper version.
Right, we can assume packet size can be capped at 10Mbps for now. |
I am going to be implementing this through a |
Is your feature request related to a problem? Please describe.
There is a need to provide utility to allow for cooperative agents which can communicate with each other over top what SMARTS is responsible for.
The remaining question is how immediate and continuous the communication needs to be:
Immediacy (choose 1):
Frequency (choose 1):
Signal (as many that apply):
Describe the solution you'd like
We want a channel subscription utility which sits above the SMARTS layer to allow agents to communicate with each-other. These should not just be vehicle related agents.
Some options:
Describe alternatives you've considered
Subscribe to other agents through the agent interface where data is passed to other agents through observations (this has the issue of a 1 step delay.)
The text was updated successfully, but these errors were encountered: