Skip to content

Commit

Permalink
feat: implement send and sendBinary over comms (#279)
Browse files Browse the repository at this point in the history
* feat: implement `send` and `sendBinary` over comms

* remove comment
  • Loading branch information
leanmendoza authored Feb 20, 2024
1 parent 8d07508 commit 361183c
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub trait Adapter {
fn clean(&mut self);

fn consume_chats(&mut self) -> Vec<(H160, rfc4::Chat)>;
fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)>;
fn change_profile(&mut self, new_profile: UserProfile);

fn send_rfc4(&mut self, packet: rfc4::Packet, unreliable: bool) -> bool;
Expand Down
22 changes: 21 additions & 1 deletion rust/decentraland-godot-lib/src/comms/adapter/livekit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub struct LivekitRoom {
last_profile_request_sent: Instant,
last_profile_version_announced: u32,
chats: Vec<(H160, rfc4::Chat)>,

// Scene messges
incoming_scene_messages: HashMap<String, Vec<(H160, Vec<u8>)>>,
}

impl LivekitRoom {
Expand Down Expand Up @@ -94,6 +97,7 @@ impl LivekitRoom {
peer_alias_counter: 0,
last_profile_version_announced: 0,
chats: Vec::new(),
incoming_scene_messages: HashMap::new(),
}
}

Expand Down Expand Up @@ -212,7 +216,15 @@ impl LivekitRoom {
avatar_scene.update_avatar_by_alias(peer.alias, &profile);
peer.profile = Some(profile);
}
ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(_scene)) => {}
ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(scene)) => {
let entry = self
.incoming_scene_messages
.entry(scene.scene_id)
.or_default();

// TODO: should we limit the size of the queue or accumulated bytes?
entry.push((message.address, scene.data));
}
ToSceneMessage::Rfc4(rfc4::packet::Message::Voice(_voice)) => {}
ToSceneMessage::InitVoice(frame) => {
avatar_scene.spawn_voice_channel(
Expand Down Expand Up @@ -347,6 +359,14 @@ impl Adapter for LivekitRoom {
fn support_voice_chat(&self) -> bool {
true
}

fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) {
std::mem::take(messages)
} else {
Vec::new()
}
}
}

fn spawn_livekit_task(
Expand Down
22 changes: 21 additions & 1 deletion rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct WebSocketRoom {
last_profile_response_sent: Instant,
last_profile_request_sent: Instant,
last_profile_version_announced: u32,

// Scene messges
incoming_scene_messages: HashMap<String, Vec<(H160, Vec<u8>)>>,
}

impl WebSocketRoom {
Expand Down Expand Up @@ -104,6 +107,7 @@ impl WebSocketRoom {
last_profile_request_sent: old_time,
last_try_to_connect: old_time,
last_profile_version_announced: 0,
incoming_scene_messages: HashMap::new(),
}
}

Expand Down Expand Up @@ -483,7 +487,15 @@ impl WebSocketRoom {
.unwrap()
.profile = Some(profile);
}
rfc4::packet::Message::Scene(_scene) => {}
rfc4::packet::Message::Scene(scene) => {
let entry = self
.incoming_scene_messages
.entry(scene.scene_id)
.or_default();

// TODO: should we limit the size of the queue or accumulated bytes?
entry.push((peer.address, scene.data));
}
rfc4::packet::Message::Voice(_voice) => {}
}
}
Expand Down Expand Up @@ -595,4 +607,12 @@ impl Adapter for WebSocketRoom {
fn support_voice_chat(&self) -> bool {
false
}

fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) {
std::mem::take(messages)
} else {
Vec::new()
}
}
}
35 changes: 34 additions & 1 deletion rust/decentraland-godot-lib/src/comms/communication_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,40 @@ impl INode for CommunicationManager {
}
}

impl CommunicationManager {}
impl CommunicationManager {
pub fn send_scene_message(&mut self, scene_id: String, data: Vec<u8>) {
let scene_message = rfc4::Packet {
message: Some(rfc4::packet::Message::Scene(rfc4::Scene { scene_id, data })),
};
match &mut self.current_connection {
CommsConnection::Connected(adapter) => {
adapter.send_rfc4(scene_message, true);
}
#[cfg(feature = "use_livekit")]
CommsConnection::Archipelago(archipelago) => {
if let Some(adapter) = archipelago.adapter() {
adapter.send_rfc4(scene_message, true);
}
}
_ => {}
}
}

pub fn get_pending_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
match &mut self.current_connection {
CommsConnection::Connected(adapter) => adapter.consume_scene_messages(scene_id),
#[cfg(feature = "use_livekit")]
CommsConnection::Archipelago(archipelago) => {
if let Some(adapter) = archipelago.adapter() {
adapter.consume_scene_messages(scene_id)
} else {
vec![]
}
}
_ => vec![],
}
}
}

