Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify rpc codec logic #6304

Merged
merged 6 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
use crate::rpc::protocol::{
Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN,
};
use crate::rpc::{InboundRequest, OutboundRequest};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
Expand Down Expand Up @@ -57,13 +57,13 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
max_packet_size,
}
}
}

// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
/// Encodes RPC Responses sent to peers.
fn encode_response(
&mut self,
item: RPCCodedResponse<E>,
dst: &mut BytesMut,
) -> Result<(), RPCError> {
let bytes = match &item {
RPCCodedResponse::Success(resp) => match &resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
Expand Down Expand Up @@ -125,6 +125,21 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
}
}

// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.clear();
dst.reserve(1);
dst.put_u8(
item.as_u8()
.expect("Should never encode a stream termination"),
);
self.encode_response(item, dst)
}
}

// Decoder for inbound streams: Decodes RPC requests from peers
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
type Item = InboundRequest<E>;
Expand Down Expand Up @@ -188,6 +203,8 @@ pub struct SSZSnappyOutboundCodec<E: EthSpec> {
/// The fork name corresponding to the received context bytes.
fork_name: Option<ForkName>,
fork_context: Arc<ForkContext>,
/// Keeps track of the current response code for a chunk.
current_response_code: Option<u8>,
phantom: PhantomData<E>,
}

Expand All @@ -209,66 +226,12 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
fork_name: None,
fork_context,
phantom: PhantomData,
current_response_code: None,
}
}
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(r) => match r {
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
},
OutboundRequest::BlocksByRoot(r) => match r {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}

// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;

let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;

// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}

// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
type Item = RPCResponse<E>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// Decode an Rpc response.
fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<RPCResponse<E>>, RPCError> {
// Read the context bytes if required
if self.protocol.has_context_bytes() && self.fork_name.is_none() {
if src.len() >= CONTEXT_BYTES_LEN {
Expand Down Expand Up @@ -318,15 +281,8 @@ impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
}
}

impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type CodecErrorType = ErrorType;

fn decode_error(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Self::CodecErrorType>, RPCError> {
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<ErrorType>, RPCError> {
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
return Ok(None);
};
Expand Down Expand Up @@ -361,6 +317,95 @@ impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E>
}
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(r) => match r {
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
},
OutboundRequest::BlocksByRoot(r) => match r {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}

// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;

let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;

// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}

// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
type Item = RPCCodedResponse<E>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// if we have only received the response code, wait for more bytes
if src.len() <= 1 {
return Ok(None);
}
// using the response code determine which kind of payload needs to be decoded.
let response_code = self.current_response_code.unwrap_or_else(|| {
let resp_code = src.split_to(1)[0];
self.current_response_code = Some(resp_code);
resp_code
});

let inner_result = {
if RPCCodedResponse::<E>::is_response(response_code) {
// decode an actual response and mutates the buffer if enough bytes have been read
// returning the result.
self.decode_response(src)
.map(|r| r.map(RPCCodedResponse::Success))
} else {
// decode an error
self.decode_error(src)
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
}
};
// if the inner decoder was capable of decoding a chunk, we need to reset the current
// response code for the next chunk
if let Ok(Some(_)) = inner_result {
self.current_response_code = None;
}
// return the result
inner_result
}
}

/// Handle errors that we get from decoding an RPC message from the stream.
/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream.
/// `max_compressed_len` is the maximum compressed size for a given uncompressed size.
Expand Down Expand Up @@ -1030,7 +1075,7 @@ mod tests {
let mut snappy_inbound_codec =
SSZSnappyInboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);

snappy_inbound_codec.encode(message, &mut buf)?;
snappy_inbound_codec.encode_response(message, &mut buf)?;
Ok(buf)
}

Expand Down Expand Up @@ -1075,7 +1120,7 @@ mod tests {
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
// decode message just as snappy message
snappy_outbound_codec.decode(message)
snappy_outbound_codec.decode_response(message)
}

/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
Expand Down Expand Up @@ -1847,4 +1892,129 @@ mod tests {
RPCError::InvalidData(_)
));
}

#[test]
fn test_decode_status_message() {
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);

let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
fork_context,
);

// remove response code
let mut snappy_buf = buf.clone();
let _ = snappy_buf.split_to(1);

// decode message just as snappy message
let _snappy_decoded_message = snappy_outbound_codec
.decode_response(&mut snappy_buf)
.unwrap();

// decode message as ssz snappy chunk
let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap();
}

#[test]
fn test_invalid_length_prefix() {
let mut uvi_codec: Uvi<u128> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);

// Smallest > 10 byte varint
let len: u128 = 2u128.pow(70);

// Insert length-prefix
uvi_codec.encode(len, &mut dst).unwrap();

let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
fork_context,
);

let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err();

assert_eq!(
snappy_decoded_message,
RPCError::IoError("input bytes exceed maximum".to_string()),
"length-prefix of > 10 bytes is invalid"
);
}

#[test]
fn test_length_limits() {
fn encode_len(len: usize) -> BytesMut {
let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);
uvi_codec.encode(len, &mut dst).unwrap();
dst
}

let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);

// Response limits
let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));

let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));

// Request limits
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));

let mut min = encode_len(limit.min - 1);
let mut codec =
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
assert!(matches!(
codec.decode_response(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));
}
}
Loading
Loading