Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Tidy up the code of events #4713

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/run_criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bevy_ecs::{
component::Component,
prelude::{ParallelSystemDescriptorCoercion, Res, RunCriteriaDescriptorCoercion},
schedule::{ShouldRun, Stage, SystemStage},
system::{IntoSystem, Query},
system::Query,
world::World,
};
use criterion::{criterion_group, criterion_main, Criterion};
Expand Down
202 changes: 94 additions & 108 deletions crates/bevy_ecs/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate as bevy_ecs;
use crate::system::{Local, Res, ResMut, SystemParam};
use bevy_utils::tracing::trace;
use std::ops::{Deref, DerefMut};
use std::{
fmt::{self},
hash::Hash,
Expand Down Expand Up @@ -56,12 +57,6 @@ struct EventInstance<E: Event> {
pub event: E,
}

#[derive(Debug)]
enum State {
A,
B,
}

/// An event collection that represents the events that occurred within the last two
/// [`Events::update`] calls.
/// Events can be written to using an [`EventWriter`]
Expand Down Expand Up @@ -135,39 +130,59 @@ enum State {
///
#[derive(Debug)]
pub struct Events<E: Event> {
events_a: Vec<EventInstance<E>>,
events_b: Vec<EventInstance<E>>,
a_start_event_count: usize,
b_start_event_count: usize,
/// Holds the oldest still active events.
/// Note that a.start_event_count + a.len() should always === events_b.start_event_count.
events_a: EventSequence<E>,
/// Holds the newer events.
events_b: EventSequence<E>,
event_count: usize,
state: State,
}

// Derived Default impl would incorrectly require E: Default
impl<E: Event> Default for Events<E> {
fn default() -> Self {
Events {
a_start_event_count: 0,
b_start_event_count: 0,
event_count: 0,
events_a: Vec::new(),
events_b: Vec::new(),
state: State::A,
Self {
events_a: Default::default(),
events_b: Default::default(),
event_count: Default::default(),
}
}
}

#[derive(Debug)]
struct EventSequence<E: Event> {
events: Vec<EventInstance<E>>,
start_event_count: usize,
}

// Derived Default impl would incorrectly require E: Default
impl<E: Event> Default for EventSequence<E> {
fn default() -> Self {
Self {
events: Default::default(),
start_event_count: Default::default(),
}
}
}

fn map_instance_event_with_id<E: Event>(event_instance: &EventInstance<E>) -> (&E, EventId<E>) {
(&event_instance.event, event_instance.event_id)
impl<E: Event> Deref for EventSequence<E> {
type Target = Vec<EventInstance<E>>;

fn deref(&self) -> &Self::Target {
&self.events
}
}

fn map_instance_event<E: Event>(event_instance: &EventInstance<E>) -> &E {
&event_instance.event
impl<E: Event> DerefMut for EventSequence<E> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.events
}
}

/// Reads events of type `T` in order and tracks which events have already been read.
#[derive(SystemParam)]
pub struct EventReader<'w, 's, E: Event> {
last_event_count: Local<'s, (usize, PhantomData<E>)>,
reader: Local<'s, ManualEventReader<E>>,
events: Res<'w, Events<E>>,
}

Expand Down Expand Up @@ -199,6 +214,7 @@ impl<'w, 's, E: Event> EventWriter<'w, 's, E> {
}
}

#[derive(Debug)]
pub struct ManualEventReader<E: Event> {
last_event_count: usize,
_marker: PhantomData<E>,
Expand All @@ -220,7 +236,7 @@ impl<E: Event> ManualEventReader<E> {
&'a mut self,
events: &'a Events<E>,
) -> impl DoubleEndedIterator<Item = &'a E> + ExactSizeIterator<Item = &'a E> {
internal_event_reader(&mut self.last_event_count, events).map(|(e, _)| e)
self.iter_with_id(events).map(|(e, _)| e)
}

/// See [`EventReader::iter_with_id`]
Expand All @@ -229,12 +245,34 @@ impl<E: Event> ManualEventReader<E> {
events: &'a Events<E>,
) -> impl DoubleEndedIterator<Item = (&'a E, EventId<E>)>
+ ExactSizeIterator<Item = (&'a E, EventId<E>)> {
internal_event_reader(&mut self.last_event_count, events)
// if the reader has seen some of the events in a buffer, find the proper index offset.
// otherwise read all events in the buffer
let a_index = (self.last_event_count).saturating_sub(events.events_a.start_event_count);
let b_index = (self.last_event_count).saturating_sub(events.events_b.start_event_count);
let a = events.events_a.get(a_index..).unwrap_or_default();
let b = events.events_b.get(b_index..).unwrap_or_default();
let unread_count = a.len() + b.len();
// Ensure `len` is implemented correctly
debug_assert_eq!(unread_count, self.len(events));
self.last_event_count = events.event_count - unread_count;
// Iterate the oldest first, then the newer events
let iterator = a.iter().chain(b.iter());
iterator
.map(|e| (&e.event, e.event_id))
.with_exact_size(unread_count)
.inspect(move |(_, id)| self.last_event_count = (id.id + 1).max(self.last_event_count))
}

/// See [`EventReader::len`]
pub fn len(&self, events: &Events<E>) -> usize {
internal_event_reader(&mut self.last_event_count.clone(), events).len()
// The number of events in this reader is the difference between the most recent event
// and the last event seen by it. This will be at most the number of events contained
// with the events (any others have already been dropped)
// TODO: Warn when there are dropped events, or return e.g. a `Result<usize, (usize, usize)>`
events
.event_count
.saturating_sub(self.last_event_count)
.min(events.len())
}

/// See [`EventReader::is_empty`]
Expand All @@ -243,39 +281,6 @@ impl<E: Event> ManualEventReader<E> {
}
}

/// Like [`iter_with_id`](EventReader::iter_with_id) except not emitting any traces for read
/// messages.
fn internal_event_reader<'a, E: Event>(
last_event_count: &'a mut usize,
events: &'a Events<E>,
) -> impl DoubleEndedIterator<Item = (&'a E, EventId<E>)> + ExactSizeIterator<Item = (&'a E, EventId<E>)>
{
// if the reader has seen some of the events in a buffer, find the proper index offset.
// otherwise read all events in the buffer
let a_index = if *last_event_count > events.a_start_event_count {
*last_event_count - events.a_start_event_count
} else {
0
};
let b_index = if *last_event_count > events.b_start_event_count {
*last_event_count - events.b_start_event_count
} else {
0
};
let a = events.events_a.get(a_index..).unwrap_or_default();
let b = events.events_b.get(b_index..).unwrap_or_default();
let unread_count = a.len() + b.len();
*last_event_count = events.event_count - unread_count;
let iterator = match events.state {
State::A => b.iter().chain(a.iter()),
State::B => a.iter().chain(b.iter()),
};
iterator
.map(map_instance_event_with_id)
.with_exact_size(unread_count)
.inspect(move |(_, id)| *last_event_count = (id.id + 1).max(*last_event_count))
}

trait IteratorExt {
fn with_exact_size(self, len: usize) -> ExactSize<Self>
where
Expand Down Expand Up @@ -343,15 +348,15 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> {
&mut self,
) -> impl DoubleEndedIterator<Item = (&E, EventId<E>)> + ExactSizeIterator<Item = (&E, EventId<E>)>
{
internal_event_reader(&mut self.last_event_count.0, &self.events).map(|(event, id)| {
self.reader.iter_with_id(&self.events).map(|r @ (_, id)| {
trace!("EventReader::iter() -> {}", id);
(event, id)
r
})
}

/// Determines the number of events available to be read from this [`EventReader`] without consuming any.
pub fn len(&self) -> usize {
internal_event_reader(&mut self.last_event_count.0.clone(), &self.events).len()
self.reader.len(&self.events)
}

/// Determines if are any events available to be read without consuming any.
Expand All @@ -372,11 +377,7 @@ impl<E: Event> Events<E> {

let event_instance = EventInstance { event_id, event };

match self.state {
State::A => self.events_a.push(event_instance),
State::B => self.events_b.push(event_instance),
}

self.events_b.push(event_instance);
self.event_count += 1;
}

Expand All @@ -390,36 +391,28 @@ impl<E: Event> Events<E> {

/// Gets a new [`ManualEventReader`]. This will include all events already in the event buffers.
pub fn get_reader(&self) -> ManualEventReader<E> {
ManualEventReader {
last_event_count: 0,
_marker: PhantomData,
}
ManualEventReader::default()
}

/// Gets a new [`ManualEventReader`]. This will ignore all events already in the event buffers.
/// It will read all future events.
pub fn get_reader_current(&self) -> ManualEventReader<E> {
ManualEventReader {
last_event_count: self.event_count,
_marker: PhantomData,
..Default::default()
}
}

/// Swaps the event buffers and clears the oldest event buffer. In general, this should be
/// called once per frame/update.
pub fn update(&mut self) {
match self.state {
State::A => {
self.events_b.clear();
self.state = State::B;
self.b_start_event_count = self.event_count;
}
State::B => {
self.events_a.clear();
self.state = State::A;
self.a_start_event_count = self.event_count;
}
}
std::mem::swap(&mut self.events_a, &mut self.events_b);
self.events_b.clear();
self.events_b.start_event_count = self.event_count;
debug_assert_eq!(
self.events_a.start_event_count + self.events_a.len(),
self.events_b.start_event_count
);
}

/// A system that calls [`Events::update`] once per frame.
Expand All @@ -429,8 +422,8 @@ impl<E: Event> Events<E> {

#[inline]
fn reset_start_event_count(&mut self) {
self.a_start_event_count = self.event_count;
self.b_start_event_count = self.event_count;
self.events_a.start_event_count = self.event_count;
self.events_b.start_event_count = self.event_count;
}

/// Removes all events.
Expand All @@ -441,29 +434,26 @@ impl<E: Event> Events<E> {
self.events_b.clear();
}

#[inline]
pub fn len(&self) -> usize {
self.events_a.len() + self.events_b.len()
}

/// Returns true if there are no events in this collection.
#[inline]
pub fn is_empty(&self) -> bool {
self.events_a.is_empty() && self.events_b.is_empty()
self.len() == 0
}

/// Creates a draining iterator that removes all events.
pub fn drain(&mut self) -> impl Iterator<Item = E> + '_ {
self.reset_start_event_count();

let map = |i: EventInstance<E>| i.event;
match self.state {
State::A => self
.events_b
.drain(..)
.map(map)
.chain(self.events_a.drain(..).map(map)),
State::B => self
.events_a
.drain(..)
.map(map)
.chain(self.events_b.drain(..).map(map)),
}
// Drain the oldest events first, then the newest
self.events_a
.drain(..)
.chain(self.events_b.drain(..))
.map(|i| i.event)
}

/// Iterates over events that happened since the last "update" call.
Expand All @@ -475,10 +465,7 @@ impl<E: Event> Events<E> {
pub fn iter_current_update_events(
&self,
) -> impl DoubleEndedIterator<Item = &E> + ExactSizeIterator<Item = &E> {
match self.state {
State::A => self.events_a.iter().map(map_instance_event),
State::B => self.events_b.iter().map(map_instance_event),
}
self.events_b.iter().map(|i| &i.event)
}
}

Expand All @@ -497,10 +484,7 @@ impl<E: Event> std::iter::Extend<E> for Events<E> {
EventInstance { event_id, event }
});

match self.state {
State::A => self.events_a.extend(events),
State::B => self.events_b.extend(events),
}
self.events_b.extend(events);

trace!(
"Events::extend() -> ids: ({}..{})",
Expand Down Expand Up @@ -719,6 +703,8 @@ mod tests {
let mut events = Events::<TestEvent>::default();
events.send(TestEvent { i: 0 });
let reader = events.get_reader_current();
dbg!(&reader);
dbg!(&events);
assert!(reader.is_empty(&events));
events.send(TestEvent { i: 0 });
assert_eq!(reader.len(&events), 1);
Expand Down