From 1b36389c8fd49f45226ceb8460e2fa412c7a7bef Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 25 Mar 2024 23:45:12 -0500 Subject: [PATCH 1/5] perf: use thread_local for encoding packets Ideally this will reduce the time using flume channels to send packets because we can batch them together. --- Cargo.lock | 21 +++++++ server/Cargo.toml | 4 +- server/src/io.rs | 68 +---------------------- server/src/lib.rs | 26 ++++++--- server/src/singleton.rs | 1 + server/src/singleton/encoder.rs | 60 ++++++++++++++++++++ server/src/system/entity_move_logic.rs | 22 +++++--- server/src/system/init_entity.rs | 3 +- server/src/system/reset_bounding_boxes.rs | 2 - server/src/system/tps_message.rs | 17 +++--- 10 files changed, 131 insertions(+), 93 deletions(-) create mode 100644 server/src/singleton.rs create mode 100644 server/src/singleton/encoder.rs diff --git a/Cargo.lock b/Cargo.lock index 3011bc95..7758bb57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,16 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libmimalloc-sys" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1115,6 +1125,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mimalloc" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa01922b5ea280a911e323e4d2fd24b7fe5cc4042e0d2cda3c40775cdc4bdc9c" +dependencies = [ + "libmimalloc-sys", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1713,6 +1732,7 @@ dependencies = [ "itertools", "jemalloc-ctl", "jemallocator", + "mimalloc", "monoio", "ndarray", "pprof", @@ -1724,6 +1744,7 @@ dependencies = [ "signal-hook", "smallvec 2.0.0-alpha.5", "thread-priority", + "thread_local", "tracing", "tracing-flame", "tracing-subscriber", diff --git a/server/Cargo.toml b/server/Cargo.toml index aa451d49..8e4b2f8f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -54,7 +54,8 @@ jemalloc-ctl = "0.5.4" # removing this because jemalloc-ctl is nice for memory stats also # https://github.com/rust-lang/rust-analyzer/issues/1441#issuecomment-509506279 # jemalloc uses significantly less memory -#mimalloc = { version = "0.1.39" , default-features = false } +mimalloc = { version = "0.1.39" , default-features = false } +thread_local = { version = "1.1.8", features = ["nightly"] } [lints.rust] @@ -123,6 +124,7 @@ cast_precision_loss = "allow" # consider denying missing_errors_doc = "allow" # consider denying wildcard_imports = "allow" non_ascii_literal = "allow" +no_mangle_with_rust_abi = "allow" perf = "deny" diff --git a/server/src/io.rs b/server/src/io.rs index 8c6932f0..fd599fe0 100644 --- a/server/src/io.rs +++ b/server/src/io.rs @@ -77,17 +77,6 @@ pub struct WriterComm { enc: PacketEncoder, } -pub fn encode_packet

