Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize integrate_generated_columns #1895

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions crates/bindings-macro/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,11 @@ pub(crate) fn table_impl(mut args: TableArgs, mut item: MutItem<syn::DeriveInput
let integrate_gen_col = sequenced_columns.iter().map(|col| {
let field = col.field.ident.unwrap();
quote_spanned!(field.span()=>
if spacetimedb::table::IsSequenceTrigger::is_sequence_trigger(&_row.#field) {
_row.#field = spacetimedb::sats::bsatn::from_reader(_in).unwrap();
}
spacetimedb::table::SequenceTrigger::maybe_decode_into(&mut __row.#field, &mut __generated_cols);
Centril marked this conversation as resolved.
Show resolved Hide resolved
)
});
let integrate_generated_columns = quote_spanned!(item.span() =>
fn integrate_generated_columns(_row: &mut #row_type, mut _generated_cols: &[u8]) {
let mut _in = &mut _generated_cols;
fn integrate_generated_columns(__row: &mut #row_type, mut __generated_cols: &[u8]) {
#(#integrate_gen_col)*
}
);
Expand Down
61 changes: 50 additions & 11 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;
use std::marker::PhantomData;
use std::{fmt, ops};

use spacetimedb_lib::buffer::{BufReader, Cursor};
use spacetimedb_lib::buffer::{BufReader, Cursor, DecodeError};
use spacetimedb_lib::sats::{i256, u256};

pub use spacetimedb_lib::db::raw_def::v9::TableAccess;
Expand Down Expand Up @@ -755,39 +755,78 @@ impl_terminator!(
// impl<T, U, V> BTreeIndexBounds<(T, U, V)> for (T, U, Range<V>) {}
// impl<T, U, V> BTreeIndexBounds<(T, U, V)> for (T, U, V) {}

/// A trait for types that know if their value will trigger a sequence.
/// A trait for types that can have a sequence based on them.
/// This is used for auto-inc columns to determine if an insertion of a row
/// will require the column to be updated in the row.
///
/// For now, this is equivalent to a "is zero" test.
pub trait IsSequenceTrigger {
pub trait SequenceTrigger: Sized {
/// Is this value one that will trigger a sequence, if any,
/// when used as a column value.
fn is_sequence_trigger(&self) -> bool;
/// BufReader::get_[< self >]
fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError>;
/// Read a generated column from the slice, if this row was a sequence trigger.
#[inline(always)]
fn maybe_decode_into(&mut self, gen_cols: &mut &[u8]) {
if self.is_sequence_trigger() {
*self = Self::decode(gen_cols).unwrap_or_else(|_| sequence_decode_error())
}
}
}

#[cold]
#[inline(never)]
fn sequence_decode_error() -> ! {
unreachable!("a row was a sequence trigger but there was no generated column for it.")
}

macro_rules! impl_is_seq_trigger {
($($t:ty),*) => {
macro_rules! impl_seq_trigger {
($($get:ident($t:ty),)*) => {
$(
impl IsSequenceTrigger for $t {
impl SequenceTrigger for $t {
#[inline(always)]
fn is_sequence_trigger(&self) -> bool { *self == 0 }
#[inline(always)]
fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
reader.$get()
}
}
)*
};
}

impl_is_seq_trigger![u8, i8, u16, i16, u32, i32, u64, i64, u128, i128];
impl_seq_trigger!(
get_u8(u8),
get_i8(i8),
get_u16(u16),
get_i16(i16),
get_u32(u32),
get_i32(i32),
get_u64(u64),
get_i64(i64),
get_u128(u128),
get_i128(i128),
);

impl IsSequenceTrigger for crate::sats::i256 {
impl SequenceTrigger for crate::sats::i256 {
#[inline(always)]
fn is_sequence_trigger(&self) -> bool {
*self == Self::ZERO
}
#[inline(always)]
fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
reader.get_i256()
}
}

impl IsSequenceTrigger for crate::sats::u256 {
impl SequenceTrigger for crate::sats::u256 {
#[inline(always)]
fn is_sequence_trigger(&self) -> bool {
*self == Self::ZERO
}
#[inline(always)]
fn decode(reader: &mut &[u8]) -> Result<Self, DecodeError> {
reader.get_u256()
}
}

/// Insert a row of type `T` into the table identified by `table_id`.
Expand Down
2 changes: 1 addition & 1 deletion crates/commitlog/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<const N: usize> Decoder for ArrayDecoder<N> {
_tx_offset: u64,
reader: &mut R,
) -> Result<Self::Record, Self::Error> {
Ok(reader.get_array()?)
Ok(*reader.get_array()?)
}

fn skip_record<'a, R: BufReader<'a>>(
Expand Down
155 changes: 107 additions & 48 deletions crates/sats/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use core::str::Utf8Error;
pub enum DecodeError {
/// Not enough data was provided in the input.
BufferLength {
for_type: String,
for_type: &'static str,
expected: usize,
given: usize,
},
Expand Down Expand Up @@ -126,117 +126,161 @@ pub trait BufWriter {
}
}

macro_rules! get_int {
($self:ident, $int:ident) => {
match $self.get_array_chunk() {
Some(&arr) => Ok($int::from_le_bytes(arr)),
None => Err(DecodeError::BufferLength {
for_type: stringify!($int),
expected: std::mem::size_of::<$int>(),
given: $self.remaining(),
}),
}
};
}

/// A buffered reader of some kind.
///
/// The lifetime `'de` allows the output of deserialization to borrow from the input.
pub trait BufReader<'de> {
/// Reads and returns a byte slice of `.len() = size` advancing the cursor.
fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError>;
/// Reads and returns a chunk of `.len() = size` advancing the cursor iff `self.remaining() >= size`.
fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]>;

/// Returns the number of bytes left to read in the input.
fn remaining(&self) -> usize;

/// Reads and returns a chunk of `.len() = N` as an array, advancing the cursor.
#[inline]
fn get_array_chunk<const N: usize>(&mut self) -> Option<&'de [u8; N]> {
self.get_chunk(N)?.try_into().ok()
}

/// Reads and returns a byte slice of `.len() = size` advancing the cursor.
#[inline]
fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> {
self.get_chunk(size).ok_or(DecodeError::BufferLength {
for_type: "[u8]",
expected: size,
given: self.remaining(),
})
}

/// Reads an array of type `[u8; N]` from the input.
#[inline]
fn get_array<const N: usize>(&mut self) -> Result<&'de [u8; N], DecodeError> {
self.get_array_chunk().ok_or(DecodeError::BufferLength {
for_type: "[u8; _]",
expected: N,
given: self.remaining(),
})
}

/// Reads a `u8` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u8(&mut self) -> Result<u8, DecodeError> {
self.get_array().map(u8::from_le_bytes)
get_int!(self, u8)
}
coolreader18 marked this conversation as resolved.
Show resolved Hide resolved

/// Reads a `u16` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u16(&mut self) -> Result<u16, DecodeError> {
self.get_array().map(u16::from_le_bytes)
get_int!(self, u16)
}

/// Reads a `u32` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u32(&mut self) -> Result<u32, DecodeError> {
self.get_array().map(u32::from_le_bytes)
get_int!(self, u32)
}

/// Reads a `u64` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u64(&mut self) -> Result<u64, DecodeError> {
self.get_array().map(u64::from_le_bytes)
get_int!(self, u64)
}

/// Reads a `u128` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u128(&mut self) -> Result<u128, DecodeError> {
self.get_array().map(u128::from_le_bytes)
get_int!(self, u128)
}

/// Reads a `u256` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_u256(&mut self) -> Result<u256, DecodeError> {
self.get_array().map(u256::from_le_bytes)
get_int!(self, u256)
}

