Skip to content

Commit

Permalink
fix: chunk serialization and arc channel removing
Browse files Browse the repository at this point in the history
  • Loading branch information
Liyze09 committed Oct 29, 2024
1 parent d01b723 commit 7fea150
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 38 deletions.
30 changes: 22 additions & 8 deletions crates/kernel/src/util/arc_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::thread::yield_now;
use std::time::{Duration, Instant};

pub struct ArcChannel<T> {
receivers: Vec<Sender<T>>,
receivers: HashMap<usize, Sender<T>>,
}

struct Sender<T> {
Expand Down Expand Up @@ -35,33 +35,47 @@ impl<T> Default for ArcChannel<T> {
impl<T> ArcChannel<T> {
pub fn new() -> ArcChannel<T> {
ArcChannel {
receivers: Vec::new(),
receivers: HashMap::new(),
}
}
pub fn broadcast(&self, item: T) {
let arc = Arc::new(item);
for receiver in self.receivers.iter() {
receiver.queue.lock().push_back(arc.clone());
receiver.1.queue.lock().push_back(arc.clone());
}
}

pub fn subscribe_with_id(&mut self, id: usize) -> Receiver<T> {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let parent = &self.receivers as *const _ as usize;
let sender = Sender {
parent,
id,
queue: queue.clone(),
};
self.receivers.insert(id, sender);
Receiver { parent, id, queue }
}
pub fn subscribe(&mut self) -> Receiver<T> {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let id = self.receivers.len();
let parent = self.receivers.as_ptr() as usize;
let mut id = fastrand::usize(usize::MIN..usize::MAX);
while self.receivers.contains_key(&id) {
id = fastrand::usize(usize::MIN..usize::MAX);
}
let parent = &self.receivers as *const _ as usize;
let sender = Sender {
parent,
id,
queue: queue.clone(),
};
self.receivers.push(sender);
self.receivers.insert(id, sender);
Receiver { parent, id, queue }
}
pub fn remove(&mut self, receiver: &Receiver<T>) -> bool {
if receiver.parent != self.receivers.as_ptr() as usize {
if receiver.parent != &self.receivers as *const _ as usize {
return false;
}
self.receivers.remove(receiver.id);
self.receivers.remove(&receiver.id);
true
}
}
Expand Down
62 changes: 44 additions & 18 deletions crates/kernel/src/util/raw.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,12 @@
pub struct Raw<T>(*const T);
impl<T> Raw<T> {
pub fn from(value: &T) -> Raw<T> {
Raw(value)
}

pub fn get(&self) -> Option<&T> {
if self.0.is_null() {
None
} else {
unsafe { Some(&*self.0) }
}
}
pub fn into(self) -> Option<T> {
if self.0.is_null() {
None
} else {
let value = unsafe {
if self.0.is_aligned() {
std::ptr::read_volatile(self.0)
} else {
std::ptr::read_unaligned(self.0)
}
};
Some(value)
}
}
pub fn into_raw(self) -> *const T {
let ptr = self.0;
std::mem::forget(self);
Expand Down Expand Up @@ -58,5 +40,49 @@ impl<T> AsRef<T> for Raw<T> {
self.get().unwrap()
}
}
impl<'a, T> Into<Option<&'a T>> for Raw<T> {
fn into(self) -> Option<&'a T> {
if self.0.is_null() {
None
} else {
Some(unsafe { &*self.0 })
}
}
}
impl<T> From<*const T> for Raw<T> {
fn from(value: *const T) -> Self {
Raw(value)
}
}
impl<T> From<Option<T>> for Raw<T> {
fn from(value: Option<T>) -> Self {
match value {
Some(value) => Raw::from(&value),
None => Raw::null(),
}
}
}
impl<T> From<&T> for Raw<T> {
fn from(value: &T) -> Raw<T> {
Raw(value)
}
}

impl<T> Into<*const T> for Raw<T> {
fn into(self) -> *const T {
self.0
}
}
impl<T> std::fmt::Pointer for Raw<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Pointer::fmt(&self.0, f)
}
}
impl<T> std::fmt::Debug for Raw<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Pointer::fmt(&self.0, f)
}
}

