Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report Peers that give bad Block Info #1331

Merged
merged 16 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@ Description of the upcoming release here.
- [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing.
- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.

#### Breaking

- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed from the breaking section?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were two Breaking sections.

The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code

### Changed

Expand All @@ -39,9 +32,11 @@ Description of the upcoming release here.
- [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`.
- [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set.
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.

### Removed

#### Breaking
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): The `manual_blocks_enabled` flag is removed from the CLI. The analog is a `debug` flag.
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.

### Removed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 53 additions & 18 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_sync::ports::{
BlockImporterPort,
ConsensusPort,
PeerReportReason,
PeerToPeerPort,
};
use fuel_core_types::{
Expand All @@ -21,6 +22,10 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
AppScore,
PeerReport,
},
PeerId,
SourcePeer,
},
Expand All @@ -46,25 +51,17 @@ impl PeerToPeerPort for P2PAdapter {
async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
) -> anyhow::Result<Option<Vec<SourcePeer<SealedBlockHeader>>>> {
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
Ok(service
.get_sealed_block_headers(block_range_height)
.await?
.and_then(|(peer_id, headers)| {
let peer_id: PeerId = peer_id.into();
headers.map(|headers| {
headers
.into_iter()
.map(|header| SourcePeer {
peer_id: peer_id.clone(),
data: header,
})
.collect()
})
}))
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

Expand All @@ -81,7 +78,45 @@ impl PeerToPeerPort for P2PAdapter {
.get_transactions_from_peer(peer_id.into(), block)
.await
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_peer_report_reason: NewPeerReportReason = report.into();
service.report_peer(peer, new_peer_report_reason, service_name)?;
Ok(())
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}
}

struct NewPeerReportReason(PeerReportReason);

impl From<PeerReportReason> for NewPeerReportReason {
fn from(reason: PeerReportReason) -> Self {
Self(reason)
}
}

impl PeerReport for NewPeerReportReason {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are arbitrary values. We should do a spike or something to determine good values. Should we move these to a config?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a config struct or constants file containing all the reputation values seems useful. They could still be hardcoded for now, but it would let us manage all the reputation behaviors from a single place that's easy to find.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I added a config to P2PAdapter it's just hardcoded in the place where we init the adapter, but it gives us options in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn get_score_from_report(&self) -> AppScore {
match self.0 {
// Good
PeerReportReason::SuccessfulBlockImport => 5.,
// Bad
PeerReportReason::BadBlockHeader => -100.,
PeerReportReason::MissingTransactions => -100.,
PeerReportReason::InvalidTransactions => -100.,
PeerReportReason::MissingBlockHeaders => -100.,
PeerReportReason::InvalidBlock => -100.,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ impl ConnectionHandler for HeartbeatHandler {
Self::Error,
>,
> {
if let Some(inbound_block_height) = self.inbound.as_mut() {
match inbound_block_height.poll_unpin(cx) {
if let Some(inbound_stream_and_block_height) = self.inbound.as_mut() {
match inbound_stream_and_block_height.poll_unpin(cx) {
Poll::Ready(Err(_)) => {
debug!(target: "fuel-libp2p", "Incoming heartbeat errored");
self.inbound = None;
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
Some(ResponseChannelItem::SealedHeaders(channel)),
Ok(ResponseMessage::SealedHeaders(headers)),
) => {
if channel.send(Some((peer, headers))).is_err() {
if channel.send((peer, headers)).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
Expand Down Expand Up @@ -1191,7 +1191,7 @@ mod tests {
}
tracing::info!("Node B Event: {:?}", node_b_event);
}
};
}
}
}

Expand Down Expand Up @@ -1595,7 +1595,7 @@ mod tests {

let expected = arbitrary_headers_for_range(range.clone());

if let Ok(Some((_, sealed_headers))) = response_message {
if let Ok((_, sealed_headers)) = response_message {
let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
} else {
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ impl PeerManager {
peer_id: &PeerId,
block_height: BlockHeight,
) {
if let Some(previous_heartbeat) = self
if let Some(time_elapsed) = self
.get_peer_info(peer_id)
.and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat())
{
debug!(target: "fuel-p2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat);
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed);
}

let heartbeat_data = HeartbeatData::new(block_height);
Expand Down
8 changes: 3 additions & 5 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1";
/// Max Size in Bytes of the Request Message
pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>();

pub type ChannelItem<T> = oneshot::Sender<Option<T>>;

// Peer receives a `RequestMessage`.
// It prepares a response in form of `OutboundResponse`
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
Expand Down Expand Up @@ -59,9 +57,9 @@ pub enum ResponseMessage {
/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(ChannelItem<SealedBlock>),
SealedHeaders(ChannelItem<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(ChannelItem<Vec<Transaction>>),
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
}

/// Response that is sent over the wire
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ enum TaskRequest {
},
GetSealedHeaders {
block_height_range: Range<u32>,
channel: oneshot::Sender<Option<(PeerId, Option<Vec<SealedBlockHeader>>)>>,
channel: oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>,
},
GetTransactions {
block_id: BlockId,
Expand Down Expand Up @@ -384,7 +384,7 @@ impl SharedState {
pub async fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> anyhow::Result<Option<(Vec<u8>, Option<Vec<SealedBlockHeader>>)>> {
) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
let (sender, receiver) = oneshot::channel();

if block_height_range.is_empty() {
Expand All @@ -402,7 +402,7 @@ impl SharedState {

receiver
.await
.map(|o| o.map(|(peer_id, headers)| (peer_id.to_bytes(), headers)))
.map(|(peer_id, headers)| (peer_id.to_bytes(), headers))
.map_err(|e| anyhow!("{}", e))
}

Expand Down
1 change: 1 addition & 0 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fuel-core-trace = { path = "../../trace" }
fuel-core-types = { path = "../../types", features = ["test-helpers"] }
mockall = { workspace = true }
test-case = { workspace = true }
tracing-subscriber = "0.3.17"
MitchTurner marked this conversation as resolved.
Show resolved Hide resolved

[features]
benchmarking = ["dep:mockall", "fuel-core-types/test-helpers"]
Loading