/// Reads an `i8` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i8(&mut self) -> Result<i8, DecodeError> {
self.get_array().map(i8::from_le_bytes)
get_int!(self, i8)
}

/// Reads an `i16` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i16(&mut self) -> Result<i16, DecodeError> {
self.get_array().map(i16::from_le_bytes)
get_int!(self, i16)
}

/// Reads an `i32` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i32(&mut self) -> Result<i32, DecodeError> {
self.get_array().map(i32::from_le_bytes)
get_int!(self, i32)
}

/// Reads an `i64` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i64(&mut self) -> Result<i64, DecodeError> {
self.get_array().map(i64::from_le_bytes)
get_int!(self, i64)
}

/// Reads an `i128` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i128(&mut self) -> Result<i128, DecodeError> {
self.get_array().map(i128::from_le_bytes)
get_int!(self, i128)
}

/// Reads an `i256` in little endian (LE) encoding from the input.
///
/// This method is provided for convenience
/// and is derived from [`get_slice`](BufReader::get_slice)'s definition.
/// and is derived from [`get_chunk`](BufReader::get_chunk)'s definition.
#[inline]
fn get_i256(&mut self) -> Result<i256, DecodeError> {
self.get_array().map(i256::from_le_bytes)
}

/// Reads an array of type `[u8; C]` from the input.
fn get_array<const C: usize>(&mut self) -> Result<[u8; C], DecodeError> {
let mut buf: [u8; C] = [0; C];
buf.copy_from_slice(self.get_slice(C)?);
Ok(buf)
get_int!(self, i256)
}
}

Expand Down Expand Up @@ -297,19 +341,25 @@ impl<W1: BufWriter, W2: BufWriter> BufWriter for TeeWriter<W1, W2> {
}

impl<'de> BufReader<'de> for &'de [u8] {
fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> {
#[inline]
fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]> {
// TODO: split_at_checked once our msrv >= 1.80
if self.len() < size {
return Err(DecodeError::BufferLength {
for_type: "[u8]".into(),
expected: size,
given: self.len(),
});
return None;
}
let (ret, rest) = self.split_at(size);
*self = rest;
Ok(ret)
Some(ret)
}

#[inline]
fn get_array_chunk<const N: usize>(&mut self) -> Option<&'de [u8; N]> {
let (ret, rest) = self.split_first_chunk()?;
*self = rest;
Some(ret)
}

#[inline(always)]
fn remaining(&self) -> usize {
self.len()
}
Expand All @@ -334,19 +384,28 @@ impl<I> Cursor<I> {
}

impl<'de, I: AsRef<[u8]>> BufReader<'de> for &'de Cursor<I> {
fn get_slice(&mut self, size: usize) -> Result<&'de [u8], DecodeError> {
#[inline]
fn get_chunk(&mut self, size: usize) -> Option<&'de [u8]> {
// "Read" the slice `buf[pos..size]`.
let buf = &self.buf.as_ref()[self.pos.get()..];
let ret = buf.get(..size).ok_or_else(|| DecodeError::BufferLength {
for_type: "Cursor".into(),
expected: size,
given: buf.len(),
})?;
let ret = buf.get(..size)?;

// Advance the cursor by `size` bytes.
self.pos.set(self.pos.get() + size);

Ok(ret)
Some(ret)
}

#[inline]
fn get_array_chunk<const N: usize>(&mut self) -> Option<&'de [u8; N]> {
// "Read" the slice `buf[pos..size]`.
let buf = &self.buf.as_ref()[self.pos.get()..];
let ret = buf.first_chunk()?;

// Advance the cursor by `size` bytes.
self.pos.set(self.pos.get() + N);

Some(ret)
}

fn remaining(&self) -> usize {
Expand Down
Loading
Loading