Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add waku-chat example #1

Merged
merged 6 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
969 changes: 967 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
members = [
"overwatch",
"overwatch-derive",
"examples/waku-chat"
]

[profile.release-opt]
Expand Down
19 changes: 19 additions & 0 deletions examples/waku-chat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "waku-chat"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["rt"] }
waku = { git = "https://github.com/waku-org/waku-rust-bindings" }
serde = "1"
bincode = "1"
overwatch = { path = "../../overwatch" }
overwatch-derive = { path = "../../overwatch-derive" }
tracing = "*"
async-trait = "0.1"
tracing-subscriber = "0.3"
clap = { version = "4.0.18", features = ["derive"] }
rand = "0.8"
90 changes: 90 additions & 0 deletions examples/waku-chat/src/chat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use crate::network::*;
use async_trait::async_trait;
use overwatch::services::handle::ServiceStateHandle;
use overwatch::services::relay::{NoMessage, Relay};
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::channel;

/// Chat service handler
/// displays received messages, send new ones
pub struct ChatService {
service_state: ServiceStateHandle<Self>,
}

#[derive(Deserialize, Serialize)]
struct Message {
user: usize,
msg: Box<[u8]>,
}

impl ServiceData for ChatService {
const SERVICE_ID: ServiceId = "Chat";
type Settings = usize;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}

#[async_trait]
impl ServiceCore for ChatService {
fn init(service_state: ServiceStateHandle<Self>) -> Self {
Self { service_state }
}

async fn run(self) {
let Self {
mut service_state, ..
} = self;
// TODO: waku should not end up in the public interface of the network service, at least not as a type
let mut network_relay: Relay<NetworkService<waku::Waku>> =
service_state.overwatch_handle.relay();
let user = service_state.settings_reader.get_updated_settings();
network_relay.connect().await.unwrap();
let (sender, mut receiver) = channel(1);
// TODO: typestate so I can't call send if it's not connected
network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.unwrap();

// send new messages
// for interactive stdin I/O it's recommended to
// use an external thread, see https://docs.rs/tokio/latest/tokio/io/struct.Stdin.html
std::thread::spawn(move || loop {
let mut input = String::new();
std::io::stdin()
.read_line(&mut input)
.expect("error reading message");
input.truncate(input.trim().len());
network_relay
.blocking_send(NetworkMsg::Broadcast(
bincode::serialize(&Message {
user,
msg: input.as_bytes().to_vec().into_boxed_slice(),
})
.unwrap()
.into_boxed_slice(),
))
.unwrap();
tracing::debug!("[sending]: {}...", input);
});

// print received messages
while let Some(NetworkEvent::RawMessage(message)) = receiver.recv().await {
if let Ok(msg) = bincode::deserialize::<Message>(&message) {
if msg.user != user {
println!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe using tokio::io::stdout instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the additional complexity is worth it since we can expect println! to be executed reasonably fast.
tokio::io:;stdout may also result in interleaved output.

Blocking is also the default behavior for tracing-subscriber for the reasons outlined here, so I think it makes sense to follow the same approach.
This is also an interesting read.

"[received][{}]: {}",
msg.user,
String::from_utf8_lossy(&msg.msg)
);
}
}
}
}
}
51 changes: 51 additions & 0 deletions examples/waku-chat/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#![allow(dead_code)]
#![allow(unused)]
// public chat service
// messages are disseminated through waku,
// no consensus, no blocks
mod network;
// TODO: different chat rooms with different contentTopicId
mod chat;

use chat::*;
use clap::Parser;
use network::*;
use overwatch::{overwatch::*, services::handle::ServiceHandle};
use overwatch_derive::*;

/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Multiaddrs of other nodes participating in the protocol
#[arg(short, long)]
peers: Vec<String>,

/// Listening port
port: u16,
}

#[derive(Services)]
struct Services {
chat: ServiceHandle<ChatService>,
network: ServiceHandle<NetworkService<network::waku::Waku>>,
}

#[cfg(target_os = "linux")]
fn main() {
tracing_subscriber::fmt::init();
let Args { peers, port } = Args::parse();
let app = OverwatchRunner::<Services>::run(
ServicesServiceSettings {
chat: rand::random(),
network: NetworkConfig { peers, port },
},
None,
);
app.wait_finished();
}

#[cfg(not(target_os = "linux"))]
fn main() {
println!("waku is only supported on linux");
}
81 changes: 81 additions & 0 deletions examples/waku-chat/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
pub mod waku;
use async_trait::async_trait;
use overwatch::services::handle::ServiceStateHandle;
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use std::fmt::Debug;
use tokio::sync::mpsc::Sender;