#[godot_api]
impl CommunicationManager {
Expand Down
104 changes: 104 additions & 0 deletions rust/decentraland-godot-lib/src/dcl/js/comms.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{cell::RefCell, rc::Rc};

use deno_core::{op, JsBuffer, Op, OpDecl, OpState};
use ethers::types::H160;

use crate::dcl::scene_apis::RpcCall;

#[derive(Default)]
pub(crate) struct InternalPendingBinaryMessages {
pub messages: Vec<(H160, Vec<u8>)>,
}

// list of op declarations
pub fn ops() -> Vec<OpDecl> {
vec![op_comms_send_string::DECL, op_comms_send_binary::DECL]
}

pub(crate) const COMMS_MSG_TYPE_STRING: u8 = 1;
pub(crate) const COMMS_MSG_TYPE_BINARY: u8 = 2;

#[op]
async fn op_comms_send_string(
state: Rc<RefCell<OpState>>,
message: String,
) -> Result<(), anyhow::Error> {
let mut message = message.into_bytes();
message.insert(0, COMMS_MSG_TYPE_STRING);
comms_send(state, vec![message]).await?;
Ok(())
}

#[op]
async fn op_comms_send_binary(
state: Rc<RefCell<OpState>>,
messages: Vec<JsBuffer>,
) -> Result<Vec<Vec<u8>>, anyhow::Error> {
let messages = messages
.iter()
.map(|m| {
let mut m = m.as_ref().to_vec();
m.insert(0, COMMS_MSG_TYPE_BINARY);
m
})
.collect();

comms_send(state.clone(), messages).await?;

// Get pending Binary messages
if let Some(pending_messages) = state
.borrow_mut()
.try_take::<InternalPendingBinaryMessages>()
{
let messages = pending_messages
.messages
.into_iter()
.map(|(sender_address, mut data)| {
let sender_address_str = format!("{:#x}", sender_address);
let sender_address_str_bytes = sender_address_str.as_bytes();

// Remove the comms message type(-1 byte), add the sender address size (+1 byte)
// and add the address in bytes (for H160=20 to string 2+40)
let sender_len = sender_address_str_bytes.len();
let original_data_len = data.len();
let new_data_size = original_data_len + 1 + sender_len - 1;

// Resize to fit the sender address
data.resize(new_data_size, 0);

// Shift the data to the right to fit the sender address
data.copy_within(1..original_data_len, sender_len + 1);

// Add the sender address size at the beginning of the data
data[0] = sender_len as u8;

// Add the sender address at the beginning of the data
data[1..=sender_len].copy_from_slice(sender_address_str_bytes);

data
})
.collect();
Ok(messages)
} else {
Ok(vec![])
}
}

async fn comms_send(
state: Rc<RefCell<OpState>>,
message: Vec<Vec<u8>>,
) -> Result<(), anyhow::Error> {
let (sx, rx) = tokio::sync::oneshot::channel::<Result<(), String>>();

state
.borrow_mut()
.borrow_mut::<Vec<RpcCall>>()
.push(RpcCall::SendCommsMessage {
body: message,
response: sx.into(),
});

rx.await
.map_err(|e| anyhow::anyhow!(e))?
.map_err(anyhow::Error::msg)
}
45 changes: 35 additions & 10 deletions rust/decentraland-godot-lib/src/dcl/js/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::dcl::{
};

use super::{
comms::{InternalPendingBinaryMessages, COMMS_MSG_TYPE_BINARY},
events::process_events,
players::{get_player_data, get_players},
};
Expand Down Expand Up @@ -69,13 +70,13 @@ fn op_crdt_send_to_renderer(op_state: Rc<RefCell<OpState>>, messages: &[u8]) {
let sender = op_state.borrow_mut::<std::sync::mpsc::SyncSender<SceneResponse>>();

sender
.send(SceneResponse::Ok(
.send(SceneResponse::Ok {
scene_id,
dirty,
logs.0,
elapsed_time,
dirty_crdt_state: dirty,
logs: logs.0,
delta: elapsed_time,
rpc_calls,
))
})
.expect("error sending scene response!!")
}

Expand Down Expand Up @@ -105,11 +106,14 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
let scene_crdt_state = cloned_scene_crdt.lock().unwrap();

let data = match response {
Some(RendererResponse::Ok(dirty)) => {
Some(RendererResponse::Ok {
dirty_crdt_state,
incoming_comms_message,
}) => {
let mut data_buf = Vec::new();
let mut data_writter = DclWriter::new(&mut data_buf);

for (component_id, entities) in dirty.lww.iter() {
for (component_id, entities) in dirty_crdt_state.lww.iter() {
for entity_id in entities {
if let Err(err) = put_or_delete_lww_component(
&scene_crdt_state,
Expand All @@ -122,7 +126,7 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
}
}

for (component_id, entities) in dirty.gos.iter() {
for (component_id, entities) in dirty_crdt_state.gos.iter() {
for (entity_id, element_count) in entities {
if let Err(err) = append_gos_component(
&scene_crdt_state,
Expand All @@ -136,12 +140,33 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
}
}

for entity_id in dirty.entities.died.iter() {
for entity_id in dirty_crdt_state.entities.died.iter() {
delete_entity(entity_id, &mut data_writter);
}

let (comms_binary, comms_string): (_, Vec<_>) = incoming_comms_message
.into_iter()
.filter(|v| !v.1.is_empty())
.partition(|v| v.1[0] == COMMS_MSG_TYPE_BINARY);

if !comms_binary.is_empty() {
let mut internal_pending_binary_messages = op_state
.try_take::<InternalPendingBinaryMessages>()
.unwrap_or_default();

internal_pending_binary_messages
.messages
.extend(comms_binary.into_iter());
op_state.put(internal_pending_binary_messages);
}

process_local_api_calls(local_api_calls, &scene_crdt_state);
process_events(&mut op_state, &scene_crdt_state, &dirty);
process_events(
&mut op_state,
&scene_crdt_state,
&dirty_crdt_state,
comms_string,
);

data_buf
}
Expand Down
Loading

0 comments on commit 361183c

Please sign in to comment.