Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report Peers that give bad Block Info #1331

Merged
merged 16 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@ Description of the upcoming release here.
- [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing.
- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.

#### Breaking

- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed from the breaking section?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were two Breaking sections.

The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code

### Changed

Expand All @@ -39,9 +32,11 @@ Description of the upcoming release here.
- [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`.
- [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set.
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.

### Removed

#### Breaking
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322): The `manual_blocks_enabled` flag is removed from the CLI. The analog is a `debug` flag.
- [#1322](https://github.com/FuelLabs/fuel-core/pull/1322):
The `debug` flag is added to the CLI. The flag should be used for local development only. Enabling debug mode:
- Allows GraphQL Endpoints to arbitrarily advance blocks.
- Enables debugger GraphQL Endpoints.
- Allows setting `utxo_validation` to `false`.

### Removed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
use fuel_core_consensus_module::block_verifier::Verifier;
use fuel_core_txpool::service::SharedState as TxPoolSharedState;
use fuel_core_types::fuel_types::BlockHeight;
#[cfg(feature = "p2p")]
use fuel_core_types::services::p2p::peer_reputation::AppScore;
use std::sync::Arc;

pub mod block_importer;
Expand Down Expand Up @@ -87,6 +89,17 @@ pub struct BlockImporterAdapter {
#[derive(Clone)]
pub struct P2PAdapter {
service: Option<fuel_core_p2p::service::SharedState>,
peer_report_config: PeerReportConfig,
}

#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct PeerReportConfig {
pub successful_block_import: AppScore,
pub missing_block_headers: AppScore,
pub bad_block_header: AppScore,
pub missing_transactions: AppScore,
pub invalid_transactions: AppScore,
}

#[cfg(not(feature = "p2p"))]
Expand All @@ -95,8 +108,14 @@ pub struct P2PAdapter;

#[cfg(feature = "p2p")]
impl P2PAdapter {
pub fn new(service: Option<fuel_core_p2p::service::SharedState>) -> Self {
Self { service }
pub fn new(
service: Option<fuel_core_p2p::service::SharedState>,
peer_report_config: PeerReportConfig,
) -> Self {
Self {
service,
peer_report_config,
}
}
}

Expand Down
79 changes: 61 additions & 18 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use fuel_core_services::stream::BoxStream;
use fuel_core_sync::ports::{
BlockImporterPort,
ConsensusPort,
PeerReportReason,
PeerToPeerPort,
};
use fuel_core_types::{
Expand All @@ -21,6 +22,10 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::{
peer_reputation::{
AppScore,
PeerReport,
},
PeerId,
SourcePeer,
},
Expand All @@ -46,25 +51,17 @@ impl PeerToPeerPort for P2PAdapter {
async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
) -> anyhow::Result<Option<Vec<SourcePeer<SealedBlockHeader>>>> {
) -> anyhow::Result<SourcePeer<Option<Vec<SealedBlockHeader>>>> {
if let Some(service) = &self.service {
Ok(service
.get_sealed_block_headers(block_range_height)
.await?
.and_then(|(peer_id, headers)| {
let peer_id: PeerId = peer_id.into();
headers.map(|headers| {
headers
.into_iter()
.map(|header| SourcePeer {
peer_id: peer_id.clone(),
data: header,
})
.collect()
})
}))
let (peer_id, headers) =
service.get_sealed_block_headers(block_range_height).await?;
let sourced_headers = SourcePeer {
peer_id: peer_id.into(),
data: headers,
};
Ok(sourced_headers)
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

Expand All @@ -81,11 +78,57 @@ impl PeerToPeerPort for P2PAdapter {
.get_transactions_from_peer(peer_id.into(), block)
.await
} else {
Ok(None)
Err(anyhow::anyhow!("No P2P service available"))
}
}

async fn report_peer(
&self,
peer: PeerId,
report: PeerReportReason,
) -> anyhow::Result<()> {
if let Some(service) = &self.service {
let service_name = "Sync";
let new_report = self.process_report(report);
service.report_peer(peer, new_report, service_name)?;
Ok(())
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
}
}

impl P2PAdapter {
fn process_report(&self, reason: PeerReportReason) -> P2PAdapterPeerReport {
let score = match &reason {
PeerReportReason::SuccessfulBlockImport => {
self.peer_report_config.successful_block_import
}
PeerReportReason::MissingBlockHeaders => {
self.peer_report_config.missing_block_headers
}
PeerReportReason::BadBlockHeader => self.peer_report_config.bad_block_header,
PeerReportReason::MissingTransactions => {
self.peer_report_config.missing_transactions
}
PeerReportReason::InvalidTransactions => {
self.peer_report_config.invalid_transactions
}
};
P2PAdapterPeerReport { score }
}
}

struct P2PAdapterPeerReport {
score: AppScore,
}

impl PeerReport for P2PAdapterPeerReport {
fn get_score_from_report(&self) -> AppScore {
self.score
}
}

#[async_trait::async_trait]
impl BlockImporterPort for BlockImporterAdapter {
fn committed_height_stream(&self) -> BoxStream<BlockHeight> {
Expand Down
19 changes: 17 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,23 @@ pub fn init_sub_services(
};

#[cfg(feature = "p2p")]
let p2p_adapter =
P2PAdapter::new(network.as_ref().map(|network| network.shared.clone()));
let p2p_adapter = {
use crate::service::adapters::PeerReportConfig;

// Hardcoded for now, but left here to be configurable in the future.
let peer_report_config = PeerReportConfig {
successful_block_import: 5.,
missing_block_headers: -100.,
bad_block_header: -100.,
missing_transactions: -100.,
invalid_transactions: -100.,
MitchTurner marked this conversation as resolved.
Show resolved Hide resolved
};
P2PAdapter::new(
network.as_ref().map(|network| network.shared.clone()),
peer_report_config,
)
};

#[cfg(not(feature = "p2p"))]
let p2p_adapter = P2PAdapter::new();

Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ impl ConnectionHandler for HeartbeatHandler {
Self::Error,
>,
> {
if let Some(inbound_block_height) = self.inbound.as_mut() {
match inbound_block_height.poll_unpin(cx) {
if let Some(inbound_stream_and_block_height) = self.inbound.as_mut() {
match inbound_stream_and_block_height.poll_unpin(cx) {
Poll::Ready(Err(_)) => {
debug!(target: "fuel-libp2p", "Incoming heartbeat errored");
self.inbound = None;
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
Some(ResponseChannelItem::SealedHeaders(channel)),
Ok(ResponseMessage::SealedHeaders(headers)),
) => {
if channel.send(Some((peer, headers))).is_err() {
if channel.send((peer, headers)).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
Expand Down Expand Up @@ -1191,7 +1191,7 @@ mod tests {
}
tracing::info!("Node B Event: {:?}", node_b_event);
}
};
}
}
}

Expand Down Expand Up @@ -1595,7 +1595,7 @@ mod tests {

let expected = arbitrary_headers_for_range(range.clone());

if let Ok(Some((_, sealed_headers))) = response_message {
if let Ok((_, sealed_headers)) = response_message {
let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
} else {
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ impl PeerManager {
peer_id: &PeerId,
block_height: BlockHeight,
) {
if let Some(previous_heartbeat) = self
if let Some(time_elapsed) = self
.get_peer_info(peer_id)
.and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat())
{
debug!(target: "fuel-p2p", "Previous hearbeat happened {:?} seconds ago", previous_heartbeat);
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed);
}

let heartbeat_data = HeartbeatData::new(block_height);
Expand Down
8 changes: 3 additions & 5 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1";
/// Max Size in Bytes of the Request Message
pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>();

pub type ChannelItem<T> = oneshot::Sender<Option<T>>;

// Peer receives a `RequestMessage`.
// It prepares a response in form of `OutboundResponse`
// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format.
Expand Down Expand Up @@ -59,9 +57,9 @@ pub enum ResponseMessage {
/// Holds oneshot channels for specific responses
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(ChannelItem<SealedBlock>),
SealedHeaders(ChannelItem<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(ChannelItem<Vec<Transaction>>),
Block(oneshot::Sender<Option<SealedBlock>>),
SealedHeaders(oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>),
Transactions(oneshot::Sender<Option<Vec<Transaction>>>),
}

/// Response that is sent over the wire
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ enum TaskRequest {
},
GetSealedHeaders {
block_height_range: Range<u32>,
channel: oneshot::Sender<Option<(PeerId, Option<Vec<SealedBlockHeader>>)>>,
channel: oneshot::Sender<(PeerId, Option<Vec<SealedBlockHeader>>)>,
},
GetTransactions {
block_id: BlockId,
Expand Down Expand Up @@ -384,7 +384,7 @@ impl SharedState {
pub async fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> anyhow::Result<Option<(Vec<u8>, Option<Vec<SealedBlockHeader>>)>> {
) -> anyhow::Result<(Vec<u8>, Option<Vec<SealedBlockHeader>>)> {
let (sender, receiver) = oneshot::channel();

if block_height_range.is_empty() {
Expand All @@ -402,7 +402,7 @@ impl SharedState {

receiver
.await
.map(|o| o.map(|(peer_id, headers)| (peer_id.to_bytes(), headers)))
.map(|(peer_id, headers)| (peer_id.to_bytes(), headers))
.map_err(|e| anyhow!("{}", e))
}

Expand Down
1 change: 1 addition & 0 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fuel-core-trace = { path = "../../trace" }
fuel-core-types = { path = "../../types", features = ["test-helpers"] }
mockall = { workspace = true }
test-case = { workspace = true }
tracing-subscriber = { workspace = true }

[features]
benchmarking = ["dep:mockall", "fuel-core-types/test-helpers"]
Loading