#[derive(Debug)]
pub enum NetworkMsg {
Broadcast(Box<[u8]>),
Subscribe {
kind: EventKind,
sender: Sender<NetworkEvent>,
},
}

impl RelayMessage for NetworkMsg {}

#[derive(Debug)]
pub enum EventKind {
Message,
}

#[derive(Debug)]
pub enum NetworkEvent {
RawMessage(Box<[u8]>),
}

#[derive(Clone, Debug)]
pub struct NetworkConfig {
pub port: u16,
pub peers: Vec<String>,
}

pub struct NetworkService<I: NetworkBackend + Send + 'static> {
implem: I,
service_state: ServiceStateHandle<Self>,
}

impl<I: NetworkBackend + Send + 'static> ServiceData for NetworkService<I> {
const SERVICE_ID: ServiceId = "Network";
type Settings = NetworkConfig;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NetworkMsg;
}

#[async_trait]
impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
implem: <I as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings(),
),
service_state,
}
}

async fn run(mut self) {
let Self {
service_state,
mut implem,
} = self;
let mut relay = service_state.inbound_relay;

while let Some(msg) = relay.recv().await {
match msg {
NetworkMsg::Broadcast(msg) => implem.broadcast(msg),
NetworkMsg::Subscribe { kind: _, sender } => implem.subscribe(sender),
}
}
}
}

pub trait NetworkBackend {
fn new(config: NetworkConfig) -> Self;
fn broadcast(&self, msg: Box<[u8]>);
fn subscribe(&mut self, sender: Sender<NetworkEvent>);
}
76 changes: 76 additions & 0 deletions examples/waku-chat/src/network/waku.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::*;
use ::waku::*;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
use tokio::sync::mpsc::Sender;

pub struct Waku {
waku: WakuNodeHandle<Running>,
subscribers: Arc<RwLock<Vec<Sender<NetworkEvent>>>>,
}

impl NetworkBackend for Waku {
fn new(config: NetworkConfig) -> Self {
let waku_config = WakuNodeConfig {
port: Some(config.port.into()),
..Default::default()
};
let waku = waku_new(Some(waku_config)).unwrap().start().unwrap();
for peer in config.peers {
let addr = Multiaddr::from_str(&peer).unwrap();
let peer_id = waku.add_peer(&addr, waku::ProtocolId::Relay).unwrap();
waku.connect_peer_with_id(peer_id, None).unwrap();
}
waku.relay_subscribe(None).unwrap();
assert!(waku.relay_enough_peers(None).unwrap());
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
Self {
waku,
subscribers: Arc::new(RwLock::new(Vec::new())),
}
}

fn subscribe(&mut self, sender: Sender<NetworkEvent>) {
self.subscribers.write().unwrap().push(sender);
tracing::debug!("someone subscribed");
let subscribers = Arc::clone(&self.subscribers);
waku_set_event_callback(move |sig| {
match sig.event() {
Event::WakuMessage(ref message_event) => {
tracing::debug!("received message event");
// we can probably avoid sending a copy to each subscriber and just borrow / clone on demand
for s in subscribers.read().unwrap().iter() {
s.try_send(NetworkEvent::RawMessage(
message_event
.waku_message()
.payload()
.to_vec()
.into_boxed_slice(),
))
.unwrap()
}
}
_ => tracing::debug!("unsupported event"),
}
});
}

fn broadcast(&self, msg: Box<[u8]>) {
let content_topic = WakuContentTopic::from_str("/waku/2/default-waku/proto").unwrap();
let message = WakuMessage::new(
msg,
content_topic,
1,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as usize,
);
let msg_id = self
.waku
.relay_publish_message(&message, None, None)
.unwrap();
tracing::debug!("sent msg {:?} with id {}", message.payload(), msg_id);
}
}
12 changes: 12 additions & 0 deletions overwatch/src/services/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ impl<S: ServiceCore> Relay<S> {
}
}

#[instrument(skip_all, err(Debug))]
pub fn blocking_send(&mut self, message: S::Message) -> Result<(), RelayError> {
if let RelayState::Connected(outbound_relay) = &mut self.state {
outbound_relay
.sender
.blocking_send(message)
.map_err(|_| RelayError::Send)
} else {
Err(RelayError::Disconnected)
}
}

async fn request_relay(&mut self, reply: oneshot::Sender<RelayResult>) {
let relay_command = OverwatchCommand::Relay(RelayCommand {
service_id: S::SERVICE_ID,
Expand Down