Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use local_thread for incoming io #50

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ skip = [
"futures-lite",
"fastrand",
"event-listener",
"base64"
"base64",
"regex-syntax",
"regex-automata"
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" },
]
# Similarly to `skip` allows you to skip certain crates during duplicate
Expand All @@ -244,7 +246,7 @@ unknown-git = "warn"
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
# List of URLs for allowed Git repositories
allow-git = [
"https://github.com/rj00a/evenio",
"https://github.com/andrewgazelka/evenio",
"https://github.com/servo/rust-smallvec"
]

Expand Down
58 changes: 37 additions & 21 deletions server/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::module_name_repetitions)]

use std::{borrow::Cow, collections::BTreeSet, io, io::ErrorKind};
use std::{borrow::Cow, cell::UnsafeCell, collections::BTreeSet, io, io::ErrorKind, sync::Mutex};

use anyhow::{ensure, Context};
use base64::Engine;
Expand Down Expand Up @@ -54,6 +54,7 @@ fn offline_uuid(username: &str) -> anyhow::Result<Uuid> {
pub struct ClientConnection {
pub packets: Packets,
pub name: Box<str>,
pub uuid: Uuid,
}

pub struct Io {
Expand All @@ -77,8 +78,6 @@ pub struct WriterComm {
enc: PacketEncoder,
}

type ReaderComm = flume::Receiver<PacketFrame>;

impl WriterComm {
pub fn serialize<P>(&mut self, pkt: &P) -> anyhow::Result<bytes::Bytes>
where
Expand Down Expand Up @@ -172,6 +171,11 @@ impl WriterComm {
}
}

pub struct UserPacketFrame {
pub packet: PacketFrame,
pub user: Uuid,
}

impl IoRead {
pub async fn recv_packet_raw(&mut self) -> anyhow::Result<PacketFrame> {
loop {
Expand Down Expand Up @@ -212,7 +216,6 @@ impl IoWrite {

pub struct Packets {
pub writer: WriterComm,
pub reader: ReaderComm,
}

impl Io {
Expand All @@ -224,7 +227,6 @@ impl Io {
if let Some(frame) = self.dec.try_next_packet()? {
self.frame = frame;
let decode: P = self.frame.decode()?;
// info!("read packet {decode:#?}");
return Ok(decode);
}

Expand Down Expand Up @@ -340,8 +342,10 @@ impl Io {

let username: Box<str> = Box::from(username.0);

let uuid = offline_uuid(&username)?; // todo: random

let packet = LoginSuccessS2c {
uuid: offline_uuid(&username)?,
uuid,
username: Bounded::from(&*username),
properties: Cow::default(),
};
Expand All @@ -351,7 +355,6 @@ impl Io {

// bound at 1024 packets
let (s2c_tx, s2c_rx) = flume::unbounded();
let (c2s_tx, c2s_rx) = flume::unbounded();

let (read, write) = self.stream.into_split();

Expand All @@ -360,8 +363,6 @@ impl Io {
enc: self.enc,
};

let reader_comm = c2s_rx;

let mut io_write = IoWrite { write };

let mut io_read = IoRead {
Expand All @@ -372,12 +373,10 @@ impl Io {
info!("Finished handshake for {username}");

monoio::spawn(async move {
debug!("start receiving packets");
while let Ok(raw) = io_read.recv_packet_raw().await {
if let Err(e) = c2s_tx.send(raw) {
error!("{e:?}");
break;
}
while let Ok(packet) = io_read.recv_packet_raw().await {
#[allow(clippy::undocumented_unsafe_blocks)]
let packets = unsafe { &mut *LOCAL_PACKETS.get() };
packets.push(UserPacketFrame { packet, user: uuid });
}
});

Expand All @@ -392,12 +391,12 @@ impl Io {

let packets = Packets {
writer: writer_comm,
reader: reader_comm,
};

let conn = ClientConnection {
packets,
name: username,
uuid,
};

tx.send(conn).unwrap();
Expand Down Expand Up @@ -466,7 +465,12 @@ async fn print_errors(future: impl core::future::Future<Output = anyhow::Result<
}
}

async fn run(tx: flume::Sender<ClientConnection>) {
#[thread_local]
static LOCAL_PACKETS: UnsafeCell<Vec<UserPacketFrame>> = UnsafeCell::new(Vec::new());

pub static GLOBAL_PACKETS: Mutex<Vec<UserPacketFrame>> = Mutex::new(Vec::new());

async fn run(tx: flume::Sender<ClientConnection>, update_global: flume::Receiver<()>) {
// start socket 25565
// todo: remove unwrap
let addr = "0.0.0.0:25565";
Expand All @@ -483,6 +487,15 @@ async fn run(tx: flume::Sender<ClientConnection>) {

let mut id = 0;

monoio::spawn(async move {
while update_global.recv_async().await == Ok(()) {
#[allow(clippy::undocumented_unsafe_blocks)]
let packets = unsafe { &mut *LOCAL_PACKETS.get() };
let mut global_packets = GLOBAL_PACKETS.lock().unwrap();
global_packets.append(packets);
}
});

// accept incoming connections
loop {
let Ok((stream, _)) = listener.accept().await else {
Expand All @@ -504,8 +517,11 @@ async fn run(tx: flume::Sender<ClientConnection>) {
}
}

pub fn server(shutdown: flume::Receiver<()>) -> anyhow::Result<flume::Receiver<ClientConnection>> {
let (tx, rx) = flume::unbounded();
pub fn server(
shutdown: flume::Receiver<()>,
) -> anyhow::Result<(flume::Receiver<ClientConnection>, flume::Sender<()>)> {
let (connection_tx, connection_rx) = flume::unbounded();
let (update_global_tx, update_global_rx) = flume::unbounded();

std::thread::Builder::new()
.name("io".to_string())
Expand All @@ -515,7 +531,7 @@ pub fn server(shutdown: flume::Receiver<()>) -> anyhow::Result<flume::Receiver<C
.unwrap();

runtime.block_on(async move {
let run = run(tx);
let run = run(connection_tx, update_global_rx);
let shutdown = shutdown.recv_async();

monoio::select! {
Expand All @@ -526,7 +542,7 @@ pub fn server(shutdown: flume::Receiver<()>) -> anyhow::Result<flume::Receiver<C
})
.context("failed to spawn io thread")?;

Ok(rx)
Ok((connection_rx, update_global_tx))
}

fn registry_codec_raw(codec: &RegistryCodec) -> anyhow::Result<Compound> {
Expand Down
62 changes: 43 additions & 19 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use valence_protocol::math::DVec3;

use crate::{
bounding_box::BoundingBox,
io::{server, ClientConnection, Packets},
singleton::encoder::Encoder,
io::{server, ClientConnection, Packets, GLOBAL_PACKETS},
singleton::{encoder::Encoder, player_lookup::PlayerLookup},
};

mod global;
Expand Down Expand Up @@ -55,6 +55,7 @@ struct InitPlayer {
entity: EntityId,
io: Packets,
name: Box<str>,
uuid: uuid::Uuid,
pos: FullEntityPose,
}

Expand Down Expand Up @@ -124,6 +125,7 @@ pub struct Game {
last_ms_per_tick: VecDeque<f64>,
tick_on: u64,
incoming: flume::Receiver<ClientConnection>,
req_packets: flume::Sender<()>,
}

impl Game {
Expand Down Expand Up @@ -157,7 +159,7 @@ impl Game {
}
});

let server = server(shutdown_rx)?;
let (incoming, req_packets) = server(shutdown_rx)?;

let mut world = World::new();

Expand All @@ -180,12 +182,16 @@ impl Game {
let encoder = world.spawn();
world.insert(encoder, Encoder);

let lookup = world.spawn();
world.insert(lookup, PlayerLookup::default());

let mut game = Self {
world,
last_ticks: VecDeque::default(),
last_ms_per_tick: VecDeque::default(),
tick_on: 0,
incoming: server,
incoming,
req_packets,
};

game.last_ticks.push_back(Instant::now());
Expand Down Expand Up @@ -238,14 +244,19 @@ impl Game {
}

while let Ok(connection) = self.incoming.try_recv() {
let ClientConnection { packets, name } = connection;
let ClientConnection {
packets,
name,
uuid,
} = connection;

let player = self.world.spawn();

let event = InitPlayer {
entity: player,
io: packets,
name,
uuid,
pos: FullEntityPose {
position: DVec3::new(0.0, 2.0, 0.0),
bounding: BoundingBox::create(DVec3::new(0.0, 2.0, 0.0), 0.6, 1.8),
Expand All @@ -259,6 +270,8 @@ impl Game {

self.world.send(Gametick);

self.req_packets.send(()).unwrap();

let ms = now.elapsed().as_nanos() as f64 / 1_000_000.0;
self.last_ms_per_tick.push_back(ms);

Expand Down Expand Up @@ -304,26 +317,37 @@ impl Game {
fn process_packets(
_: Receiver<Gametick>,
mut fetcher: Fetcher<(EntityId, &mut Player, &mut FullEntityPose)>,
lookup: Single<&PlayerLookup>,
mut sender: Sender<(KickPlayer, InitEntity, KillAllEntities)>,
) {
// todo: flume the best things to use here? also this really ust needs to be mpsc not mpmc
// let (tx, rx) = flume::unbounded();
// uuid to entity id map

fetcher.iter_mut().for_each(|(_id, player, position)| {
// info!("Processing packets for player: {:?}", id);
while let Ok(packet) = player.packets.reader.try_recv() {
// info!("Received packet: {:?}", packet);
if let Err(e) = packets::switch(packet, player, position, &mut sender) {
let reason = format!("error: {e}");
let packets: Vec<_> = {
let mut packets = GLOBAL_PACKETS.lock().unwrap();
core::mem::take(&mut packets)
};

// todo: handle error
let _ = player.packets.writer.send_chat_message(&reason);
let lookup = lookup.0;

warn!("invalid packet: {reason}");
// let _ = tx.send(KickPlayer { target: id, reason });
}
for packet in packets {
let id = packet.user;
let Some(&user) = lookup.get(&id) else { return };

let Ok((_, player, position)) = fetcher.get_mut(user) else {
return;
};

let packet = packet.packet;

if let Err(e) = packets::switch(packet, player, position, &mut sender) {
let reason = format!("error: {e}");

// todo: handle error
let _ = player.packets.writer.send_chat_message(&reason);

warn!("invalid packet: {reason}");
}
});
}
}

static SHUTDOWN: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
Expand Down
1 change: 1 addition & 0 deletions server/src/singleton.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod encoder;
pub mod player_lookup;
23 changes: 23 additions & 0 deletions server/src/singleton/player_lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::collections::HashMap;

use evenio::{entity::EntityId, prelude::Component};
use uuid::Uuid;

#[derive(Component, Default, Debug)]
pub struct PlayerLookup {
inner: HashMap<Uuid, EntityId>,
}

impl PlayerLookup {
pub fn insert(&mut self, uuid: Uuid, entity: EntityId) {
self.inner.insert(uuid, entity);
}

pub fn remove(&mut self, uuid: &Uuid) {
self.inner.remove(uuid);
}

pub fn get(&self, uuid: &Uuid) -> Option<&EntityId> {
self.inner.get(uuid)
}
}
7 changes: 6 additions & 1 deletion server/src/system/init_player.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use evenio::prelude::*;
use tracing::instrument;

use crate::{EntityReaction, FullEntityPose, InitPlayer, Player, PlayerJoinWorld, Targetable};
use crate::{
EntityReaction, FullEntityPose, InitPlayer, Player, PlayerJoinWorld, Targetable, Uuid,
};

#[instrument(skip_all)]
pub fn init_player(
Expand All @@ -10,6 +12,7 @@ pub fn init_player(
Insert<FullEntityPose>,
Insert<Player>,
Insert<EntityReaction>,
Insert<Uuid>,
Insert<Targetable>,
PlayerJoinWorld,
)>,
Expand All @@ -21,6 +24,7 @@ pub fn init_player(
entity,
io,
name,
uuid,
pos,
} = event;

Expand All @@ -32,6 +36,7 @@ pub fn init_player(
name,
locale: None,
});
s.insert(entity, Uuid(uuid));

s.insert(entity, EntityReaction::default());

Expand Down
Loading