unsafe impl<T> Send for Raw<T> {}
unsafe impl<T> Sync for Raw<T> {}
33 changes: 21 additions & 12 deletions crates/kernel/src/world/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::entity::player::Player;
use crate::entity::Entity;
use crate::util::arc_channel::ArcChannel;
use crate::util::io::WriteExt;
use crate::util::raw::Raw;
Expand Down Expand Up @@ -65,7 +66,7 @@ impl Chunk {
///
/// # Returns
/// * `MutexGuard<HeightMap>` - A guard that provides mutable access to the height map,
/// ensuring that no other thread can modify it while the guard is held.
/// ensuring that no other thread can modify it while the guard is held.
#[inline]
pub fn get_world_surface(&self) -> MutexGuard<HeightMap> {
self.world_surface.lock()
Expand All @@ -74,7 +75,7 @@ impl Chunk {
///
/// # Returns
/// * `MutexGuard<HeightMap>` - A guard that provides mutable access to the height map,
/// ensuring that no other thread can modify it while the guard is held.
/// ensuring that no other thread can modify it while the guard is held.
#[inline]
pub fn get_motion_blocking(&self) -> MutexGuard<HeightMap> {
self.motion_blocking.lock()
Expand Down Expand Up @@ -110,7 +111,12 @@ impl Chunk {
}

pub fn player_enter(&self, player: &mut Player) {
player.recv.add(self.pos, self.channel.write().subscribe());
player.recv.add(
self.pos,
self.channel
.write()
.subscribe_with_id(player.get_eid() as usize),
);
}
pub fn player_exit(&self, player: &mut Player) {
if let Some(recv) = player.recv.remove(self.pos) {
Expand Down Expand Up @@ -212,7 +218,7 @@ impl Chunk {
///
/// # Returns
/// * `Option<u8>` - The light value of the block at the specified position, or `None`
/// if the position is invalid.
/// if the position is invalid.
pub fn get_sky_light(&self, x: i32, y: i32, z: i32) -> Option<u8> {
let idx = check_pos(x, y, z, self.height)?;
let section = unsafe { self.data.get_unchecked(idx) };
Expand Down Expand Up @@ -266,7 +272,7 @@ impl Chunk {
///
/// # Returns
/// * `Option<u8>` - The light value of the block at the specified position, or `None`
/// if the position is invalid.
/// if the position is invalid.
pub fn get_block_light(&self, x: i32, y: i32, z: i32) -> Option<u8> {
let idx = check_pos(x, y, z, self.height)?;
let section = unsafe { self.data.get_unchecked(idx) };
Expand Down Expand Up @@ -413,23 +419,26 @@ impl Chunk {
buffer
.write_bitset(&empty_block_light_mask.into_inner())
.await?;

buffer.write_var_int(sky_mask.len() as i32).await?;
for (index, section) in self.data.iter().enumerate() {
if sky[index] {
let temp = {
let temp = section.sky_light.lock();
temp.to_vec()
};
buffer.write_var_int(2048).await?;
buffer
.write_all(section.sky_light.lock().as_slice())
.await?;
buffer.write_all(&temp).await?;
}
}
buffer.write_var_int(block_mask.len() as i32).await?;
for (index, section) in self.data.iter().enumerate() {
if block[index] {
let temp = {
let temp = section.block_light.lock();
temp.to_vec()
};
buffer.write_var_int(2048).await?;
buffer
.write_all(section.block_light.lock().as_slice())
.await?;
buffer.write_all(&temp).await?;
}
}
Ok(())
Expand Down

0 comments on commit 7fea150

Please sign in to comment.