Skip to content

Commit

Permalink
peer_state: Robust state machine transitions (#251)
Browse files Browse the repository at this point in the history
This PR refactors the peer state. Previously the `AddressStore` and
`PeerState` where intertwined. The `AddressStore` contained an optional
`ConnectionID`, while the `PeerState` states contained a mandatory
score.

This PR separates the address store and the peer state, while moving the
logic of the peer state transitioning to a separate module. While at it,
have added documentation and testing for state machine transitions.

Changes include:
- Secondary connection from the PeerContext is merged into the
`PeerState`
- Connection Ids are removed from the Address Store
- Transport manager is refactored to use the new state machine, this
keeps a minimal code around in the manager

### Testing Done
- added extra tests to the peer state
- tested with subp2p-explorer for discovering kusama

This builds upon #250 for a
bigger refactor effort to track addresses in a healthier

Aims at improving:
- #239
- #238
- #237

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
lexnv and dmitry-markin authored Oct 28, 2024
1 parent 2ef3751 commit f112b59
Show file tree
Hide file tree
Showing 5 changed files with 1,416 additions and 1,147 deletions.
36 changes: 5 additions & 31 deletions src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{error::DialError, types::ConnectionId, PeerId};
use crate::{error::DialError, PeerId};

use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
Expand Down Expand Up @@ -50,9 +50,6 @@ pub struct AddressRecord {

/// Address.
address: Multiaddr,

/// Connection ID, if specified.
connection_id: Option<ConnectionId>,
}

impl AsRef<Multiaddr> for AddressRecord {
Expand All @@ -64,12 +61,7 @@ impl AsRef<Multiaddr> for AddressRecord {
impl AddressRecord {
/// Create new `AddressRecord` and if `address` doesn't contain `P2p`,
/// append the provided `PeerId` to the address.
pub fn new(
peer: &PeerId,
address: Multiaddr,
score: i32,
connection_id: Option<ConnectionId>,
) -> Self {
pub fn new(peer: &PeerId, address: Multiaddr, score: i32) -> Self {
let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
address.with(Protocol::P2p(
Multihash::from_bytes(&peer.to_bytes()).expect("valid peer id"),
Expand All @@ -78,11 +70,7 @@ impl AddressRecord {
address
};

Self {
address,
score,
connection_id,
}
Self { address, score }
}

/// Create `AddressRecord` from `Multiaddr`.
Expand All @@ -97,7 +85,6 @@ impl AddressRecord {
Some(AddressRecord {
address,
score: 0i32,
connection_id: None,
})
}

Expand All @@ -112,20 +99,10 @@ impl AddressRecord {
&self.address
}

/// Get connection ID.
pub fn connection_id(&self) -> &Option<ConnectionId> {
&self.connection_id
}

/// Update score of an address.
pub fn update_score(&mut self, score: i32) {
self.score = self.score.saturating_add(score);
}

/// Set `ConnectionId` for the [`AddressRecord`].
pub fn set_connection_id(&mut self, connection_id: ConnectionId) {
self.connection_id = Some(connection_id);
}
}

impl PartialEq for AddressRecord {
Expand Down Expand Up @@ -161,8 +138,8 @@ impl FromIterator<Multiaddr> for AddressStore {
fn from_iter<T: IntoIterator<Item = Multiaddr>>(iter: T) -> Self {
let mut store = AddressStore::new();
for address in iter {
if let Some(address) = AddressRecord::from_multiaddr(address) {
store.insert(address);
if let Some(record) = AddressRecord::from_multiaddr(address) {
store.insert(record);
}
}

Expand Down Expand Up @@ -292,7 +269,6 @@ mod tests {
.with(Protocol::from(address.ip()))
.with(Protocol::Tcp(address.port())),
score,
None,
)
}

Expand All @@ -316,7 +292,6 @@ mod tests {
.with(Protocol::Tcp(address.port()))
.with(Protocol::Ws(std::borrow::Cow::Owned("/".to_string()))),
score,
None,
)
}

Expand All @@ -340,7 +315,6 @@ mod tests {
.with(Protocol::Udp(address.port()))
.with(Protocol::QuicV1),
score,
None,
)
}

Expand Down
103 changes: 44 additions & 59 deletions src/transport/manager/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use crate::{
executor::Executor,
protocol::ProtocolSet,
transport::manager::{
address::{AddressRecord, AddressStore},
types::{PeerContext, PeerState, SupportedTransport},
address::AddressRecord,
peer_state::StateDialResult,
types::{PeerContext, SupportedTransport},
ProtocolContext, TransportManagerEvent, LOG_TARGET,
},
types::{protocol::ProtocolName, ConnectionId},
Expand Down Expand Up @@ -223,11 +224,7 @@ impl TransportManagerHandle {
);

let mut peers = self.peers.write();
let entry = peers.entry(*peer).or_insert_with(|| PeerContext {
state: PeerState::Disconnected { dial_record: None },
addresses: AddressStore::new(),
secondary_connection: None,
});
let entry = peers.entry(*peer).or_insert_with(|| PeerContext::default());

// All addresses should be valid at this point, since the peer ID was either added or
// double checked.
Expand All @@ -249,36 +246,21 @@ impl TransportManagerHandle {
}

{
match self.peers.read().get(peer) {
Some(PeerContext {
state: PeerState::Connected { .. },
..
}) => return Err(ImmediateDialError::AlreadyConnected),
Some(PeerContext {
state: PeerState::Disconnected { dial_record },
addresses,
..
}) => {
if addresses.is_empty() {
return Err(ImmediateDialError::NoAddressAvailable);
}

// peer is already being dialed, don't dial again until the first dial concluded
if dial_record.is_some() {
tracing::debug!(
target: LOG_TARGET,
?peer,
?dial_record,
"peer is aready being dialed",
);
return Ok(());
}
}
Some(PeerContext {
state: PeerState::Dialing { .. } | PeerState::Opening { .. },
..
}) => return Ok(()),
None => return Err(ImmediateDialError::NoAddressAvailable),
let peers = self.peers.read();
let Some(PeerContext { state, addresses }) = peers.get(peer) else {
return Err(ImmediateDialError::NoAddressAvailable);
};

match state.can_dial() {
StateDialResult::AlreadyConnected =>
return Err(ImmediateDialError::AlreadyConnected),
StateDialResult::DialingInProgress => return Ok(()),
StateDialResult::Ok => {}
};

// Check if we have enough addresses to dial.
if addresses.is_empty() {
return Err(ImmediateDialError::NoAddressAvailable);
}
}

Expand Down Expand Up @@ -338,6 +320,11 @@ impl TransportHandle {

#[cfg(test)]
mod tests {
use crate::transport::manager::{
address::AddressStore,
peer_state::{ConnectionRecord, PeerState},
};

use super::*;
use multihash::Multihash;
use parking_lot::lock_api::RwLock;
Expand Down Expand Up @@ -454,16 +441,16 @@ mod tests {
peer,
PeerContext {
state: PeerState::Connected {
record: AddressRecord::from_multiaddr(
Multiaddr::empty()
record: ConnectionRecord {
address: Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
dial_record: None,
connection_id: ConnectionId::from(0),
},
secondary: None,
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down Expand Up @@ -497,15 +484,15 @@ mod tests {
peer,
PeerContext {
state: PeerState::Dialing {
record: AddressRecord::from_multiaddr(
Multiaddr::empty()
dial_record: ConnectionRecord {
address: Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
connection_id: ConnectionId::from(0),
},
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down Expand Up @@ -539,7 +526,6 @@ mod tests {
peer,
PeerContext {
state: PeerState::Disconnected { dial_record: None },
secondary_connection: None,
addresses: AddressStore::new(),
},
);
Expand All @@ -565,17 +551,16 @@ mod tests {
peer,
PeerContext {
state: PeerState::Disconnected {
dial_record: Some(
AddressRecord::from_multiaddr(
Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
)
.unwrap(),
),
dial_record: Some(ConnectionRecord::new(
peer,
Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
.with(Protocol::Tcp(8888))
.with(Protocol::P2p(Multihash::from(peer))),
ConnectionId::from(0),
)),
},
secondary_connection: None,

addresses: AddressStore::from_iter(
vec![Multiaddr::empty()
.with(Protocol::Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)))
Expand Down
Loading

0 comments on commit f112b59

Please sign in to comment.