Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement send and sendBinary over comms #279

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub trait Adapter {
fn clean(&mut self);

fn consume_chats(&mut self) -> Vec<(H160, rfc4::Chat)>;
fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)>;
fn change_profile(&mut self, new_profile: UserProfile);

fn send_rfc4(&mut self, packet: rfc4::Packet, unreliable: bool) -> bool;
Expand Down
22 changes: 21 additions & 1 deletion rust/decentraland-godot-lib/src/comms/adapter/livekit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub struct LivekitRoom {
last_profile_request_sent: Instant,
last_profile_version_announced: u32,
chats: Vec<(H160, rfc4::Chat)>,

// Scene messges
incoming_scene_messages: HashMap<String, Vec<(H160, Vec<u8>)>>,
}

impl LivekitRoom {
Expand Down Expand Up @@ -94,6 +97,7 @@ impl LivekitRoom {
peer_alias_counter: 0,
last_profile_version_announced: 0,
chats: Vec::new(),
incoming_scene_messages: HashMap::new(),
}
}

Expand Down Expand Up @@ -212,7 +216,15 @@ impl LivekitRoom {
avatar_scene.update_avatar_by_alias(peer.alias, &profile);
peer.profile = Some(profile);
}
ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(_scene)) => {}
ToSceneMessage::Rfc4(rfc4::packet::Message::Scene(scene)) => {
let entry = self
.incoming_scene_messages
.entry(scene.scene_id)
.or_default();

// TODO: should we limit the size of the queue or accumulated bytes?
entry.push((message.address, scene.data));
}
ToSceneMessage::Rfc4(rfc4::packet::Message::Voice(_voice)) => {}
ToSceneMessage::InitVoice(frame) => {
avatar_scene.spawn_voice_channel(
Expand Down Expand Up @@ -347,6 +359,14 @@ impl Adapter for LivekitRoom {
fn support_voice_chat(&self) -> bool {
true
}

fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) {
std::mem::take(messages)
} else {
Vec::new()
}
}
}

fn spawn_livekit_task(
Expand Down
22 changes: 21 additions & 1 deletion rust/decentraland-godot-lib/src/comms/adapter/ws_room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct WebSocketRoom {
last_profile_response_sent: Instant,
last_profile_request_sent: Instant,
last_profile_version_announced: u32,

// Scene messges
incoming_scene_messages: HashMap<String, Vec<(H160, Vec<u8>)>>,
}

impl WebSocketRoom {
Expand Down Expand Up @@ -104,6 +107,7 @@ impl WebSocketRoom {
last_profile_request_sent: old_time,
last_try_to_connect: old_time,
last_profile_version_announced: 0,
incoming_scene_messages: HashMap::new(),
}
}

Expand Down Expand Up @@ -483,7 +487,15 @@ impl WebSocketRoom {
.unwrap()
.profile = Some(profile);
}
rfc4::packet::Message::Scene(_scene) => {}
rfc4::packet::Message::Scene(scene) => {
let entry = self
.incoming_scene_messages
.entry(scene.scene_id)
.or_default();

// TODO: should we limit the size of the queue or accumulated bytes?
entry.push((peer.address, scene.data));
}
rfc4::packet::Message::Voice(_voice) => {}
}
}
Expand Down Expand Up @@ -595,4 +607,12 @@ impl Adapter for WebSocketRoom {
fn support_voice_chat(&self) -> bool {
false
}

fn consume_scene_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
if let Some(messages) = self.incoming_scene_messages.get_mut(scene_id) {
std::mem::take(messages)
} else {
Vec::new()
}
}
}
35 changes: 34 additions & 1 deletion rust/decentraland-godot-lib/src/comms/communication_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,40 @@ impl INode for CommunicationManager {
}
}

impl CommunicationManager {}
impl CommunicationManager {
pub fn send_scene_message(&mut self, scene_id: String, data: Vec<u8>) {
let scene_message = rfc4::Packet {
message: Some(rfc4::packet::Message::Scene(rfc4::Scene { scene_id, data })),
};
match &mut self.current_connection {
CommsConnection::Connected(adapter) => {
adapter.send_rfc4(scene_message, true);
}
#[cfg(feature = "use_livekit")]
CommsConnection::Archipelago(archipelago) => {
if let Some(adapter) = archipelago.adapter() {
adapter.send_rfc4(scene_message, true);
}
}
_ => {}
}
}

pub fn get_pending_messages(&mut self, scene_id: &str) -> Vec<(H160, Vec<u8>)> {
match &mut self.current_connection {
CommsConnection::Connected(adapter) => adapter.consume_scene_messages(scene_id),
#[cfg(feature = "use_livekit")]
CommsConnection::Archipelago(archipelago) => {
if let Some(adapter) = archipelago.adapter() {
adapter.consume_scene_messages(scene_id)
} else {
vec![]
}
}
_ => vec![],
}
}
}

