Skip to content

Commit

Permalink
Add metadata to on_message callback
Browse files Browse the repository at this point in the history
  • Loading branch information
t00ts committed Dec 3, 2021
1 parent 11fddd6 commit 80f55d4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `auto_propagate` flag to Peer [#57]
- Add optional `height` parameter to `broadcast` method [#57]
- Add `send` method to public API [#58]
- Add metadata to `on_message` callback [#59]

### Changed

Expand All @@ -33,5 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

[#57]: https://github.com/dusk-network/kadcast/issues/57
[#58]: https://github.com/dusk-network/kadcast/issues/58
[#59]: https://github.com/dusk-network/kadcast/issues/59
[#60]: https://github.com/dusk-network/kadcast/issues/60
[#63]: https://github.com/dusk-network/kadcast/issues/63
10 changes: 6 additions & 4 deletions examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use clap::{App, Arg};
use kadcast::{NetworkListen, Peer};
use kadcast::{MessageInfo, NetworkListen, Peer};
use rustc_tools_util::{get_version_info, VersionInfo};
use std::io::{self, BufRead};

Expand Down Expand Up @@ -95,11 +95,13 @@ pub async fn main() {
}
pub struct DummyListener {}
impl NetworkListen for DummyListener {
fn on_message(&self, message: Vec<u8>) {
fn on_message(&self, message: Vec<u8>, md: MessageInfo) {
println!(
"Received {}",
"Received {} from {} (height: {})",
String::from_utf8(message.to_vec())
.unwrap_or_else(|_| "No UTF8 message received".to_string())
.unwrap_or_else(|_| "No UTF8 message received".to_string()),
md.src,
md.height,
);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::io::{Read, Write};
pub mod error;
mod header;
pub mod message;
pub mod metadata;
pub(crate) mod payload;

pub trait Marshallable {
Expand Down
13 changes: 13 additions & 0 deletions src/encoding/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
pub struct MessageInfo {
pub src: SocketAddr,
pub height: u8,
}
21 changes: 15 additions & 6 deletions src/handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::RwLock;
use tracing::*;

use crate::encoding::message::{BroadcastPayload, Message, NodePayload};
use crate::encoding::metadata::MessageInfo;
use crate::kbucket::Tree;
use crate::peer::{PeerInfo, PeerNode};
use crate::transport::{MessageBeanIn, MessageBeanOut};
Expand All @@ -24,7 +25,7 @@ impl MessageHandler {
ktable: Arc<RwLock<Tree<PeerInfo>>>,
mut inbound_receiver: Receiver<MessageBeanIn>,
outbound_sender: Sender<MessageBeanOut>,
listener_sender: Sender<Vec<u8>>,
listener_sender: Sender<(Vec<u8>, MessageInfo)>,
auto_propagate: bool,
) {
tokio::spawn(async move {
Expand Down Expand Up @@ -107,11 +108,19 @@ impl MessageHandler {
}
Message::Broadcast(_, payload) => {
debug!("Received payload with height {:?}", payload);
listener_sender
.try_send(payload.gossip_frame.clone())
.unwrap_or_else(|op| {
error!("Unable to notify client {:?}", op)
});

// Aggregate message + metadata for lib client
let msg = payload.gossip_frame.clone();
let md = MessageInfo {
src: remote_node_addr.clone(),
height: payload.height,
};
let notif = (msg, md);

// Notify lib client
listener_sender.try_send(notif).unwrap_or_else(|op| {
error!("Unable to notify client {:?}", op)
});
if auto_propagate && payload.height > 0 {
let table_read = ktable.read().await;
debug!(
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use std::{convert::TryInto, net::SocketAddr, sync::Arc, time::Duration};

pub use encoding::metadata::MessageInfo;
use encoding::{message::Message, payload::BroadcastPayload};
use handling::MessageHandler;
use itertools::Itertools;
Expand Down Expand Up @@ -62,7 +63,7 @@ pub struct Peer {
/// [NetworkListen] is notified each time a broadcasted
/// message is received from the network
pub trait NetworkListen: Send {
fn on_message(&self, message: Vec<u8>);
fn on_message(&self, message: Vec<u8>, metadata: MessageInfo);
}

impl Peer {
Expand Down Expand Up @@ -104,11 +105,11 @@ impl Peer {
}

async fn notifier(
mut listener_channel_rx: Receiver<Vec<u8>>,
mut listener_channel_rx: Receiver<(Vec<u8>, MessageInfo)>,
listener: impl NetworkListen,
) {
while let Some(message) = listener_channel_rx.recv().await {
listener.on_message(message);
while let Some(notif) = listener_channel_rx.recv().await {
listener.on_message(notif.0, notif.1);
}
}

Expand Down

0 comments on commit 80f55d4

Please sign in to comment.