(pkt: &P) -> anyhow::Result -where - P: valence_protocol::Packet + Encode, -{ - let mut enc = PacketEncoder::default(); - enc.append_packet(pkt)?; - let bytes = enc.take(); - - Ok(bytes.freeze()) -} - type ReaderComm = flume::Receiver; impl WriterComm { @@ -394,60 +383,9 @@ impl Io { monoio::spawn(async move { while let Ok(bytes) = s2c_rx.recv_async().await { - // if macos - // if there are multiple elements in the channel, batch them. - // This is especially useful on macOS which does not support - // io_uring and has a high cost for each write (context switch for each syscall). - #[cfg(target_os = "macos")] - { - if s2c_rx.is_empty() { - if let Err(e) = io_write.send_packet(bytes).await { - error!("{e:?}"); - break; - } - continue; - } - - let mut byte_collect = bytes.to_vec(); - - // we are using drain so we do not go in infinite loop - for other_byte in s2c_rx.drain() { - let other_byte = other_byte.to_vec(); - // todo: or extend slice - byte_collect.extend(other_byte); - } - - let bytes = bytes::Bytes::from(byte_collect); - - if let Err(e) = io_write.send_packet(bytes).await { - error!("{e:?}"); - break; - } - continue; - } - - // if linux - #[cfg(target_os = "linux")] - { - if let Err(e) = io_write.send_packet(bytes).await { - error!("{e:?}"); - break; - } - continue; - } - - // if windows panic - #[cfg(target_os = "windows")] - { - panic!("windows not supported"); - continue; - } - - // if other panic - #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] - { - panic!("unsupported os"); - continue; + if let Err(e) = io_write.send_packet(bytes).await { + error!("{e:?}"); + break; } } }); diff --git a/server/src/lib.rs b/server/src/lib.rs index bae5b23a..a4bbbff1 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,10 +1,13 @@ #![allow(clippy::many_single_char_names)] +#![feature(thread_local)] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + extern crate core; mod chunk; +mod singleton; use std::{ cell::UnsafeCell, @@ -16,6 +19,7 @@ use std::{ use anyhow::Context; use evenio::prelude::*; use jemalloc_ctl::{epoch, stats}; +use ndarray::s; use signal_hook::iterator::Signals; use tracing::{info, instrument, warn}; use valence_protocol::math::DVec3; @@ -24,6 +28,7 @@ use crate::{ bounding_box::BoundingBox, io::{server, ClientConnection, Packets}, }; +use crate::singleton::encoder::Encoder; mod global; mod io; @@ -97,8 +102,8 @@ struct KillAllEntities; #[derive(Event, Copy, Clone)] #[allow(dead_code)] struct StatsEvent { - ms_per_tick_mean: f64, - ms_per_tick_std_dev: f64, + ms_per_tick_mean_1s: f64, + ms_per_tick_mean_5s: f64, allocated: usize, resident: usize, } @@ -172,6 +177,10 @@ impl Game { let bounding_boxes = world.spawn(); world.insert(bounding_boxes, bounding_box::EntityBoundingBoxes::default()); + + let encoder = world.spawn(); + world.insert(encoder, Encoder::default()); + let mut game = Self { world, @@ -220,7 +229,7 @@ impl Game { #[instrument(skip_all)] pub fn tick(&mut self) { const LAST_TICK_HISTORY_SIZE: usize = 100; - const MSPT_HISTORY_SIZE: usize = 20; + const MSPT_HISTORY_SIZE: usize = 100; let now = Instant::now(); self.last_ticks.push_back(now); @@ -257,10 +266,11 @@ impl Game { if self.last_ms_per_tick.len() > MSPT_HISTORY_SIZE { // efficient - let arr = ndarray::Array::from_iter(self.last_ms_per_tick.iter().copied()); + let arr = ndarray::Array::from_iter(self.last_ms_per_tick.iter().copied().rev()); - let std_dev = arr.std(0.0); - let mean = arr.mean().unwrap(); + // last 1 second (20 ticks) 5 seconds (100 ticks) and 25 seconds (500 ticks) + let mean_1_second = arr.slice(s![..20]).mean().unwrap(); + let mean_5_seconds = arr.slice(s![..100]).mean().unwrap(); let allocated = stats::allocated::mib().unwrap(); let resident = stats::resident::mib().unwrap(); @@ -276,8 +286,8 @@ impl Game { let resident = resident.read().unwrap(); self.world.send(StatsEvent { - ms_per_tick_mean: mean, - ms_per_tick_std_dev: std_dev, + ms_per_tick_mean_1s: mean_1_second, + ms_per_tick_mean_5s: mean_5_seconds, allocated, resident, }); diff --git a/server/src/singleton.rs b/server/src/singleton.rs new file mode 100644 index 00000000..7d4eb3c0 --- /dev/null +++ b/server/src/singleton.rs @@ -0,0 +1 @@ +pub mod encoder; \ No newline at end of file diff --git a/server/src/singleton/encoder.rs b/server/src/singleton/encoder.rs new file mode 100644 index 00000000..23c0904e --- /dev/null +++ b/server/src/singleton/encoder.rs @@ -0,0 +1,60 @@ +// https://stackoverflow.com/a/61681112/4889030 +// https://matklad.github.io/2020/10/03/fast-thread-locals-in-rust.html +use std::cell::UnsafeCell; + +use evenio::component::Component; +use thread_local::ThreadLocal; +use valence_protocol::{Encode, Packet, PacketEncoder}; + +#[derive(Default, Component)] +pub struct Encoder { + local: ThreadLocal>, +} + +impl Encoder { + pub fn append(&self, packet: &P) -> anyhow::Result<()> { + let encoder = self.local.get_or_default(); + + // Safety: + // The use of `unsafe` here is justified by the guarantees provided by the `ThreadLocal` and + // `UnsafeCell` usage patterns: + // 1. Thread-local storage ensures that the `UnsafeCell` is accessed only + // within the context of a single thread, eliminating the risk of concurrent access + // violations. + // 2. `UnsafeCell` is the fundamental building block for mutable shared state in Rust. By + // using `UnsafeCell`, we're explicitly signaling that the contained value + // (`PacketEncoder`) may be mutated through a shared reference. This is necessary because + // Rust's borrowing rules disallow mutable aliasing, which would be violated if we + // attempted to mutate through a shared reference without `UnsafeCell`. + // 3. The dereference of `encoder.get()` to obtain a mutable reference to the + // `PacketEncoder` (`&mut *encoder.get()`) is safe under the assumption that no other + // references to the `PacketEncoder` are concurrently alive. This assumption is upheld by + // the `ThreadLocal` storage, ensuring that the mutable reference is exclusive to the + // current thread. + // Therefore, the use of `unsafe` is encapsulated within this method and does not leak + // unsafe guarantees to the caller, provided the `Encoder` struct itself is used in a + // thread-safe manner. + let encoder = unsafe { &mut *encoder.get() }; + encoder.append_packet(packet) + } + + pub fn drain(&mut self) -> impl Iterator + '_ { + self.local.iter_mut().map(|encoder| { + let encoder = encoder.get_mut(); + encoder.take().freeze() + }) + } +} + +#[cfg(test)] +mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; + + use crate::singleton::encoder::Encoder; + + const fn _assert_auto_trait_impls() + where + Encoder: Send + Sync + UnwindSafe + RefUnwindSafe, + { + } +} diff --git a/server/src/system/entity_move_logic.rs b/server/src/system/entity_move_logic.rs index 1f8f5879..44dfac7d 100644 --- a/server/src/system/entity_move_logic.rs +++ b/server/src/system/entity_move_logic.rs @@ -1,7 +1,7 @@ use evenio::{ entity::EntityId, event::Receiver, - fetch::Fetcher, + fetch::{Fetcher, Single}, query::{Not, Query, With}, rayon::prelude::*, }; @@ -12,8 +12,8 @@ use valence_protocol::{ }; use crate::{ - io::encode_packet, EntityReaction, FullEntityPose, Gametick, MinecraftEntity, Player, - RunningSpeed, Targetable, + singleton::encoder::Encoder, EntityReaction, FullEntityPose, Gametick, + MinecraftEntity, Player, RunningSpeed, Targetable, }; // 0 &mut FullEntityPose @@ -46,6 +46,7 @@ pub fn entity_move_logic( &Player, // 3 Not<&MinecraftEntity>, // not 1 )>, + encoder: Single<&mut Encoder>, ) { use valence_protocol::packets::play; @@ -56,6 +57,8 @@ pub fn entity_move_logic( let target = target.position; + let encoder = encoder.0; + entities.par_iter_mut().for_each(|query| { let EntityQuery { id, @@ -114,12 +117,13 @@ pub fn entity_move_logic( }; // todo: remove unwrap - let pos = encode_packet(&pos).unwrap(); - let look = encode_packet(&look).unwrap(); + encoder.append(&pos).unwrap(); + encoder.append(&look).unwrap(); + }); - player.iter().for_each(|(_, player, ..)| { - let _ = player.packets.writer.send_raw(pos.clone()); - let _ = player.packets.writer.send_raw(look.clone()); + for bytes in encoder.drain() { + player.par_iter().for_each(|(_, player, _)| { + let _ = player.packets.writer.send_raw(bytes.clone()); }); - }); + } } diff --git a/server/src/system/init_entity.rs b/server/src/system/init_entity.rs index 48aaf775..b14f75eb 100644 --- a/server/src/system/init_entity.rs +++ b/server/src/system/init_entity.rs @@ -79,6 +79,7 @@ pub fn init_entity( }; players.iter_mut().for_each(|player| { - player.packets.writer.send_packet(&pkt).unwrap(); + // todo: handle error + let _ = player.packets.writer.send_packet(&pkt); }); } diff --git a/server/src/system/reset_bounding_boxes.rs b/server/src/system/reset_bounding_boxes.rs index c96c51ac..fc209de4 100644 --- a/server/src/system/reset_bounding_boxes.rs +++ b/server/src/system/reset_bounding_boxes.rs @@ -13,8 +13,6 @@ pub struct EntityQuery<'a> { pose: &'a mut FullEntityPose, } -// #[no_mangle] -// #[instrument(skip_all, name = "reset_bounding_boxes")] pub fn reset_bounding_boxes( _: Receiver, entity_bounding_boxes: Single<&mut EntityBoundingBoxes>, diff --git a/server/src/system/tps_message.rs b/server/src/system/tps_message.rs index 2c3c00e2..765245a1 100644 --- a/server/src/system/tps_message.rs +++ b/server/src/system/tps_message.rs @@ -1,13 +1,16 @@ use evenio::prelude::*; use tracing::instrument; -use crate::{bytes_to_mb, Player, StatsEvent}; +use crate::{bytes_to_mb, FullEntityPose, Player, StatsEvent}; #[instrument(skip_all, name = "tps_message")] -pub fn tps_message(r: Receiver, mut players: Fetcher<&mut Player>) { +pub fn tps_message( + r: Receiver, + mut players: Fetcher<(&mut Player, &FullEntityPose)> +) { let StatsEvent { - ms_per_tick_mean, - ms_per_tick_std_dev, + ms_per_tick_mean_1s, + ms_per_tick_mean_5s, resident, .. } = r.event; @@ -15,10 +18,10 @@ pub fn tps_message(r: Receiver, mut players: Fetcher<&mut Player>) { // let allocated = bytes_to_mb(*allocated); let resident = bytes_to_mb(*resident); - // make sexy with stddev & mean symbol - let message = format!("µ={ms_per_tick_mean:.2}, σ={ms_per_tick_std_dev:.2}, {resident:.2}MiB"); + players.iter_mut().for_each(|(player, _)| { + // make sexy with stddev & mean symbol + let message = format!("µms {ms_per_tick_mean_1s:.2} {ms_per_tick_mean_5s:.2}, {resident:.2}MiB"); - players.iter_mut().for_each(|player| { // todo: handle error let _ = player.packets.writer.send_chat_message(&message); }); From a4be2d741182b5bc16181d98acb1c352788e93c1 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 25 Mar 2024 23:53:32 -0500 Subject: [PATCH 2/5] fmt --- server/src/lib.rs | 6 ++---- server/src/singleton.rs | 2 +- server/src/system/entity_move_logic.rs | 4 ++-- server/src/system/tps_message.rs | 8 +++----- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index a4bbbff1..ced69a51 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -4,7 +4,6 @@ #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; - extern crate core; mod chunk; mod singleton; @@ -27,8 +26,8 @@ use valence_protocol::math::DVec3; use crate::{ bounding_box::BoundingBox, io::{server, ClientConnection, Packets}, + singleton::encoder::Encoder, }; -use crate::singleton::encoder::Encoder; mod global; mod io; @@ -177,10 +176,9 @@ impl Game { let bounding_boxes = world.spawn(); world.insert(bounding_boxes, bounding_box::EntityBoundingBoxes::default()); - + let encoder = world.spawn(); world.insert(encoder, Encoder::default()); - let mut game = Self { world, diff --git a/server/src/singleton.rs b/server/src/singleton.rs index 7d4eb3c0..4c19e5ba 100644 --- a/server/src/singleton.rs +++ b/server/src/singleton.rs @@ -1 +1 @@ -pub mod encoder; \ No newline at end of file +pub mod encoder; diff --git a/server/src/system/entity_move_logic.rs b/server/src/system/entity_move_logic.rs index 44dfac7d..687ad63c 100644 --- a/server/src/system/entity_move_logic.rs +++ b/server/src/system/entity_move_logic.rs @@ -12,8 +12,8 @@ use valence_protocol::{ }; use crate::{ - singleton::encoder::Encoder, EntityReaction, FullEntityPose, Gametick, - MinecraftEntity, Player, RunningSpeed, Targetable, + singleton::encoder::Encoder, EntityReaction, FullEntityPose, Gametick, MinecraftEntity, Player, + RunningSpeed, Targetable, }; // 0 &mut FullEntityPose diff --git a/server/src/system/tps_message.rs b/server/src/system/tps_message.rs index 765245a1..c4682183 100644 --- a/server/src/system/tps_message.rs +++ b/server/src/system/tps_message.rs @@ -4,10 +4,7 @@ use tracing::instrument; use crate::{bytes_to_mb, FullEntityPose, Player, StatsEvent}; #[instrument(skip_all, name = "tps_message")] -pub fn tps_message( - r: Receiver, - mut players: Fetcher<(&mut Player, &FullEntityPose)> -) { +pub fn tps_message(r: Receiver, mut players: Fetcher<(&mut Player, &FullEntityPose)>) { let StatsEvent { ms_per_tick_mean_1s, ms_per_tick_mean_5s, @@ -20,7 +17,8 @@ pub fn tps_message( players.iter_mut().for_each(|(player, _)| { // make sexy with stddev & mean symbol - let message = format!("µms {ms_per_tick_mean_1s:.2} {ms_per_tick_mean_5s:.2}, {resident:.2}MiB"); + let message = + format!("µms {ms_per_tick_mean_1s:.2} {ms_per_tick_mean_5s:.2}, {resident:.2}MiB"); // todo: handle error let _ = player.packets.writer.send_chat_message(&message); From b73606c17a1d14d48088b4a1807ec368b32394e6 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 25 Mar 2024 23:54:46 -0500 Subject: [PATCH 3/5] remove mimimalloc from server --- Cargo.lock | 20 -------------------- server/Cargo.toml | 3 ++- server/benches/many_zombies.rs | 6 ------ 3 files changed, 2 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7758bb57..cc2f2e9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,16 +1050,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libmimalloc-sys" -version = "0.1.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -1125,15 +1115,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mimalloc" -version = "0.1.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa01922b5ea280a911e323e4d2fd24b7fe5cc4042e0d2cda3c40775cdc4bdc9c" -dependencies = [ - "libmimalloc-sys", -] - [[package]] name = "miniz_oxide" version = "0.7.2" @@ -1732,7 +1713,6 @@ dependencies = [ "itertools", "jemalloc-ctl", "jemallocator", - "mimalloc", "monoio", "ndarray", "pprof", diff --git a/server/Cargo.toml b/server/Cargo.toml index 8e4b2f8f..092533a3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -54,7 +54,8 @@ jemalloc-ctl = "0.5.4" # removing this because jemalloc-ctl is nice for memory stats also # https://github.com/rust-lang/rust-analyzer/issues/1441#issuecomment-509506279 # jemalloc uses significantly less memory -mimalloc = { version = "0.1.39" , default-features = false } +#mimalloc = { version = "0.1.39" , default-features = false } + thread_local = { version = "1.1.8", features = ["nightly"] } diff --git a/server/benches/many_zombies.rs b/server/benches/many_zombies.rs index 34ea1e2b..ef95da91 100644 --- a/server/benches/many_zombies.rs +++ b/server/benches/many_zombies.rs @@ -9,12 +9,6 @@ use divan::Bencher; use server::{bounding_box::BoundingBox, FullEntityPose, Game, InitEntity, Targetable}; use valence_protocol::math::DVec3; -// #[global_allocator] -// static ALLOC: AllocProfiler = AllocProfiler::system(); - -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - fn main() { // Run registered benchmarks. divan::main(); From f6972ce07cc6529e5c36d28018ddb7f6ea722d27 Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Mon, 25 Mar 2024 23:56:40 -0500 Subject: [PATCH 4/5] test use nightly --- .github/workflows/build.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d88ed2b5..768eac68 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,6 +40,8 @@ jobs: - name: Setup Rust toolchain and cache uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: ${{ env.RUST_NIGHTLY_TOOLCHAIN }} - name: Run cargo test run: cargo test --workspace --all-features From e471b13a69ab8c07ddf6e7a3e297cb455353838e Mon Sep 17 00:00:00 2001 From: Andrew Gazelka Date: Tue, 26 Mar 2024 00:29:54 -0500 Subject: [PATCH 5/5] use #[thread_local] instead of thread_local crate --- Cargo.lock | 1 - server/Cargo.toml | 3 - server/src/lib.rs | 2 +- server/src/singleton/encoder.rs | 78 +++++++++++++++++++++----- server/src/system/entity_move_logic.rs | 8 +-- 5 files changed, 70 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc2f2e9c..3011bc95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1724,7 +1724,6 @@ dependencies = [ "signal-hook", "smallvec 2.0.0-alpha.5", "thread-priority", - "thread_local", "tracing", "tracing-flame", "tracing-subscriber", diff --git a/server/Cargo.toml b/server/Cargo.toml index 092533a3..c2f4cff8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -56,9 +56,6 @@ jemalloc-ctl = "0.5.4" # jemalloc uses significantly less memory #mimalloc = { version = "0.1.39" , default-features = false } -thread_local = { version = "1.1.8", features = ["nightly"] } - - [lints.rust] warnings = "deny" diff --git a/server/src/lib.rs b/server/src/lib.rs index ced69a51..6c0ed1f2 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -178,7 +178,7 @@ impl Game { world.insert(bounding_boxes, bounding_box::EntityBoundingBoxes::default()); let encoder = world.spawn(); - world.insert(encoder, Encoder::default()); + world.insert(encoder, Encoder); let mut game = Self { world, diff --git a/server/src/singleton/encoder.rs b/server/src/singleton/encoder.rs index 23c0904e..0572894c 100644 --- a/server/src/singleton/encoder.rs +++ b/server/src/singleton/encoder.rs @@ -2,19 +2,64 @@ // https://matklad.github.io/2020/10/03/fast-thread-locals-in-rust.html use std::cell::UnsafeCell; +use anyhow::ensure; +use bytes::{BufMut, Bytes}; use evenio::component::Component; -use thread_local::ThreadLocal; -use valence_protocol::{Encode, Packet, PacketEncoder}; +use valence_protocol::{Encode, Packet, VarInt}; -#[derive(Default, Component)] -pub struct Encoder { - local: ThreadLocal>, +#[derive(Default)] +struct ConstPacketEncoder { + buf: Vec, } +impl ConstPacketEncoder { + pub const fn new() -> Self { + Self { buf: Vec::new() } + } + + pub fn append_packet

(&mut self, pkt: &P) -> anyhow::Result<()> + where + P: Packet + Encode, + { + let start_len = self.buf.len(); + + pkt.encode_with_id((&mut self.buf).writer())?; + + let data_len = self.buf.len() - start_len; + + let packet_len = data_len; + + ensure!( + packet_len <= valence_protocol::MAX_PACKET_SIZE as usize, + "packet exceeds maximum length" + ); + + #[allow(clippy::cast_possible_wrap)] + let packet_len_size = VarInt(packet_len as i32).written_size(); + + self.buf.put_bytes(0, packet_len_size); + self.buf + .copy_within(start_len..start_len + data_len, start_len + packet_len_size); + + #[allow(clippy::indexing_slicing)] + let front = &mut self.buf[start_len..]; + + #[allow(clippy::cast_possible_wrap)] + VarInt(packet_len as i32).encode(front)?; + + Ok(()) + } +} + +#[thread_local] +static ENCODER: UnsafeCell = UnsafeCell::new(ConstPacketEncoder::new()); + +#[derive(Component)] +pub struct Encoder; + impl Encoder { + #[allow(clippy::unused_self)] pub fn append(&self, packet: &P) -> anyhow::Result<()> { - let encoder = self.local.get_or_default(); - // Safety: // The use of `unsafe` here is justified by the guarantees provided by the `ThreadLocal` and // `UnsafeCell` usage patterns: @@ -34,15 +79,22 @@ impl Encoder { // Therefore, the use of `unsafe` is encapsulated within this method and does not leak // unsafe guarantees to the caller, provided the `Encoder` struct itself is used in a // thread-safe manner. - let encoder = unsafe { &mut *encoder.get() }; + let encoder = unsafe { &mut *ENCODER.get() }; encoder.append_packet(packet) } - pub fn drain(&mut self) -> impl Iterator + '_ { - self.local.iter_mut().map(|encoder| { - let encoder = encoder.get_mut(); - encoder.take().freeze() - }) + #[allow(clippy::unused_self)] + pub fn par_drain(&self, f: F) + where + F: Fn(Bytes) + Sync, + { + rayon::broadcast(move |_| { + // Safety: + // ditto + let encoder = unsafe { &mut *ENCODER.get() }; + let buf = core::mem::take(&mut encoder.buf); + f(Bytes::from(buf)); + }); } } diff --git a/server/src/system/entity_move_logic.rs b/server/src/system/entity_move_logic.rs index 687ad63c..8348753f 100644 --- a/server/src/system/entity_move_logic.rs +++ b/server/src/system/entity_move_logic.rs @@ -121,9 +121,9 @@ pub fn entity_move_logic( encoder.append(&look).unwrap(); }); - for bytes in encoder.drain() { - player.par_iter().for_each(|(_, player, _)| { + encoder.par_drain(|bytes| { + for (_, player, _) in &player { let _ = player.packets.writer.send_raw(bytes.clone()); - }); - } + } + }); }