diff --git a/CHANGELOG.md b/CHANGELOG.md index 395d0cff..caf8de7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - network: unrouted requests don't lead to infinite waiting anymore ([#116]). - network: release routed if flow closed. +- network: reconnect a control connection in the case of one-way discovery. - core: use the right slot's key to remove actors. [#116]: https://github.com/elfo-rs/elfo/pull/116 diff --git a/elfo-network/src/discovery/mod.rs b/elfo-network/src/discovery/mod.rs index 93d0a1a1..90d28012 100644 --- a/elfo-network/src/discovery/mod.rs +++ b/elfo-network/src/discovery/mod.rs @@ -64,6 +64,12 @@ struct ConnectionRejected { error: String, } +#[message] +struct ControlConnectionFailed { + // `Some` only on the client side. + transport: Option, +} + pub(super) struct Discovery { ctx: NetworkContext, node_map: Arc, @@ -108,6 +114,12 @@ impl Discovery { }); self.open_connection(&msg.transport, role); } + msg @ ControlConnectionFailed => { + if let Some(transport) = msg.transport { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + self.discover_one(transport); + } + } }); } @@ -161,13 +173,16 @@ impl Discovery { } fn discover(&mut self) { + for transport in self.ctx.config().discovery.predefined.clone() { + self.discover_one(transport); + } + } + + fn discover_one(&mut self, transport: Transport) { let msg = internode::SwitchToControl { groups: self.node_map.this.groups.clone(), }; - - for transport in self.ctx.config().discovery.predefined.clone() { - self.open_connection(&transport, ConnectionRole::Control(msg.clone())); - } + self.open_connection(&transport, ConnectionRole::Control(msg)); } fn open_connection( @@ -283,6 +298,8 @@ impl Discovery { // TODO: check launch_id. } + self.control_maintenance(socket, msg.transport.clone()); + // Only initiator (client) can start new connections, // because he knows the transport address. let Some(transport) = msg.transport else { @@ -369,6 +386,22 @@ impl Discovery { fn on_connection_rejected(&mut self, _msg: ConnectionRejected) { // TODO: something else? Retries? } + + fn control_maintenance(&mut self, mut socket: Socket, transport: Option) { + self.ctx.attach(Stream::once(async move { + // TODO: graceful termination. + let err = control_maintenance(&mut socket).await.unwrap_err(); + + info!( + message = "control connection closed", + socket = %socket.info, + peer = %socket.peer, + reason = %err, + ); + + ControlConnectionFailed { transport } + })); + } } async fn accept_connection( @@ -422,6 +455,16 @@ async fn accept_connection( }) } +async fn control_maintenance(socket: &mut Socket) -> Result<()> { + loop { + send_regular(socket, internode::Ping { payload: 0 }).await?; + recv_regular::(socket).await?; + send_regular(socket, internode::Pong { payload: 0 }).await?; + recv_regular::(socket).await?; + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } +} + fn infer_connections<'a>( one: &'a [internode::GroupInfo], two: &'a [internode::GroupInfo],