This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

shutdown devp2p connections #9711

Closed. Wants to merge 2 commits.
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/handler.rs
@@ -551,7 +551,7 @@ impl SyncHandler {
}
Err(()) => {
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
-					io.disconnect_peer(peer_id);
+					io.disable_peer(peer_id);
Collaborator (Author): Unrelated issue: if a chunk is bad, we should disable the peer (disconnect + mark as bad).

Collaborator: Are we sure this is OK? How are the chunks requested? What if the peer just sent a chunk from a different snapshot and we don't differentiate that at the request level?

Contributor: We can't request two different snapshots anyway, so I guess this should be OK.

return Ok(());
}
}
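As an aside, the distinction this thread relies on can be sketched. The following is an illustrative model only, not parity's actual SyncIo implementation (the real trait is richer and the bad-peer bookkeeping lives elsewhere): disconnect_peer merely drops the current connection, while disable_peer also remembers the peer as bad so it won't be used again.

```rust
use std::collections::HashSet;

type PeerId = usize;

// Toy stand-in for the sync IO layer, for illustration only.
#[derive(Default)]
struct Io {
    connected: HashSet<PeerId>,
    bad: HashSet<PeerId>, // peers we refuse to talk to again
}

impl Io {
    /// Drop the connection; the peer may reconnect later.
    fn disconnect_peer(&mut self, peer: PeerId) {
        self.connected.remove(&peer);
    }

    /// Disconnect *and* remember the peer as bad.
    fn disable_peer(&mut self, peer: PeerId) {
        self.disconnect_peer(peer);
        self.bad.insert(peer);
    }
}

fn main() {
    let mut io = Io::default();
    io.connected.insert(7);
    io.disable_peer(7); // bad snapshot chunk => don't just disconnect
    assert!(io.bad.contains(&7));
}
```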
26 changes: 23 additions & 3 deletions util/network-devp2p/src/connection.rs
@@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::collections::VecDeque;
-use std::net::SocketAddr;
+use std::net::{SocketAddr, Shutdown};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::Duration;
use hash::{keccak, write_keccak};
@@ -42,9 +42,13 @@ const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;

pub trait GenericSocket : Read + Write {
+	fn shutdown(&self) -> io::Result<()>;
}

impl GenericSocket for TcpStream {
+	fn shutdown(&self) -> io::Result<()> {
+		self.shutdown(Shutdown::Both)
Collaborator (Author): Before this change we never shut down the TCP stream.

}
}
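A note on the TcpStream impl above: self.shutdown(Shutdown::Both) does not recurse into the trait method, because Rust method resolution prefers inherent methods over trait methods, so the call dispatches to the inherent TcpStream::shutdown(&self, how: Shutdown). A minimal standalone demonstration of that rule (toy types, nothing from this codebase):

```rust
use std::io;

struct Sock;

impl Sock {
    // Inherent method, analogous to TcpStream::shutdown(&self, how: Shutdown).
    fn shutdown(&self, _how: bool) -> io::Result<()> {
        Ok(())
    }
}

trait GenericSocket {
    fn shutdown(&self) -> io::Result<()>;
}

impl GenericSocket for Sock {
    fn shutdown(&self) -> io::Result<()> {
        // Resolves to the inherent `Sock::shutdown`, because inherent
        // methods win over trait methods; this does not recurse.
        self.shutdown(true)
    }
}

fn main() {
    GenericSocket::shutdown(&Sock).unwrap();
}
```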

pub struct GenericConnection<Socket: GenericSocket> {
@@ -64,6 +68,14 @@ pub struct GenericConnection<Socket: GenericSocket> {
registered: AtomicBool,
}

+impl<Socket: GenericSocket> Drop for GenericConnection<Socket> {
+	fn drop(&mut self) {
+		if let Err(err) = self.socket.shutdown() {
+			warn!(target: "network", "Socket shutdown failed: {:?}", err);
+		}
+	}
+}
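With the Drop impl above, tearing a connection down actively shuts the socket down instead of leaving it to the OS. What Shutdown::Both buys, in a standalone sketch using only the standard library (loopback sockets, no parity code involved): the peer observes EOF immediately, even while the socket value itself is still alive.

```rust
use std::io::Read;
use std::net::{Shutdown, TcpListener, TcpStream};

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let client = TcpStream::connect(addr)?;
    let (mut server_side, _) = listener.accept()?;

    // Shut down both directions while `client` is still in scope.
    client.shutdown(Shutdown::Both)?;

    // The other endpoint sees EOF (read returns 0) right away,
    // instead of the connection lingering until `client` is dropped.
    let mut buf = [0u8; 16];
    assert_eq!(server_side.read(&mut buf)?, 0);
    Ok(())
}
```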

impl<Socket: GenericSocket> GenericConnection<Socket> {
pub fn expect(&mut self, size: usize) {
trace!(target:"network", "Expect to read {} bytes", size);
@@ -574,7 +586,11 @@ mod tests {
}
}

-	impl GenericSocket for TestSocket {}
+	impl GenericSocket for TestSocket {
+		fn shutdown(&self) -> io::Result<()> {
+			Ok(())
+		}
+	}

struct TestBrokenSocket {
error: String
@@ -596,7 +612,11 @@
}
}

-	impl GenericSocket for TestBrokenSocket {}
+	impl GenericSocket for TestBrokenSocket {
+		fn shutdown(&self) -> io::Result<()> {
+			Ok(())
+		}
+	}

type TestConnection = GenericConnection<TestSocket>;

78 changes: 35 additions & 43 deletions util/network-devp2p/src/host.rs
@@ -383,7 +383,7 @@ impl Host {
}
for p in to_kill {
trace!(target: "network", "Disconnecting on reserved-only mode: {}", p);
-			self.kill_connection(p, io, false);
+			self.kill_connection(p, io);
}
}
}
@@ -416,7 +416,7 @@ impl Host {
}
for p in to_kill {
trace!(target: "network", "Disconnecting on shutdown: {}", p);
-			self.kill_connection(p, io, true);
+			self.kill_connection(p, io);
}
io.unregister_handler();
}
@@ -532,7 +532,7 @@ impl Host {
}
for p in to_kill {
trace!(target: "network", "Ping timeout: {}", p);
-			self.kill_connection(p, io, true);
+			self.kill_connection(p, io);
Contributor: Should we note_failure here as well?
}
}
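Reviewers ask for note_failure at several of these call sites. As a hypothetical sketch of the pattern being requested (all types here are toy stand-ins; the real note_failure lives on the node table, as seen later in this diff), the idea is to record the failure against the node before tearing the session down:

```rust
use std::collections::HashMap;

// Minimal stand-ins for the real NodeTable / session bookkeeping,
// just to show the ordering the reviewers are asking for.
#[derive(Default)]
struct NodeTable {
    failures: HashMap<u64, u32>,
}

impl NodeTable {
    fn note_failure(&mut self, id: &u64) {
        *self.failures.entry(*id).or_insert(0) += 1;
    }
}

#[derive(Default)]
struct Host {
    nodes: NodeTable,
    sessions: HashMap<usize, u64>, // token -> node id
}

impl Host {
    fn kill_connection(&mut self, token: usize) {
        self.sessions.remove(&token); // deregister unconditionally, as in this PR
    }

    // The requested variant: remember the peer failed, then kill.
    fn kill_connection_noting_failure(&mut self, token: usize) {
        if let Some(id) = self.sessions.get(&token).copied() {
            self.nodes.note_failure(&id);
        }
        self.kill_connection(token);
    }
}

fn main() {
    let mut host = Host::default();
    host.sessions.insert(1, 42);
    host.kill_connection_noting_failure(1);
    assert_eq!(host.nodes.failures[&42], 1);
}
```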

@@ -692,11 +692,6 @@ impl Host {
}
}

-	fn connection_closed(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
-		trace!(target: "network", "Connection closed: {}", token);
-		self.kill_connection(token, io, true);
-	}

fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
@@ -813,7 +808,7 @@ impl Host {
}

if kill {
-			self.kill_connection(token, io, true);
+			self.kill_connection(token, io);
Collaborator: Should we note_failure?

Collaborator (Author): In some cases where kill = true we had already noted the failure, while in others we had not. I'll fix it.
}

let handlers = self.handlers.read();
@@ -825,7 +820,7 @@
if duplicate {
trace!(target: "network", "Rejected duplicate connection: {}", token);
session.lock().disconnect(io, DisconnectReason::DuplicatePeer);
-				self.kill_connection(token, io, false);
+				self.kill_connection(token, io);
return;
}
for p in ready_data {
@@ -908,13 +903,11 @@ impl Host {

fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
trace!(target: "network", "Connection timeout: {}", token);
-		self.kill_connection(token, io, true)
+		self.kill_connection(token, io)
Collaborator: IMHO we should note_failure.
}

-	fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>, remote: bool) {
+	fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
Collaborator (Author): I removed the remote flag, as I believe it was confusing and often used incorrectly.

For example:

  • fn stop was calling it for all nodes with remote set to true, and because of that we were calling note_failure for all of them

Collaborator: So now we are deregistering the stream every time we call kill_connection. Previously it was done only if the session was done. It seems that deregister will remove the entry from self.sessions, but only when the session is expired; otherwise we will leak the value in the HashMap forever.

The code is really fragile here, and I think we should be careful with removing stuff that just "seems used incorrectly", especially when it has been running for a couple of years now and we are not really sure if there are any bugs in that code (i.e. is this strictly related to TCP streams hanging in the WAIT state?).

Collaborator: Second issue: now we don't note_failure on any of the previous kill_connection(_, _, true) call sites. To avoid breaking stuff, we should still review all call sites of that function to check whether note_failure should be there.

Contributor: Yeah, it seems strange to remove remote just because it was used wrongly. Maybe just rename the variable? Or have a deregister bool and a note_failure one.

Collaborator (Author):

> So now we are deregistering the stream every time we call kill_connection. Previously it was done only if the session was done. It seems that deregister will remove the entry from self.sessions, but only when the session is expired; otherwise we will leak the value in the HashMap forever.

@tomusdrw the leak is happening now. The session is always set to expired and removed from the handlers, but the value is never deregistered if remote = false, because s.done() returns false as long as the socket is writeable.

In this PR the leak does not happen, because after setting expired to true and unregistering the handlers, we always call deregister:

https://github.com/paritytech/parity-ethereum/blob/05550429c7dc0e154710d171bb432e68d000e34d/util/network-devp2p/src/host.rs#L917-L926

> The code is really fragile here, and I think we should be careful with removing stuff that just "seems used incorrectly", especially when it has been running for a couple of years now and we are not really sure if there are any bugs in that code (i.e. is this strictly related to TCP streams hanging in the WAIT state?).

I believe the CLOSE_WAIT state is just a symptom of a bigger problem. After we call kill_connection, sessions leak in memory even though they are already disconnected from the handlers. Remote nodes close the connection to our unresponsive socket, but because there are no handlers connected, we are stuck in the CLOSE_WAIT state.
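To make the described leak concrete, here is a minimal self-contained model of the old control flow (types invented for the demo; the real bookkeeping spans Host, Session and the IO layer). With deregister = remote || s.done(), a locally killed session whose socket is still writeable satisfies neither condition, so its expired entry survives in the map:

```rust
use std::collections::HashMap;

struct Session {
    expired: bool,
    writable: bool, // stands in for "the socket is still writeable"
}

impl Session {
    fn done(&self) -> bool {
        // Model of the old check: the session only counts as done
        // once the socket can no longer be written to.
        self.expired && !self.writable
    }
}

fn kill_connection(sessions: &mut HashMap<usize, Session>, token: usize, remote: bool) {
    let mut deregister = false;
    if let Some(s) = sessions.get_mut(&token) {
        s.expired = true;
        deregister = remote || s.done();
    }
    if deregister {
        // What io.deregister_stream ultimately leads to: the entry is removed.
        sessions.remove(&token);
    }
    // Otherwise the expired session (and its CLOSE_WAIT socket) leaks.
}

fn main() {
    let mut sessions = HashMap::new();
    sessions.insert(1, Session { expired: false, writable: true });
    kill_connection(&mut sessions, 1, false);
    assert!(sessions.contains_key(&1)); // leaked: expired but never removed
}
```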

let mut to_disconnect: Vec<ProtocolId> = Vec::new();
-		let mut failure_id = None;
-		let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
let sessions = self.sessions.read();
@@ -930,25 +923,19 @@
}
}
s.set_expired();
-				failure_id = s.id().cloned();
}
-			deregister = remote || s.done();
}
}
-		if let Some(id) = failure_id {
-			if remote {
-				self.nodes.write().note_failure(&id);
-			}
-		}

for p in to_disconnect {
let reserved = self.reserved_nodes.read();
if let Some(h) = self.handlers.read().get(&p) {
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
}
-		if deregister {
-			io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
Collaborator (Author): deregister set to false led to an incorrect state of the application: a connection that was supposed to be killed was never killed and occupied an entry in the sessions hashmap.

-		}

+		// deregister stream needs to be called to remove session
+		io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
}

fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
Expand Down Expand Up @@ -996,7 +983,7 @@ impl IoHandler<NetworkIoMessage> for Host {
fn stream_hup(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
trace!(target: "network", "Hup: {}", stream);
match stream {
-			FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io),
+			FIRST_SESSION ... LAST_SESSION => self.kill_connection(stream, io),
Collaborator: Would make sense to note_failure here IMHO.

_ => warn!(target: "network", "Unexpected hup"),
};
}
@@ -1115,7 +1102,7 @@
session.lock().disconnect(io, DisconnectReason::DisconnectRequested);
}
trace!(target: "network", "Disconnect requested {}", peer);
-				self.kill_connection(*peer, io, false);
+				self.kill_connection(*peer, io);
},
NetworkIoMessage::DisablePeer(ref peer) => {
let session = { self.sessions.read().get(*peer).cloned() };
Expand All @@ -1128,7 +1115,7 @@ impl IoHandler<NetworkIoMessage> for Host {
}
}
trace!(target: "network", "Disabling peer {}", peer);
-				self.kill_connection(*peer, io, false);
+				self.kill_connection(*peer, io);
},
NetworkIoMessage::InitPublicInterface =>
self.init_public_interface(io).unwrap_or_else(|e| warn!("Error initializing public interface: {:?}", e)),
@@ -1249,22 +1236,27 @@ fn load_key(path: &Path) -> Option<Secret> {
}
}

-#[test]
-fn key_save_load() {
-	let tempdir = TempDir::new("").unwrap();
-	let key = H256::random().into();
-	save_key(tempdir.path(), &key);
-	let r = load_key(tempdir.path());
-	assert_eq!(key, r.unwrap());
-}
-
-#[test]
-fn host_client_url() {
-	let mut config = NetworkConfiguration::new_local();
-	let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".parse().unwrap();
-	config.use_secret = Some(key);
-	let host: Host = Host::new(config, None).unwrap();
-	assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
-}
+#[cfg(test)]
+mod tests {
+	use ethereum_types::H256;
+	use tempdir::TempDir;
+	use super::{Host, NetworkConfiguration, load_key, save_key};
+
+	#[test]
+	fn key_save_load() {
+		let tempdir = TempDir::new("").unwrap();
+		let key = H256::random().into();
+		save_key(tempdir.path(), &key);
+		let r = load_key(tempdir.path());
+		assert_eq!(key, r.unwrap());
+	}
+
+	#[test]
+	fn host_client_url() {
+		let mut config = NetworkConfiguration::new_local();
+		let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".parse().unwrap();
+		config.use_secret = Some(key);
+		let host: Host = Host::new(config, None).unwrap();
+		assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
+	}
+}