Skip to content

Commit

Permalink
Fuel/Request_Response v0.0.2: More meaningful error messages (#2377)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
<!-- List of related issues/PRs -->

Closes #1311

## Description
<!-- List of detailed changes -->

This PR follows #2362, which introduces a new version of the protocol
`/fuel/req_response/0.0.2` which allows to return Errors in the
response. This PR makes the following changes:
- [X] the different errors that can be returned in a response are
implemented.
- [X] The type of `ResponseSender` used by a peer to propagate received
responses upstream is changed to allow the rrors to be propagated
- [X] Error Responses received by a peer following a request are
displayed with as log messages with `warning` level.


## Checklist
- [ ] Breaking changes are clearly marked as such in the PR description
and changelog
- [ ] New behavior is reflected in tests
- [ ] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [ ] I have reviewed the code myself
- [ ] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?
  • Loading branch information
acerone85 authored Nov 14, 2024
1 parent 1396b5c commit 552ba75
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Changed
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.

#### Breaking
- [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`.
Expand Down
14 changes: 6 additions & 8 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,17 @@ impl NetworkCodec for PostcardCodec {
fn get_req_res_protocols(
&self,
) -> impl Iterator<Item = <Self as request_response::Codec>::Protocol> {
// TODO: Iterating over versions in reverse order should prefer
// TODO: https://github.com/FuelLabs/fuel-core/issues/2374
// Iterating over versions in reverse order should prefer
// peers to use V2 over V1 for exchanging messages. However, this is
// not guaranteed by the specs for the `request_response` protocol.
// not guaranteed by the specs for the `request_response` protocol,
// and it should be tested.
PostcardProtocol::iter().rev()
}
}

#[derive(Debug, Default, Clone, EnumIter)]
#[derive(Debug, Clone, EnumIter)]
pub enum PostcardProtocol {
#[default]
V1,
V2,
}
Expand All @@ -206,7 +207,6 @@ impl AsRef<str> for PostcardProtocol {
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {

use fuel_core_types::blockchain::SealedBlockHeader;
use request_response::Codec as _;

Expand Down Expand Up @@ -310,10 +310,8 @@ mod tests {
async fn codec__serialzation_roundtrip_using_v1_on_error_response_returns_predefined_error_code(
) {
// Given
// TODO: https://github.com/FuelLabs/fuel-core/issues/1311
// Change this to a different ResponseMessageErrorCode once these have been implemented.
let response = V2ResponseMessage::SealedHeaders(Err(
ResponseMessageErrorCode::ProtocolV1EmptyResponse,
ResponseMessageErrorCode::RequestedRangeTooLarge,
));
let mut codec = PostcardCodec::new(1024);
let mut buf = Vec::with_capacity(1024);
Expand Down
28 changes: 13 additions & 15 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,9 +675,7 @@ impl FuelP2PService {
let send_ok = match channel {
ResponseSender::SealedHeaders(c) => match response {
V2ResponseMessage::SealedHeaders(v) => {
// TODO: https://github.com/FuelLabs/fuel-core/issues/1311
// Change type of ResponseSender and remove the .ok() here
c.send(Ok((peer, Ok(v.ok())))).is_ok()
c.send(Ok((peer, Ok(v)))).is_ok()
}
_ => {
warn!(
Expand All @@ -690,7 +688,7 @@ impl FuelP2PService {
},
ResponseSender::Transactions(c) => match response {
V2ResponseMessage::Transactions(v) => {
c.send(Ok((peer, Ok(v.ok())))).is_ok()
c.send(Ok((peer, Ok(v)))).is_ok()
}
_ => {
warn!(
Expand All @@ -703,7 +701,7 @@ impl FuelP2PService {
},
ResponseSender::TransactionsFromPeer(c) => match response {
V2ResponseMessage::Transactions(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand All @@ -715,7 +713,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolAllTransactionsIds(c) => match response {
V2ResponseMessage::TxPoolAllTransactionsIds(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand All @@ -727,7 +725,7 @@ impl FuelP2PService {
},
ResponseSender::TxPoolFullTransactions(c) => match response {
V2ResponseMessage::TxPoolFullTransactions(v) => {
c.send((peer, Ok(v.ok()))).is_ok()
c.send((peer, Ok(v))).is_ok()
}
_ => {
warn!(
Expand Down Expand Up @@ -1719,12 +1717,12 @@ mod tests {

if let Ok(response) = response_message {
match response {
Ok((_, Ok(Some(sealed_headers)))) => {
Ok((_, Ok(Ok(sealed_headers)))) => {
let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
},
Ok((_, Ok(None))) => {
tracing::error!("Node A did not return any headers");
Ok((_, Ok(Err(e)))) => {
tracing::error!("Node A did not return any headers: {:?}", e);
let _ = tx_test_end.send(false).await;
},
Ok((_, Err(e))) => {
Expand Down Expand Up @@ -1752,12 +1750,12 @@ mod tests {

if let Ok(response) = response_message {
match response {
Ok((_, Ok(Some(transactions)))) => {
Ok((_, Ok(Ok(transactions)))) => {
let check = transactions.len() == 1 && transactions[0].0.len() == 5;
let _ = tx_test_end.send(check).await;
},
Ok((_, Ok(None))) => {
tracing::error!("Node A did not return any transactions");
Ok((_, Ok(Err(e)))) => {
tracing::error!("Node A did not return any transactions: {:?}", e);
let _ = tx_test_end.send(false).await;
},
Ok((_, Err(e))) => {
Expand All @@ -1782,7 +1780,7 @@ mod tests {
tokio::spawn(async move {
let response_message = rx_orchestrator.await;

if let Ok((_, Ok(Some(transaction_ids)))) = response_message {
if let Ok((_, Ok(Ok(transaction_ids)))) = response_message {
let tx_ids: Vec<TxId> = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect();
let check = transaction_ids.len() == 5 && transaction_ids.iter().zip(tx_ids.iter()).all(|(a, b)| a == b);
let _ = tx_test_end.send(check).await;
Expand All @@ -1799,7 +1797,7 @@ mod tests {
tokio::spawn(async move {
let response_message = rx_orchestrator.await;

if let Ok((_, Ok(Some(transactions)))) = response_message {
if let Ok((_, Ok(Ok(transactions)))) = response_message {
let txs: Vec<Option<NetworkableTransactionPool>> = tx_ids.iter().enumerate().map(|(i, _)| {
if i == 0 {
None
Expand Down
69 changes: 64 additions & 5 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ pub enum ResponseMessageErrorCode {
/// The peer sent an empty response using protocol `/fuel/req_res/0.0.1`
#[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")]
ProtocolV1EmptyResponse = 0,
#[error("The requested range is too large")]
RequestedRangeTooLarge = 1,
#[error("Timeout while processing request")]
Timeout = 2,
#[error("Sync processor is out of capacity")]
SyncProcessorOutOfCapacity = 3,
#[error("The peer sent an unknown error code")]
#[serde(skip_serializing, other)]
Unknown,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -113,11 +122,22 @@ pub type OnResponseWithPeerSelection<T> =

#[derive(Debug)]
pub enum ResponseSender {
SealedHeaders(OnResponseWithPeerSelection<Option<Vec<SealedBlockHeader>>>),
Transactions(OnResponseWithPeerSelection<Option<Vec<Transactions>>>),
TransactionsFromPeer(OnResponse<Option<Vec<Transactions>>>),
TxPoolAllTransactionsIds(OnResponse<Option<Vec<TxId>>>),
TxPoolFullTransactions(OnResponse<Option<Vec<Option<NetworkableTransactionPool>>>>),
SealedHeaders(
OnResponseWithPeerSelection<
Result<Vec<SealedBlockHeader>, ResponseMessageErrorCode>,
>,
),
Transactions(
OnResponseWithPeerSelection<Result<Vec<Transactions>, ResponseMessageErrorCode>>,
),
TransactionsFromPeer(OnResponse<Result<Vec<Transactions>, ResponseMessageErrorCode>>),

TxPoolAllTransactionsIds(OnResponse<Result<Vec<TxId>, ResponseMessageErrorCode>>),
TxPoolFullTransactions(
OnResponse<
Result<Vec<Option<NetworkableTransactionPool>>, ResponseMessageErrorCode>,
>,
),
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -146,3 +166,42 @@ pub enum ResponseSendError {
#[error("Failed to convert response to intermediate format")]
ConversionToIntermediateFailed,
}

#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
use super::ResponseMessageErrorCode;

#[test]
fn response_message_error_code__unknown_error_cannot_be_serialized() {
let error = super::ResponseMessageErrorCode::Unknown;
let serialized = postcard::to_allocvec(&error);
assert!(serialized.is_err());
}

#[test]
fn response_message_error_code__known_error_code_is_deserialized_to_variant() {
let serialized_error_code =
postcard::to_stdvec(&ResponseMessageErrorCode::ProtocolV1EmptyResponse)
.unwrap();
println!("Error code: {:?}", serialized_error_code);
let response_message_error_code: ResponseMessageErrorCode =
postcard::from_bytes(&serialized_error_code).unwrap();
assert!(matches!(
response_message_error_code,
ResponseMessageErrorCode::ProtocolV1EmptyResponse
));
}

#[test]
fn response_message_error_code__unknown_error_code_is_deserialized_to_unknown_variant(
) {
let serialized_error_code = vec![42];
let response_message_error_code: ResponseMessageErrorCode =
postcard::from_bytes(&serialized_error_code).unwrap();
assert!(matches!(
response_message_error_code,
ResponseMessageErrorCode::Unknown
));
}
}
Loading

0 comments on commit 552ba75

Please sign in to comment.