Skip to content

Commit

Permalink
Merge pull request #149 from dusk-network/ray-id
Browse files Browse the repository at this point in the history
Add Ray-ID for message tracking
  • Loading branch information
herr-seppia authored Sep 28, 2024
2 parents 6534e9a + 0c9a476 commit 922088b
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 79 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `max_udp_len` configuration parameter
- Add range checks to MTU (between 1296 and 8192)
- Add network version to handshake messages
- Add Ray-ID to MessageInfo for message tracking

### Fixed

Expand All @@ -20,7 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Change the EncodedChunk UUID generation
- Change the EncodedChunk UUID generation (aka RaptorqHeader)
- Change `raptorq` dependency from `1.6` to `2.0`

## [0.6.1] - 2024-04-10
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kadcast"
authors = ["herr-seppia <seppia@dusk.network>"]
version = "0.7.0-rc.6"
version = "0.7.0-rc.7"
edition = "2018"
description = "Implementation of the Kadcast Network Protocol."
categories = ["network-programming"]
Expand All @@ -26,6 +26,7 @@ serde_derive = "1"
serde = "1"
humantime-serde = "1"
semver = "1"
hex = "0.4"

[dev-dependencies]
clap = "2.33.3"
Expand Down
2 changes: 1 addition & 1 deletion src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ mod tests {
#[test]
fn test_encode_broadcast() -> Result<()> {
let peer = PeerNode::generate("192.168.0.1:666", 0)?;
let a = Message::Broadcast(
let a = Message::broadcast(
peer.to_header(),
BroadcastPayload {
height: 10,
Expand Down
10 changes: 7 additions & 3 deletions src/encoding/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ pub(crate) enum Message {
Pong(Header, Version),
FindNodes(Header, Version, BinaryKey),
Nodes(Header, Version, NodePayload), //should we pass node[] as ref?
Broadcast(Header, BroadcastPayload),
Broadcast(Header, BroadcastPayload, [u8; 32]),
}

impl Message {
pub fn broadcast(header: Header, payload: BroadcastPayload) -> Self {
Self::Broadcast(header, payload, [0; 32])
}

pub(crate) fn type_byte(&self) -> u8 {
match self {
Message::Ping(..) => ID_MSG_PING,
Expand Down Expand Up @@ -120,7 +124,7 @@ impl Marshallable for Message {
version.marshal_binary(writer)?;
node_payload.marshal_binary(writer)?;
}
Message::Broadcast(_, broadcast_payload) => {
Message::Broadcast(_, broadcast_payload, ..) => {
broadcast_payload.marshal_binary(writer)?;
}
};
Expand Down Expand Up @@ -153,7 +157,7 @@ impl Marshallable for Message {
}
ID_MSG_BROADCAST => {
let payload = BroadcastPayload::unmarshal_binary(reader)?;
Ok(Message::Broadcast(header, payload))
Ok(Message::broadcast(header, payload))
}
unknown => Err(Error::new(
ErrorKind::Other,
Expand Down
32 changes: 22 additions & 10 deletions src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{RwLock, K_K};
pub struct MessageInfo {
pub(crate) src: SocketAddr,
pub(crate) height: u8,
pub(crate) ray_id: [u8; 32],
}

impl MessageInfo {
Expand All @@ -35,6 +36,10 @@ impl MessageInfo {
pub fn height(&self) -> u8 {
self.height
}
/// Returns the ray-id for this message (if any)
pub fn ray_id(&self) -> &[u8] {
&self.ray_id
}
}

pub(crate) struct MessageHandler {
Expand Down Expand Up @@ -103,8 +108,7 @@ impl MessageHandler {
while let Some((message, mut remote_peer_addr)) =
inbound_receiver.recv().await
{
debug!("Handler received message");
trace!("Handler received message {:?}", message);
trace!("Handler received message {}", message.type_byte());
remote_peer_addr.set_port(message.header().sender_port);

let header = message.header();
Expand All @@ -122,7 +126,7 @@ impl MessageHandler {
match handler.handle_peer(remote_peer, &message).await {
Ok(_) => {}
Err(NodeInsertError::Full(n)) => {
debug!(
trace!(
"Unable to insert node - FULL {}",
n.value().address()
)
Expand Down Expand Up @@ -224,8 +228,9 @@ impl MessageHandler {
self.handle_find_nodes(remote_node_addr, &target).await
}
Message::Nodes(_, _, nodes) => self.handle_nodes(nodes).await,
Message::Broadcast(_, payload) => {
self.handle_broadcast(remote_node_addr, payload).await
Message::Broadcast(_, payload, ray_id) => {
self.handle_broadcast(remote_node_addr, payload, ray_id)
.await
}
}
}
Expand Down Expand Up @@ -307,17 +312,24 @@ impl MessageHandler {
&self,
src: SocketAddr,
payload: BroadcastPayload,
ray_id: [u8; 32],
) {
let height = payload.height;
let gossip_frame = payload.gossip_frame;
debug!(
"Received payload with height {height} and len {}",
gossip_frame.len()
event = "handle broadcast",
height,
size = gossip_frame.len(),
ray = hex::encode(ray_id)
);

// Aggregate message + metadata for lib client
let msg = gossip_frame.clone();
let md = MessageInfo { src, height };
let md = MessageInfo {
src,
height,
ray_id,
};

// Notify lib client
self.listener_sender
Expand All @@ -327,7 +339,7 @@ impl MessageHandler {

if self.auto_propagate && height > 0 {
let new_height = height - 1;
debug!("Extracting for height {new_height}");
trace!("Extracting for height {new_height}");

let messages: Vec<_> = {
let table_read = self.ktable.read().await;
Expand All @@ -341,7 +353,7 @@ impl MessageHandler {
height,
gossip_frame,
};
let msg = Message::Broadcast(self.my_header, payload);
let msg = Message::broadcast(self.my_header, payload);
let targets =
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ impl Peer {
.await
.extract(height)
.map(|(height, nodes)| {
let msg = Message::Broadcast(
let msg = Message::broadcast(
self.header,
BroadcastPayload {
height,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
gossip_frame: message.to_vec(),
},
);
let targets =
Expand Down Expand Up @@ -244,7 +244,7 @@ impl Peer {
}
// We use the Broadcast message type while setting height to 0
// to prevent further propagation at the receiver
let msg = Message::Broadcast(
let msg = Message::broadcast(
self.header,
BroadcastPayload {
height: 0,
Expand Down
6 changes: 3 additions & 3 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use socket2::SockRef;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use crate::config::Config;
use crate::encoding::message::Message;
Expand Down Expand Up @@ -117,7 +117,7 @@ impl WireNetwork {
if let Some((data, src)) = dec_chan_rx.recv().await {
match Message::unmarshal_binary(&mut &data[..]) {
Ok(deser) => {
debug!("> Received raw message {}", deser.type_byte());
trace!("> Received raw message {}", deser.type_byte());
Self::handle_raw_message(
&mut decoder,
deser,
Expand Down Expand Up @@ -164,7 +164,7 @@ impl WireNetwork {
debug!("WireNetwork::outgoing loop started");
loop {
if let Some((message, targets)) = out_channel_rx.recv().await {
debug!(
trace!(
"< Message to send to ({targets:?}) - {:?} ",
message.type_byte()
);
Expand Down
18 changes: 9 additions & 9 deletions src/transport/encoding/raptorq.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,28 @@

- The encoded `ChunkedPayload` is constructed from a `BroadcastPayload`.
- The encoding of a `ChunkedPayload` consists of the following components:
- `UID` (Unique Identifier): A 32-byte hash of the broadcast payload, excluding the height field.
- `RAY_ID` (Unique Identifier): A 32-byte hash of the broadcast payload, excluding the height field.
- `ObjectTransmissionInformation` (RaptorQ header): A 12-byte header specific to the RaptorQ encoding scheme.
- `Encoded Chunk`: The chunked and encoded data using RaptorQ.

| Field | Length (bytes) | Description |
|-------------------|----------------|-----------------------------------|
| UID (Blake2s256) | 32 | Unique identifier for the chunked payload. |
| Transmission Info | 12 | Object Transmission Information (RaptorQ header). |
| Encoded Chunk | Variable | The RaptorQ encoded chunk of the payload. |
| Field | Length (bytes) | Description |
|---------------------|----------------|-----------------------------------|
| RAY_ID (Blake2s256) | 32 | Unique identifier for the chunked payload. |
| Transmission Info | 12 | Object Transmission Information (RaptorQ header). |
| Encoded Chunk | Variable | The RaptorQ encoded chunk of the payload. |

**Decoding**:

- When a `BroadcastPayload` is transformed into a `ChunkedPayload`, it checks if the payload length is at least `MIN_CHUNKED_SIZE`, which is the minimum required length to consider it a valid `ChunkedPayload`. If not, an error is raised.

- The `ChunkedPayload` holds the following components:
- `UID`: The unique identifier for the chunk, extracted from the first 32 bytes of the `gossip_frame`.
- `RAY_ID`: The unique identifier for the chunk, extracted from the first 32 bytes of the `gossip_frame`.
- `ObjectTransmissionInformation`: The 12-byte RaptorQ header, parsed from the `gossip_frame` bytes.
- `Encoded Chunk`: The remaining bytes after UID and RaptorQ header, containing the encoded chunk.
- `Encoded Chunk`: The remaining bytes after RAY_ID and RaptorQ header, containing the encoded chunk.

- `ChunkedPayload` is used in a cache to manage the decoding process for broadcast messages. It tracks the state of a broadcast message's chunk as either receiving or processed.

- The cache stores the `UID` of the broadcast message as the key and the `CacheStatus` as the value, which tracks the state.
- The cache stores the `RAY_ID`+`ObjectTransmissionInformation` (aka `ChunkedHeader`) of the broadcast message as the key and the `CacheStatus` as the value, which tracks the state.

- The `CacheStatus` can be in two states:
1. **Receiving**: In this state, a RaptorQ decoder is initialized with the `ObjectTransmissionInformation`. The decoder processes incoming encoded chunks and attempts to decode them. If a chunk is successfully decoded, the message is checked for integrity, and if it's valid, it's stored as a fully processed message. If not, it's discarded.
Expand Down
39 changes: 22 additions & 17 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ struct ChunkedPayload<'a>(&'a BroadcastPayload);
// ObjectTransmissionInformation Size (Raptorq header)
const TRANSMISSION_INFO_SIZE: usize = 12;

// UID Size (Blake2s256)
const UID_SIZE: usize = 32;
// RAY_ID Size (Blake2s256)
const RAY_ID_SIZE: usize = 32;

// CHUNKED_HEADER_SIZE Size
const CHUNKED_HEADER_SIZE: usize = RAY_ID_SIZE + TRANSMISSION_INFO_SIZE;

// EncodingPacket min size (RaptorQ packet)
const MIN_ENCODING_PACKET_SIZE: usize = 5;

const MIN_CHUNKED_SIZE: usize =
UID_SIZE + TRANSMISSION_INFO_SIZE + MIN_ENCODING_PACKET_SIZE;
const MIN_CHUNKED_SIZE: usize = CHUNKED_HEADER_SIZE + MIN_ENCODING_PACKET_SIZE;

impl<'a> TryFrom<&'a BroadcastPayload> for ChunkedPayload<'a> {
type Error = io::Error;
Expand All @@ -53,40 +55,43 @@ impl BroadcastPayload {
self.marshal_binary(&mut bytes)?;
Ok(bytes)
}
fn generate_uid(&self) -> io::Result<[u8; UID_SIZE]> {
fn generate_ray_id(&self) -> io::Result<[u8; RAY_ID_SIZE]> {
let mut hasher = Blake2s256::new();
// Remove the kadcast `height` field from the hash
hasher.update(&self.bytes()?[1..]);
Ok(hasher.finalize().into())
}
}
impl<'a> ChunkedPayload<'a> {
fn uid(&self) -> &[u8] {
&self.0.gossip_frame[0..UID_SIZE]
fn ray_id(&self) -> &[u8] {
&self.0.gossip_frame[0..RAY_ID_SIZE]
}

fn transmission_info(
&self,
max_udp_len: u64,
) -> Result<SafeObjectTransmissionInformation, TransmissionInformationError>
{
let slice =
&self.0.gossip_frame[UID_SIZE..(UID_SIZE + TRANSMISSION_INFO_SIZE)];
let slice = self.transmission_info_bytes();
let info = SafeObjectTransmissionInformation::try_from(slice)?;
match info.inner.transfer_length() < max_udp_len {
true => Ok(info),
false => Err(TransmissionInformationError::TransferLengthExceeded),
}
}

fn transmission_info_bytes(&self) -> &[u8] {
&self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
}

fn encoded_chunk(&self) -> &[u8] {
&self.0.gossip_frame[(UID_SIZE + TRANSMISSION_INFO_SIZE)..]
&self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..]
}

fn uid_with_info(&self) -> [u8; UID_SIZE + TRANSMISSION_INFO_SIZE] {
let uid = &self.0.gossip_frame[0..UID_SIZE + TRANSMISSION_INFO_SIZE];
fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] {
let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE];

// Why do we need transmission info?
// Why do we need transmission info included into the header?
//
// Transmission info should be sent over a reliable channel, because
// it is critical to decode packets.
Expand All @@ -99,7 +104,7 @@ impl<'a> ChunkedPayload<'a> {
// Since the correctness of an UDP packet is already guaranteed by OS
// checksum checks, Hashing has been removed in order to increase the
// decoding performance.
uid.try_into().expect("slice to be length 44")
header.try_into().expect("slice to be length 44")
}
}

Expand Down Expand Up @@ -135,7 +140,7 @@ mod tests {
gossip_frame: data,
};
println!("orig payload len {}", payload.bytes()?.len());
let message = Message::Broadcast(header, payload);
let message = Message::broadcast(header, payload);
let message_bytes = message.bytes()?;
println!("orig message len {}", message_bytes.len());
let start = Instant::now();
Expand Down Expand Up @@ -192,7 +197,7 @@ mod tests {
gossip_frame: data,
};
println!("orig payload len {}", payload.bytes()?.len());
let message = Message::Broadcast(header, payload);
let message = Message::broadcast(header, payload);
let message_bytes = message.bytes()?;
println!("orig message len {}", message_bytes.len());
let start = Instant::now();
Expand All @@ -213,7 +218,7 @@ mod tests {
for _ in 0..DATA_LEN {
gossip_frame.push(rand::Rng::gen(&mut rand::thread_rng()));
}
let msg = Message::Broadcast(
let msg = Message::broadcast(
header,
BroadcastPayload {
height: 255,
Expand Down
Loading

0 comments on commit 922088b

Please sign in to comment.