Skip to content

Commit

Permalink
disconnect on receiving message too
Browse files Browse the repository at this point in the history
  • Loading branch information
blckngm committed Apr 15, 2022
1 parent ab7f2ce commit 711f556
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions crates/mem-pool/src/sync/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ pub fn sync_server_protocol(shared: Arc<Mutex<SyncServerState>>) -> ProtocolMeta
let control = control.clone();
let shared = shared.clone();
tokio::spawn(async move {
let mut subscribed = false;
while let Some(Ok(msg)) = read_part.next().await {
let requested_block = match P2PSyncRequestReader::from_slice(&msg) {
Err(_) => {
Expand Down Expand Up @@ -183,6 +184,7 @@ pub fn sync_server_protocol(shared: Arc<Mutex<SyncServerState>>) -> ProtocolMeta
subscribers.len = shared.subscribers.len(),
added = context.id.value(),
);
subscribed = true;
break;
} else {
let try_again_block =
Expand All @@ -206,14 +208,22 @@ pub fn sync_server_protocol(shared: Arc<Mutex<SyncServerState>>) -> ProtocolMeta
);
}
}
// Wait for stream close and then remove this peer from subscribers.
while let Some(Ok(_)) = read_part.next().await {}
let mut shared = shared.lock().await;
shared.subscribers.remove(&context.id);
tracing::info!(
subscribers.len = shared.subscribers.len(),
removed = context.id.value(),
);
if subscribed {
// We are publishing and do not expect any more messages
// from the client.
//
// If we receive a message, or there is an error, or the
// stream is closed, remove the peer from subscribers and
// disconnect.
let _ = read_part.next().await;
warn_result(control.disconnect(context.id).await);
let mut shared = shared.lock().await;
shared.subscribers.remove(&context.id);
tracing::info!(
subscribers.len = shared.subscribers.len(),
removed = context.id.value(),
);
}
});
}))
.build()
Expand Down

0 comments on commit 711f556

Please sign in to comment.