Skip to content

Commit

Permalink
fix(network): reconnect control in case of one-way discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 10, 2023
1 parent 6fa3c58 commit 8083cf3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- network: unrouted requests don't lead to infinite waiting anymore ([#116]).
- network: release routed if flow closed.
- network: reconnect a control connection in the case of one-way discovery.
- core: use the right slot's key to remove actors.

[#116]: https://github.com/elfo-rs/elfo/pull/116
Expand Down
51 changes: 47 additions & 4 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ struct ConnectionRejected {
error: String,
}

// Produced by the stream attached in `Discovery::control_maintenance` once the
// ping/pong loop over a control connection stops with an error.
#[message]
struct ControlConnectionFailed {
// Transport to reconnect to. `Some` only on the client side: the handler
// re-runs discovery for it and does nothing when it is `None`.
transport: Option<Transport>,
}

pub(super) struct Discovery {
ctx: NetworkContext,
node_map: Arc<NodeMap>,
Expand Down Expand Up @@ -108,6 +114,12 @@ impl Discovery {
});
self.open_connection(&msg.transport, role);
}
// The control connection is gone. Only a side that holds a transport
// (see `ControlConnectionFailed::transport`) tries to re-establish it.
msg @ ControlConnectionFailed => {
if let Some(transport) = msg.transport {
// Fixed 5 s pause before the retry.
// NOTE(review): this `.await` sits inside the message handler, so it
// presumably delays handling of every other message for the whole
// pause — confirm, and consider a timer or a delayed message instead.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
self.discover_one(transport);
}
}
});
}

Expand Down Expand Up @@ -161,13 +173,16 @@ impl Discovery {
}

/// Starts discovery for every transport listed in `discovery.predefined`.
fn discover(&mut self) {
    // Iterate over an owned copy of the configured list; each entry is handed
    // to `discover_one` by value.
    let predefined = self.ctx.config().discovery.predefined.clone();

    for transport in predefined {
        self.discover_one(transport);
    }
}

/// Opens a single control connection to `transport`.
///
/// The connection is opened in the `Control` role and carries a
/// `SwitchToControl` message listing the groups of this node.
///
/// Exactly one connection is opened per call: `discover` already iterates
/// over the predefined transports, and the reconnect path calls this method
/// for the one transport whose control connection failed. Looping over the
/// whole predefined list here would open connections to unrelated peers on
/// every call.
fn discover_one(&mut self, transport: Transport) {
    let msg = internode::SwitchToControl {
        groups: self.node_map.this.groups.clone(),
    };

    self.open_connection(&transport, ConnectionRole::Control(msg));
}

fn open_connection(
Expand Down Expand Up @@ -283,6 +298,8 @@ impl Discovery {
// TODO: check launch_id.
}

self.control_maintenance(socket, msg.transport.clone());

// Only the initiator (client) can start new connections,
// because only it knows the transport address.
let Some(transport) = msg.transport else {
Expand Down Expand Up @@ -369,6 +386,22 @@ impl Discovery {
fn on_connection_rejected(&mut self, _msg: ConnectionRejected) {
// TODO: something else? Retries?
}

/// Attaches a one-shot stream that runs the ping/pong loop over `socket`.
///
/// When the loop stops, the reason is logged and the stream yields
/// `ControlConnectionFailed` carrying the given `transport`.
fn control_maintenance(&mut self, mut socket: Socket, transport: Option<Transport>) {
    let maintenance = async move {
        // TODO: graceful termination.
        // The loop only ever exits through an error, hence `unwrap_err`.
        let reason = control_maintenance(&mut socket).await.unwrap_err();

        info!(
            message = "control connection closed",
            socket = %socket.info,
            peer = %socket.peer,
            reason = %reason,
        );

        ControlConnectionFailed { transport }
    };

    self.ctx.attach(Stream::once(maintenance));
}
}

async fn accept_connection(
Expand Down Expand Up @@ -422,6 +455,16 @@ async fn accept_connection(
})
}

/// Keeps a control connection exercised with a periodic ping/pong exchange.
///
/// Each round sends a `Ping`, waits for a `Ping`, sends a `Pong`, waits for
/// a `Pong`, and then pauses for ten seconds. The loop has no successful
/// exit: the function returns only when a send or receive fails, with that
/// error.
async fn control_maintenance(socket: &mut Socket) -> Result<()> {
    // Pause between two consecutive rounds.
    const ROUND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);

    loop {
        // Ping half of the round.
        send_regular(socket, internode::Ping { payload: 0 }).await?;
        recv_regular::<internode::Ping>(socket).await?;

        // Pong half of the round.
        send_regular(socket, internode::Pong { payload: 0 }).await?;
        recv_regular::<internode::Pong>(socket).await?;

        tokio::time::sleep(ROUND_INTERVAL).await;
    }
}

fn infer_connections<'a>(
one: &'a [internode::GroupInfo],
two: &'a [internode::GroupInfo],
Expand Down

0 comments on commit 8083cf3

Please sign in to comment.