From 47bf65f05fd21d15f5ba99e00a9f84a206cf0e07 Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 23 Oct 2024 14:10:55 -0500 Subject: [PATCH] Optimize integrate_generated_columns --- crates/bindings-macro/src/table.rs | 7 +- crates/bindings/src/table.rs | 61 +++++++++-- crates/commitlog/src/payload.rs | 2 +- crates/sats/src/buffer.rs | 155 +++++++++++++++++++--------- crates/standalone/src/control_db.rs | 4 +- 5 files changed, 162 insertions(+), 67 deletions(-) diff --git a/crates/bindings-macro/src/table.rs b/crates/bindings-macro/src/table.rs index 004e512e512..162f0cbeeb7 100644 --- a/crates/bindings-macro/src/table.rs +++ b/crates/bindings-macro/src/table.rs @@ -526,14 +526,11 @@ pub(crate) fn table_impl(mut args: TableArgs, mut item: MutItem - if spacetimedb::table::IsSequenceTrigger::is_sequence_trigger(&_row.#field) { - _row.#field = spacetimedb::sats::bsatn::from_reader(_in).unwrap(); - } + spacetimedb::table::SequenceTrigger::maybe_decode_into(&mut __row.#field, &mut __generated_cols); ) }); let integrate_generated_columns = quote_spanned!(item.span() => - fn integrate_generated_columns(_row: &mut #row_type, mut _generated_cols: &[u8]) { - let mut _in = &mut _generated_cols; + fn integrate_generated_columns(__row: &mut #row_type, mut __generated_cols: &[u8]) { #(#integrate_gen_col)* } ); diff --git a/crates/bindings/src/table.rs b/crates/bindings/src/table.rs index 2e8a1256a72..d1eb0269154 100644 --- a/crates/bindings/src/table.rs +++ b/crates/bindings/src/table.rs @@ -3,7 +3,7 @@ use std::convert::Infallible; use std::marker::PhantomData; use std::{fmt, ops}; -use spacetimedb_lib::buffer::{BufReader, Cursor}; +use spacetimedb_lib::buffer::{BufReader, Cursor, DecodeError}; use spacetimedb_lib::sats::{i256, u256}; pub use spacetimedb_lib::db::raw_def::v9::TableAccess; @@ -755,39 +755,78 @@ impl_terminator!( // impl BTreeIndexBounds<(T, U, V)> for (T, U, Range) {} // impl BTreeIndexBounds<(T, U, V)> for (T, U, V) {} -/// A trait for types that know if their value will trigger a sequence. +/// A trait for types that can have a sequence based on them. /// This is used for auto-inc columns to determine if an insertion of a row /// will require the column to be updated in the row. -/// -/// For now, this is equivalent to a "is zero" test. -pub trait IsSequenceTrigger { +pub trait SequenceTrigger: Sized { /// Is this value one that will trigger a sequence, if any, /// when used as a column value. fn is_sequence_trigger(&self) -> bool; + /// BufReader::get_[< self >] + fn decode(reader: &mut &[u8]) -> Result; + /// Read a generated column from the slice, if this row was a sequence trigger. + #[inline(always)] + fn maybe_decode_into(&mut self, gen_cols: &mut &[u8]) { + if self.is_sequence_trigger() { + *self = Self::decode(gen_cols).unwrap_or_else(|_| sequence_decode_error()) + } + } +} + +#[cold] +#[inline(never)] +fn sequence_decode_error() -> ! { + unreachable!("a row was a sequence trigger but there was no generated column for it.") } -macro_rules! impl_is_seq_trigger { - ($($t:ty),*) => { +macro_rules! impl_seq_trigger { + ($($get:ident($t:ty),)*) => { $( - impl IsSequenceTrigger for $t { + impl SequenceTrigger for $t { + #[inline(always)] fn is_sequence_trigger(&self) -> bool { *self == 0 } + #[inline(always)] + fn decode(reader: &mut &[u8]) -> Result { + reader.$get() + } } )* }; } -impl_is_seq_trigger![u8, i8, u16, i16, u32, i32, u64, i64, u128, i128]; +impl_seq_trigger!( + get_u8(u8), + get_i8(i8), + get_u16(u16), + get_i16(i16), + get_u32(u32), + get_i32(i32), + get_u64(u64), + get_i64(i64), + get_u128(u128), + get_i128(i128), +); -impl IsSequenceTrigger for crate::sats::i256 { +impl SequenceTrigger for crate::sats::i256 { + #[inline(always)] fn is_sequence_trigger(&self) -> bool { *self == Self::ZERO } + #[inline(always)] + fn decode(reader: &mut &[u8]) -> Result { + reader.get_i256() + } } -impl IsSequenceTrigger for crate::sats::u256 { +impl SequenceTrigger for crate::sats::u256 { + #[inline(always)] fn is_sequence_trigger(&self) -> bool { *self == Self::ZERO } + #[inline(always)] + fn decode(reader: &mut &[u8]) -> Result { + reader.get_u256() + } } /// Insert a row of type `T` into the table identified by `table_id`. diff --git a/crates/commitlog/src/payload.rs b/crates/commitlog/src/payload.rs index 5661eba672f..538405c7e7d 100644 --- a/crates/commitlog/src/payload.rs +++ b/crates/commitlog/src/payload.rs @@ -115,7 +115,7 @@ impl Decoder for ArrayDecoder { _tx_offset: u64, reader: &mut R, ) -> Result { - Ok(reader.get_array()?) + Ok(*reader.get_array()?) } fn skip_record<'a, R: BufReader<'a>>( diff --git a/crates/sats/src/buffer.rs b/crates/sats/src/buffer.rs index 9a246fdb827..231e638c50d 100644 --- a/crates/sats/src/buffer.rs +++ b/crates/sats/src/buffer.rs @@ -12,7 +12,7 @@ use core::str::Utf8Error; pub enum DecodeError { /// Not enough data was provided in the input. BufferLength { - for_type: String, + for_type: &'static str, expected: usize, given: usize, }, @@ -126,117 +126,161 @@ pub trait BufWriter { } } +macro_rules! get_int { + ($self:ident, $int:ident) => { + match $self.get_array_chunk() { + Some(&arr) => Ok($int::from_le_bytes(arr)), + None => Err(DecodeError::BufferLength { + for_type: stringify!($int), + expected: std::mem::size_of::<$int>(), + given: $self.remaining(), + }), + } + }; +} + /// A buffered reader of some kind. /// /// The lifetime `'de` allows the output of deserialization to borrow from the input. pub trait BufReader<'de> { - /// Reads and returns a byte slice of `.len() = size` advancing the cursor. - fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError>; + /// Reads and returns a chunk of `.len() = size` advancing the cursor iff `self.remaining() >= size`. + fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]>; /// Returns the number of bytes left to read in the input. fn remaining(&self) -> usize; + /// Reads and returns a chunk of `.len() = N` as an array, advancing the cursor. + #[inline] + fn get_array_chunk(&mut self) -> Option<&'de [u8; N]> { + self.get_chunk(N)?.try_into().ok() + } + + /// Reads and returns a byte slice of `.len() = size` advancing the cursor. + #[inline] + fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> { + self.get_chunk(size).ok_or(DecodeError::BufferLength { + for_type: "[u8]", + expected: size, + given: self.remaining(), + }) + } + + /// Reads an array of type `[u8; N]` from the input. + #[inline] + fn get_array(&mut self) -> Result<&'de [u8; N], DecodeError> { + self.get_array_chunk().ok_or(DecodeError::BufferLength { + for_type: "[u8; _]", + expected: N, + given: self.remaining(), + }) + } + /// Reads a `u8` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u8(&mut self) -> Result { - self.get_array().map(u8::from_le_bytes) + get_int!(self, u8) } /// Reads a `u16` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u16(&mut self) -> Result { - self.get_array().map(u16::from_le_bytes) + get_int!(self, u16) } /// Reads a `u32` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u32(&mut self) -> Result { - self.get_array().map(u32::from_le_bytes) + get_int!(self, u32) } /// Reads a `u64` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u64(&mut self) -> Result { - self.get_array().map(u64::from_le_bytes) + get_int!(self, u64) } /// Reads a `u128` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u128(&mut self) -> Result { - self.get_array().map(u128::from_le_bytes) + get_int!(self, u128) } /// Reads a `u256` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_u256(&mut self) -> Result { - self.get_array().map(u256::from_le_bytes) + get_int!(self, u256) } /// Reads an `i8` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i8(&mut self) -> Result { - self.get_array().map(i8::from_le_bytes) + get_int!(self, i8) } /// Reads an `i16` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i16(&mut self) -> Result { - self.get_array().map(i16::from_le_bytes) + get_int!(self, i16) } /// Reads an `i32` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i32(&mut self) -> Result { - self.get_array().map(i32::from_le_bytes) + get_int!(self, i32) } /// Reads an `i64` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i64(&mut self) -> Result { - self.get_array().map(i64::from_le_bytes) + get_int!(self, i64) } /// Reads an `i128` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i128(&mut self) -> Result { - self.get_array().map(i128::from_le_bytes) + get_int!(self, i128) } /// Reads an `i256` in little endian (LE) encoding from the input. /// /// This method is provided for convenience - /// and is derived from [`get_slice`](BufReader::get_slice)'s definition. + /// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition. + #[inline] fn get_i256(&mut self) -> Result { - self.get_array().map(i256::from_le_bytes) - } - - /// Reads an array of type `[u8; C]` from the input. - fn get_array(&mut self) -> Result<[u8; C], DecodeError> { - let mut buf: [u8; C] = [0; C]; - buf.copy_from_slice(self.get_slice(C)?); - Ok(buf) + get_int!(self, i256) } } @@ -297,19 +341,25 @@ impl BufWriter for TeeWriter { } impl<'de> BufReader<'de> for &'de [u8] { - fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> { + #[inline] + fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]> { + // TODO: split_at_checked once our msrv >= 1.80 if self.len() < size { - return Err(DecodeError::BufferLength { - for_type: "[u8]".into(), - expected: size, - given: self.len(), - }); + return None; } let (ret, rest) = self.split_at(size); *self = rest; - Ok(ret) + Some(ret) } + #[inline] + fn get_array_chunk(&mut self) -> Option<&'de [u8; N]> { + let (ret, rest) = self.split_first_chunk()?; + *self = rest; + Some(ret) + } + + #[inline(always)] fn remaining(&self) -> usize { self.len() } @@ -334,19 +384,28 @@ impl Cursor { } impl<'de, I: AsRef<[u8]>> BufReader<'de> for &'de Cursor { - fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> { + #[inline] + fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]> { // "Read" the slice `buf[pos..size]`. let buf = &self.buf.as_ref()[self.pos.get()..]; - let ret = buf.get(..size).ok_or_else(|| DecodeError::BufferLength { - for_type: "Cursor".into(), - expected: size, - given: buf.len(), - })?; + let ret = buf.get(..size)?; // Advance the cursor by `size` bytes. self.pos.set(self.pos.get() + size); - Ok(ret) + Some(ret) + } + + #[inline] + fn get_array_chunk(&mut self) -> Option<&'de [u8; N]> { + // "Read" the slice `buf[pos..size]`. + let buf = &self.buf.as_ref()[self.pos.get()..]; + let ret = buf.first_chunk()?; + + // Advance the cursor by `size` bytes. + self.pos.set(self.pos.get() + N); + + Some(ret) } fn remaining(&self) -> usize { diff --git a/crates/standalone/src/control_db.rs b/crates/standalone/src/control_db.rs index 4a2bfa7363d..d1384cacae8 100644 --- a/crates/standalone/src/control_db.rs +++ b/crates/standalone/src/control_db.rs @@ -399,7 +399,7 @@ impl ControlDb { } }; let arr = <[u8; 16]>::try_from(balance_entry.1.as_ref()).map_err(|_| bsatn::DecodeError::BufferLength { - for_type: "balance_entry".into(), + for_type: "balance_entry", expected: 16, given: balance_entry.1.len(), })?; @@ -421,7 +421,7 @@ impl ControlDb { let value = tree.get(identity.to_byte_array())?; if let Some(value) = value { let arr = <[u8; 16]>::try_from(value.as_ref()).map_err(|_| bsatn::DecodeError::BufferLength { - for_type: "Identity".into(), + for_type: "Identity", expected: 16, given: value.as_ref().len(), })?;