From 6e331f824c11f4b1b88772a70f53d2971f990647 Mon Sep 17 00:00:00 2001 From: Lean Mendoza Date: Mon, 19 Feb 2024 16:39:30 -0300 Subject: [PATCH 1/2] feat: implement `send` and `sendBinary` over comms --- .../src/comms/adapter/adapter_trait.rs | 1 + .../src/comms/adapter/livekit.rs | 22 +++- .../src/comms/adapter/ws_room.rs | 22 +++- .../src/comms/communication_manager.rs | 35 +++++- .../src/dcl/js/comms.rs | 106 ++++++++++++++++++ .../src/dcl/js/engine.rs | 45 ++++++-- .../src/dcl/js/events.rs | 60 ++++++++-- .../js/js_modules/CommunicationsController.js | 13 ++- rust/decentraland-godot-lib/src/dcl/js/mod.rs | 36 +++--- rust/decentraland-godot-lib/src/dcl/mod.rs | 20 ++-- .../src/dcl/scene_apis.rs | 4 + .../src/scene_runner/rpc_calls/mod.rs | 9 ++ .../src/scene_runner/scene_manager.rs | 8 +- .../src/scene_runner/update_scene.rs | 13 ++- 14 files changed, 344 insertions(+), 50 deletions(-) create mode 100644 rust/decentraland-godot-lib/src/dcl/js/comms.rs diff --git a/rust/decentraland-godot-lib/src/comms/adapter/adapter_trait.rs b/rust/decentraland-godot-lib/src/comms/adapter/adapter_trait.rs index 1a0372e0..a94b92b4 100644 --- a/rust/decentraland-godot-lib/src/comms/adapter/adapter_trait.rs +++ b/rust/decentraland-godot-lib/src/comms/adapter/adapter_trait.rs @@ -7,6 +7,7 @@ pub trait Adapter { fn clean(&mut self); fn consume_chats(&mut self) -> Vec<(H160, rfc4::Chat)>; + fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec)>; fn change_profile(&mut self, new_profile: UserProfile); fn send_rfc4(&mut self, packet: rfc4::Packet, unreliable: bool) -> bool; diff --git a/rust/decentraland-godot-lib/src/comms/adapter/livekit.rs b/rust/decentraland-godot-lib/src/comms/adapter/livekit.rs index 32138edc..edfc089e 100644 --- a/rust/decentraland-godot-lib/src/comms/adapter/livekit.rs +++ b/rust/decentraland-godot-lib/src/comms/adapter/livekit.rs @@ -60,6 +60,9 @@ pub struct LivekitRoom { last_profile_request_sent: Instant, last_profile_version_announced: u32, chats: Vec<(H160, rfc4::Chat)>, + + // Scene messges + incoming_scene_messages: HashMap)>>, } impl LivekitRoom { @@ -94,6 +97,7 @@ impl LivekitRoom { peer_alias_counter: 0, last_profile_version_announced: 0, chats: Vec::new(), + incoming_scene_messages: HashMap::new(), } } @@ -212,7 +216,15 @@ impl LivekitRoom { avatar_scene.update_avatar_by_alias(peer.alias, &profile); peer.profile = Some(profile); } - ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(_scene)) => {} + ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(scene)) => { + let entry = self + .incoming_scene_messages + .entry(scene.scene_id) + .or_default(); + + // TODO: should we limit the size of the queue or accumulated bytes? + entry.push((message.address, scene.data)); + } ToSceneMessage::Rfc4(rfc4::packet::Message::Voice(_voice)) => {} ToSceneMessage::InitVoice(frame) => { avatar_scene.spawn_voice_channel( @@ -347,6 +359,14 @@ impl Adapter for LivekitRoom { fn support_voice_chat(&self) -> bool { true } + + fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec)> { + if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) { + std::mem::take(messages) + } else { + Vec::new() + } + } } fn spawn_livekit_task( diff --git a/rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs b/rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs index d4eca3cd..72045fff 100644 --- a/rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs +++ b/rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs @@ -66,6 +66,9 @@ pub struct WebSocketRoom { last_profile_response_sent: Instant, last_profile_request_sent: Instant, last_profile_version_announced: u32, + + // Scene messges + incoming_scene_messages: HashMap)>>, } impl WebSocketRoom { @@ -104,6 +107,7 @@ impl WebSocketRoom { last_profile_request_sent: old_time, last_try_to_connect: old_time, last_profile_version_announced: 0, + incoming_scene_messages: HashMap::new(), } } @@ -483,7 +487,15 @@ impl WebSocketRoom { .unwrap() .profile = Some(profile); } - rfc4::packet::Message::Scene(_scene) => {} + rfc4::packet::Message::Scene(scene) => { + let entry = self + .incoming_scene_messages + .entry(scene.scene_id) + .or_default(); + + // TODO: should we limit the size of the queue or accumulated bytes? + entry.push((peer.address, scene.data)); + } rfc4::packet::Message::Voice(_voice) => {} } } @@ -595,4 +607,12 @@ impl Adapter for WebSocketRoom { fn support_voice_chat(&self) -> bool { false } + + fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec)> { + if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) { + std::mem::take(messages) + } else { + Vec::new() + } + } } diff --git a/rust/decentraland-godot-lib/src/comms/communication_manager.rs b/rust/decentraland-godot-lib/src/comms/communication_manager.rs index 092c9807..423f6522 100644 --- a/rust/decentraland-godot-lib/src/comms/communication_manager.rs +++ b/rust/decentraland-godot-lib/src/comms/communication_manager.rs @@ -113,7 +113,40 @@ impl INode for CommunicationManager { } } -impl CommunicationManager {} +impl CommunicationManager { + pub fn send_scene_message(&mut self, scene_id: String, data: Vec) { + let scene_message = rfc4::Packet { + message: Some(rfc4::packet::Message::Scene(rfc4::Scene { scene_id, data })), + }; + match &mut self.current_connection { + CommsConnection::Connected(adapter) => { + adapter.send_rfc4(scene_message, true); + } + #[cfg(feature = "use_livekit")] + CommsConnection::Archipelago(archipelago) => { + if let Some(adapter) = archipelago.adapter() { + adapter.send_rfc4(scene_message, true); + } + } + _ => {} + } + } + + pub fn get_pending_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec)> { + match &mut self.current_connection { + CommsConnection::Connected(adapter) => adapter.consume_scene_messages(scene_id), + #[cfg(feature = "use_livekit")] + CommsConnection::Archipelago(archipelago) => { + if let Some(adapter) = archipelago.adapter() { + adapter.consume_scene_messages(scene_id) + } else { + vec![] + } + } + _ => vec![], + } + } +} #[godot_api] impl CommunicationManager { diff --git a/rust/decentraland-godot-lib/src/dcl/js/comms.rs b/rust/decentraland-godot-lib/src/dcl/js/comms.rs new file mode 100644 index 00000000..8c6d261b --- /dev/null +++ b/rust/decentraland-godot-lib/src/dcl/js/comms.rs @@ -0,0 +1,106 @@ +use std::{cell::RefCell, rc::Rc}; + +use deno_core::{op, JsBuffer, Op, OpDecl, OpState}; +use ethers::types::H160; + +use crate::dcl::scene_apis::RpcCall; + +#[derive(Default)] +pub(crate) struct InternalPendingBinaryMessages { + pub messages: Vec<(H160, Vec)>, +} + +// list of op declarations +pub fn ops() -> Vec { + vec![op_comms_send_string::DECL, op_comms_send_binary::DECL] +} + +pub(crate) const COMMS_MSG_TYPE_STRING: u8 = 1; +pub(crate) const COMMS_MSG_TYPE_BINARY: u8 = 2; + +#[op] +async fn op_comms_send_string( + state: Rc>, + message: String, +) -> Result<(), anyhow::Error> { + let mut message = message.into_bytes(); + message.insert(0, COMMS_MSG_TYPE_STRING); + comms_send(state, vec![message]).await?; + Ok(()) +} + +#[op] +async fn op_comms_send_binary( + state: Rc>, + messages: Vec, +) -> Result>, anyhow::Error> { + let messages = messages + .iter() + .map(|m| { + let mut m = m.as_ref().to_vec(); + m.insert(0, COMMS_MSG_TYPE_BINARY); + m + }) + .collect(); + + comms_send(state.clone(), messages).await?; + + // Get pending Binary messages + if let Some(pending_messages) = state + .borrow_mut() + .try_take::() + { + // TODO: remove comms message type + // Ok(pending_messages.messages) + let messages = pending_messages + .messages + .into_iter() + .map(|(sender_address, mut data)| { + let sender_address_str = format!("{:#x}", sender_address); + let sender_address_str_bytes = sender_address_str.as_bytes(); + + // Remove the comms message type(-1 byte), add the sender address size (+1 byte) + // and add the address in bytes (for H160=20 to string 2+40) + let sender_len = sender_address_str_bytes.len(); + let original_data_len = data.len(); + let new_data_size = original_data_len + 1 + sender_len - 1; + + // Resize to fit the sender address + data.resize(new_data_size, 0); + + // Shift the data to the right to fit the sender address + data.copy_within(1..original_data_len, sender_len + 1); + + // Add the sender address size at the beginning of the data + data[0] = sender_len as u8; + + // Add the sender address at the beginning of the data + data[1..=sender_len].copy_from_slice(sender_address_str_bytes); + + data + }) + .collect(); + Ok(messages) + } else { + Ok(vec![]) + } +} + +async fn comms_send( + state: Rc>, + message: Vec>, +) -> Result<(), anyhow::Error> { + let (sx, rx) = tokio::sync::oneshot::channel::>(); + + state + .borrow_mut() + .borrow_mut::>() + .push(RpcCall::SendCommsMessage { + body: message, + response: sx.into(), + }); + + rx.await + .map_err(|e| anyhow::anyhow!(e))? + .map_err(anyhow::Error::msg) +} diff --git a/rust/decentraland-godot-lib/src/dcl/js/engine.rs b/rust/decentraland-godot-lib/src/dcl/js/engine.rs index 2a4e258c..6531d241 100644 --- a/rust/decentraland-godot-lib/src/dcl/js/engine.rs +++ b/rust/decentraland-godot-lib/src/dcl/js/engine.rs @@ -20,6 +20,7 @@ use crate::dcl::{ }; use super::{ + comms::{InternalPendingBinaryMessages, COMMS_MSG_TYPE_BINARY}, events::process_events, players::{get_player_data, get_players}, }; @@ -69,13 +70,13 @@ fn op_crdt_send_to_renderer(op_state: Rc>, messages: &[u8]) { let sender = op_state.borrow_mut::>(); sender - .send(SceneResponse::Ok( + .send(SceneResponse::Ok { scene_id, - dirty, - logs.0, - elapsed_time, + dirty_crdt_state: dirty, + logs: logs.0, + delta: elapsed_time, rpc_calls, - )) + }) .expect("error sending scene response!!") } @@ -105,11 +106,14 @@ async fn op_crdt_recv_from_renderer(op_state: Rc>) -> Vec { + Some(RendererResponse::Ok { + dirty_crdt_state, + incoming_comms_message, + }) => { let mut data_buf = Vec::new(); let mut data_writter = DclWriter::new(&mut data_buf); - for (component_id, entities) in dirty.lww.iter() { + for (component_id, entities) in dirty_crdt_state.lww.iter() { for entity_id in entities { if let Err(err) = put_or_delete_lww_component( &scene_crdt_state, @@ -122,7 +126,7 @@ async fn op_crdt_recv_from_renderer(op_state: Rc>) -> Vec>) -> Vec) = incoming_comms_message + .into_iter() + .filter(|v| !v.1.is_empty()) + .partition(|v| v.1[0] == COMMS_MSG_TYPE_BINARY); + + if !comms_binary.is_empty() { + let mut internal_pending_binary_messages = op_state + .try_take::() + .unwrap_or_default(); + + internal_pending_binary_messages + .messages + .extend(comms_binary.into_iter()); + op_state.put(internal_pending_binary_messages); + } + process_local_api_calls(local_api_calls, &scene_crdt_state); - process_events(&mut op_state, &scene_crdt_state, &dirty); + process_events( + &mut op_state, + &scene_crdt_state, + &dirty_crdt_state, + comms_string, + ); data_buf } diff --git a/rust/decentraland-godot-lib/src/dcl/js/events.rs b/rust/decentraland-godot-lib/src/dcl/js/events.rs index a765be30..8492a1e8 100644 --- a/rust/decentraland-godot-lib/src/dcl/js/events.rs +++ b/rust/decentraland-godot-lib/src/dcl/js/events.rs @@ -4,11 +4,13 @@ use crate::dcl::{ SceneComponentId, SceneEntityId, }, crdt::{ - grow_only_set::GenericGrowOnlySetComponentOperation, DirtyCrdtState, SceneCrdtState, + grow_only_set::GenericGrowOnlySetComponentOperation, + last_write_wins::LastWriteWinsComponentOperation, DirtyCrdtState, SceneCrdtState, SceneCrdtStateProtoComponents, }, }; use deno_core::{op, Op, OpDecl, OpState}; +use ethers::types::H160; use serde::Serialize; use std::{ collections::{HashMap, HashSet}, @@ -52,10 +54,9 @@ struct EventBodyRealmChanged { } #[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct EventBodyPlayerClicked { - user_id: String, - ray: EventBodyRay, +struct EventComms { + sender: String, + message: String, } #[derive(Serialize)] @@ -72,6 +73,13 @@ struct EventBodyVector3 { z: f32, } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct EventBodyPlayerClicked { + user_id: String, + ray: EventBodyRay, +} + impl From> for EventBodyRay { fn from(ray: Option<&RaycastHit>) -> Self { if let Some(ray) = ray { @@ -260,13 +268,51 @@ pub fn process_events( op_state: &mut OpState, crdt_state: &SceneCrdtState, dirty_crdt_state: &DirtyCrdtState, + comms_string: Vec<(H160, Vec)>, ) { process_events_players_stateful(op_state, crdt_state, dirty_crdt_state); process_events_players_stateless(op_state, crdt_state, dirty_crdt_state); + let messages = comms_string + .into_iter() + .map(|(sender_address, data)| { + let sender = format!("{:#x}", sender_address); + let message = String::from_utf8_lossy(&data[1..]).to_string(); + EventComms { sender, message } + }) + .collect::>(); + + if !messages.is_empty() { + if let Some(player_clicked_sender) = op_state.try_take::>() { + messages.into_iter().for_each(|message| { + player_clicked_sender + .inner + .send(serde_json::to_string(&message).unwrap()) + .unwrap(); + }); + op_state.put(player_clicked_sender); + } + } + + let engine_info_component = SceneCrdtStateProtoComponents::get_engine_info(crdt_state); + let tick_number = if let Some(entry) = engine_info_component.get(&SceneEntityId::ROOT) { + if let Some(value) = entry.value.as_ref() { + value.tick_number + } else { + 0 + } + } else { + 0 + }; + + if tick_number == 4 { + if let Some(scene_ready_sender) = op_state.try_take::>() { + scene_ready_sender.inner.send("{}".to_string()).unwrap(); + op_state.put(scene_ready_sender); + } + } + // TODO: RealmChanged, it needs to add a new component to the crdt (realmInfo or something) - // TODO: MessageBus, sender should be on the main thread side - // TODO: SceneReady, in the tick == 4, send the event } struct EventPlayerState { diff --git a/rust/decentraland-godot-lib/src/dcl/js/js_modules/CommunicationsController.js b/rust/decentraland-godot-lib/src/dcl/js/js_modules/CommunicationsController.js index c9a808a6..4ba7cdc6 100644 --- a/rust/decentraland-godot-lib/src/dcl/js/js_modules/CommunicationsController.js +++ b/rust/decentraland-godot-lib/src/dcl/js/js_modules/CommunicationsController.js @@ -1,2 +1,11 @@ -module.exports.send = async function (body) { return {} } -module.exports.sendBinary = async function (body) { return { data: [] } } \ No newline at end of file +module.exports.send = async function (body) { + await Deno.core.ops.op_comms_send_string(body.message); + return {} +} + +module.exports.sendBinary = async function (body) { + const data = (await Deno.core.ops.op_comms_send_binary([...body.data])).map((item) => new Uint8Array(item)); + return { + data + } +} \ No newline at end of file diff --git a/rust/decentraland-godot-lib/src/dcl/js/mod.rs b/rust/decentraland-godot-lib/src/dcl/js/mod.rs index b9dc4de4..24a1ba07 100644 --- a/rust/decentraland-godot-lib/src/dcl/js/mod.rs +++ b/rust/decentraland-godot-lib/src/dcl/js/mod.rs @@ -1,13 +1,14 @@ -pub mod engine; -pub mod ethereum_controller; -pub mod events; -pub mod fetch; -pub mod players; -pub mod portables; -pub mod restricted_actions; -pub mod runtime; -pub mod testing; -pub mod websocket; +mod comms; +mod engine; +mod ethereum_controller; +mod events; +mod fetch; +mod players; +mod portables; +mod restricted_actions; +mod runtime; +mod testing; +mod websocket; use crate::auth::ephemeral_auth_chain::EphemeralAuthChain; use crate::auth::ethereum_provider::EthereumProvider; @@ -49,7 +50,7 @@ pub fn create_runtime() -> deno_core::JsRuntime { // add core ops ext = ext.ops(vec![op_require::DECL, op_log::DECL, op_error::DECL]); - let op_sets: [Vec; 10] = [ + let op_sets: [Vec; 11] = [ engine::ops(), runtime::ops(), fetch::ops(), @@ -60,6 +61,7 @@ pub fn create_runtime() -> deno_core::JsRuntime { events::ops(), testing::ops(), ethereum_controller::ops(), + comms::ops(), ]; let mut op_map = HashMap::new(); @@ -130,13 +132,13 @@ pub(crate) fn scene_thread( let dirty = scene_crdt_state.take_dirty(); thread_sender_to_main - .send(SceneResponse::Ok( + .send(SceneResponse::Ok { scene_id, - dirty, - Vec::new(), - 0.0, - Vec::new(), - )) + dirty_crdt_state: dirty, + logs: Vec::new(), + delta: 0.0, + rpc_calls: Vec::new(), + }) .expect("error sending scene response!!"); scene_main_crdt = Some(buf); diff --git a/rust/decentraland-godot-lib/src/dcl/mod.rs b/rust/decentraland-godot-lib/src/dcl/mod.rs index 1dca93f4..5ae3e3ff 100644 --- a/rust/decentraland-godot-lib/src/dcl/mod.rs +++ b/rust/decentraland-godot-lib/src/dcl/mod.rs @@ -6,6 +6,7 @@ pub mod js; pub mod scene_apis; pub mod serialization; +use ethers::types::H160; use godot::builtin::{Vector2, Vector3}; use crate::{ @@ -40,7 +41,10 @@ impl SceneId { // data from renderer to scene #[derive(Debug)] pub enum RendererResponse { - Ok(DirtyCrdtState), + Ok { + dirty_crdt_state: Box, + incoming_comms_message: Vec<(H160, Vec)>, + }, Kill, } @@ -48,13 +52,13 @@ pub enum RendererResponse { #[derive(Debug)] pub enum SceneResponse { Error(SceneId, String), - Ok( - SceneId, - DirtyCrdtState, - Vec, - f32, - Vec, - ), + Ok { + scene_id: SceneId, + dirty_crdt_state: DirtyCrdtState, + logs: Vec, + delta: f32, + rpc_calls: Vec, + }, RemoveGodotScene(SceneId, Vec), TakeSnapshot { scene_id: SceneId, diff --git a/rust/decentraland-godot-lib/src/dcl/scene_apis.rs b/rust/decentraland-godot-lib/src/dcl/scene_apis.rs index 453a92d8..c5255ff4 100644 --- a/rust/decentraland-godot-lib/src/dcl/scene_apis.rs +++ b/rust/decentraland-godot-lib/src/dcl/scene_apis.rs @@ -168,6 +168,10 @@ pub enum RpcCall { body: RPCSendableMessage, response: RpcResultSender>, }, + SendCommsMessage { + body: Vec>, + response: RpcResultSender>, + }, } #[derive(Debug)] diff --git a/rust/decentraland-godot-lib/src/scene_runner/rpc_calls/mod.rs b/rust/decentraland-godot-lib/src/scene_runner/rpc_calls/mod.rs index 7842d65f..faffa820 100644 --- a/rust/decentraland-godot-lib/src/scene_runner/rpc_calls/mod.rs +++ b/rust/decentraland-godot-lib/src/scene_runner/rpc_calls/mod.rs @@ -97,6 +97,15 @@ pub fn process_rpcs(scene: &mut Scene, current_parcel_scene_id: &SceneId, rpc_ca .bind() .send_async(body, response); } + RpcCall::SendCommsMessage { body, response } => { + let scene_id = scene.scene_entity_definition.id.clone(); + let mut comms = DclGlobal::singleton().bind().get_comms(); + let mut communication_manager = comms.bind_mut(); + for data in body { + communication_manager.send_scene_message(scene_id.clone(), data); + } + response.send(Ok(())); + } } } } diff --git a/rust/decentraland-godot-lib/src/scene_runner/scene_manager.rs b/rust/decentraland-godot-lib/src/scene_runner/scene_manager.rs index f793f81f..6e6f319b 100644 --- a/rust/decentraland-godot-lib/src/scene_runner/scene_manager.rs +++ b/rust/decentraland-godot-lib/src/scene_runner/scene_manager.rs @@ -490,7 +490,13 @@ impl SceneManager { arguments.push(GString::from(&msg).to_variant()); self.console.callv(arguments); } - SceneResponse::Ok(scene_id, dirty_crdt_state, logs, _, rpc_calls) => { + SceneResponse::Ok { + scene_id, + dirty_crdt_state, + logs, + rpc_calls, + delta: _, + } => { if let Some(scene) = self.scenes.get_mut(&scene_id) { let dirty = Dirty { waiting_process: true, diff --git a/rust/decentraland-godot-lib/src/scene_runner/update_scene.rs b/rust/decentraland-godot-lib/src/scene_runner/update_scene.rs index 8b8c9352..6a84575e 100644 --- a/rust/decentraland-godot-lib/src/scene_runner/update_scene.rs +++ b/rust/decentraland-godot-lib/src/scene_runner/update_scene.rs @@ -333,9 +333,18 @@ pub fn _process_scene( pointer_events_result_component.append(entity, value); } + let incoming_comms_message = DclGlobal::singleton() + .bind_mut() + .comms + .bind_mut() + .get_pending_messages(&scene.scene_entity_definition.id); + // Set renderer response to the scene - let dirty = crdt_state.take_dirty(); - scene.current_dirty.renderer_response = Some(RendererResponse::Ok(dirty)); + let dirty_crdt_state = crdt_state.take_dirty(); + scene.current_dirty.renderer_response = Some(RendererResponse::Ok { + dirty_crdt_state: Box::new(dirty_crdt_state), + incoming_comms_message, + }); false } SceneUpdateState::ProcessRpcs => { From dda78549bb352380516f29d90d1e5b64a3f2dfef Mon Sep 17 00:00:00 2001 From: Lean Mendoza Date: Mon, 19 Feb 2024 16:42:53 -0300 Subject: [PATCH 2/2] remove comment --- rust/decentraland-godot-lib/src/dcl/js/comms.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/decentraland-godot-lib/src/dcl/js/comms.rs b/rust/decentraland-godot-lib/src/dcl/js/comms.rs index 8c6d261b..65466505 100644 --- a/rust/decentraland-godot-lib/src/dcl/js/comms.rs +++ b/rust/decentraland-godot-lib/src/dcl/js/comms.rs @@ -50,8 +50,6 @@ async fn op_comms_send_binary( .borrow_mut() .try_take::() { - // TODO: remove comms message type - // Ok(pending_messages.messages) let messages = pending_messages .messages .into_iter()