#[godot_api]
impl CommunicationManager {
Expand Down
104 changes: 104 additions & 0 deletions rust/decentraland-godot-lib/src/dcl/js/comms.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{cell::RefCell, rc::Rc};

use deno_core::{op, JsBuffer, Op, OpDecl, OpState};
use ethers::types::H160;

use crate::dcl::scene_apis::RpcCall;

#[derive(Default)]
pub(crate) struct InternalPendingBinaryMessages {
pub messages: Vec<(H160, Vec<u8>)>,
}

// list of op declarations
pub fn ops() -> Vec<OpDecl> {
vec![op_comms_send_string::DECL, op_comms_send_binary::DECL]
}

pub(crate) const COMMS_MSG_TYPE_STRING: u8 = 1;
pub(crate) const COMMS_MSG_TYPE_BINARY: u8 = 2;

#[op]
async fn op_comms_send_string(
state: Rc<RefCell<OpState>>,
message: String,
) -> Result<(), anyhow::Error> {
let mut message = message.into_bytes();
message.insert(0, COMMS_MSG_TYPE_STRING);
comms_send(state, vec![message]).await?;
Ok(())
}

#[op]
async fn op_comms_send_binary(
state: Rc<RefCell<OpState>>,
messages: Vec<JsBuffer>,
) -> Result<Vec<Vec<u8>>, anyhow::Error> {
let messages = messages
.iter()
.map(|m| {
let mut m = m.as_ref().to_vec();
m.insert(0, COMMS_MSG_TYPE_BINARY);
m
})
.collect();

comms_send(state.clone(), messages).await?;

// Get pending Binary messages
if let Some(pending_messages) = state
.borrow_mut()
.try_take::<InternalPendingBinaryMessages>()
{
let messages = pending_messages
.messages
.into_iter()
.map(|(sender_address, mut data)| {
let sender_address_str = format!("{:#x}", sender_address);
let sender_address_str_bytes = sender_address_str.as_bytes();

// Remove the comms message type(-1 byte), add the sender address size (+1 byte)
// and add the address in bytes (for H160=20 to string 2+40)
let sender_len = sender_address_str_bytes.len();
let original_data_len = data.len();
let new_data_size = original_data_len + 1 + sender_len - 1;

// Resize to fit the sender address
data.resize(new_data_size, 0);

// Shift the data to the right to fit the sender address
data.copy_within(1..original_data_len, sender_len + 1);

// Add the sender address size at the beginning of the data
data[0] = sender_len as u8;

// Add the sender address at the beginning of the data
data[1..=sender_len].copy_from_slice(sender_address_str_bytes);

data
})
.collect();
Ok(messages)
} else {
Ok(vec![])
}
}

async fn comms_send(
state: Rc<RefCell<OpState>>,
message: Vec<Vec<u8>>,
) -> Result<(), anyhow::Error> {
let (sx, rx) = tokio::sync::oneshot::channel::<Result<(), String>>();

state
.borrow_mut()
.borrow_mut::<Vec<RpcCall>>()
.push(RpcCall::SendCommsMessage {
body: message,
response: sx.into(),
});

rx.await
.map_err(|e| anyhow::anyhow!(e))?
.map_err(anyhow::Error::msg)
}
45 changes: 35 additions & 10 deletions rust/decentraland-godot-lib/src/dcl/js/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::dcl::{
};

use super::{
comms::{InternalPendingBinaryMessages, COMMS_MSG_TYPE_BINARY},
events::process_events,
players::{get_player_data, get_players},
};
Expand Down Expand Up @@ -69,13 +70,13 @@ fn op_crdt_send_to_renderer(op_state: Rc<RefCell<OpState>>, messages: &[u8]) {
let sender = op_state.borrow_mut::<std::sync::mpsc::SyncSender<SceneResponse>>();

sender
.send(SceneResponse::Ok(
.send(SceneResponse::Ok {
scene_id,
dirty,
logs.0,
elapsed_time,
dirty_crdt_state: dirty,
logs: logs.0,
delta: elapsed_time,
rpc_calls,
))
})
.expect("error sending scene response!!")
}

Expand Down Expand Up @@ -105,11 +106,14 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
let scene_crdt_state = cloned_scene_crdt.lock().unwrap();

let data = match response {
Some(RendererResponse::Ok(dirty)) => {
Some(RendererResponse::Ok {
dirty_crdt_state,
incoming_comms_message,
}) => {
let mut data_buf = Vec::new();
let mut data_writter = DclWriter::new(&mut data_buf);

for (component_id, entities) in dirty.lww.iter() {
for (component_id, entities) in dirty_crdt_state.lww.iter() {
for entity_id in entities {
if let Err(err) = put_or_delete_lww_component(
&scene_crdt_state,
Expand All @@ -122,7 +126,7 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
}
}

for (component_id, entities) in dirty.gos.iter() {
for (component_id, entities) in dirty_crdt_state.gos.iter() {
for (entity_id, element_count) in entities {
if let Err(err) = append_gos_component(
&scene_crdt_state,
Expand All @@ -136,12 +140,33 @@ async fn op_crdt_recv_from_renderer(op_state: Rc<RefCell<OpState>>) -> Vec<Vec<u
}
}

for entity_id in dirty.entities.died.iter() {
for entity_id in dirty_crdt_state.entities.died.iter() {
delete_entity(entity_id, &mut data_writter);
}

let (comms_binary, comms_string): (_, Vec<_>) = incoming_comms_message
.into_iter()
.filter(|v| !v.1.is_empty())
.partition(|v| v.1[0] == COMMS_MSG_TYPE_BINARY);

if !comms_binary.is_empty() {
let mut internal_pending_binary_messages = op_state
.try_take::<InternalPendingBinaryMessages>()
.unwrap_or_default();

internal_pending_binary_messages
.messages
.extend(comms_binary.into_iter());
op_state.put(internal_pending_binary_messages);
}

process_local_api_calls(local_api_calls, &scene_crdt_state);
process_events(&mut op_state, &scene_crdt_state, &dirty);
process_events(
&mut op_state,
&scene_crdt_state,
&dirty_crdt_state,
comms_string,
);

data_buf
}
Expand Down
Loading
Loading