Skip to content

Commit

Permalink
Expose delete_by_rel (#515)
Browse files Browse the repository at this point in the history
* expose delete_by_rel

* delete_by_rel: address pheobe's smallvec comment request
  • Loading branch information
Centril authored and kulakowski committed Nov 7, 2023
1 parent 339ffc7 commit d1da33d
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ sha1 = "0.10.1"
sha3 = "0.10.0"
slab = "0.4.7"
sled = "0.34.7"
smallvec = { version = "1.11.1", features = ["union", "const_generics"] }
sqlparser = "0.38.0"
sqllogictest-engines = "0.17"
sqllogictest = "0.17"
Expand Down
36 changes: 36 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ pub mod raw {
out: *mut u32,
) -> u16;

/// Deletes those rows, in the table identified by `table_id`,
/// that match any row in `relation`.
///
/// Matching is defined by first BSATN-decoding
/// the byte string pointed to at by `relation` to a `Vec<ProductValue>`
/// according to the row schema of the table
/// and then using `Ord for AlgebraicValue`.
///
/// The number of rows deleted is written to the WASM pointer `out`.
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - `(relation, relation_len)` doesn't decode from BSATN to a `Vec<ProductValue>`
/// according to the `ProductValue` that the table's schema specifies for rows.
/// - `relation + relation_len` overflows a 64-bit integer
/// - writing to `out` would overflow a 32-bit integer
pub fn _delete_by_rel(table_id: TableId, relation: *const u8, relation_len: usize, out: *mut u32) -> u16;

/// Start iteration on each row, as bytes, of a table identified by `table_id`.
///
/// The iterator is registered in the host environment
Expand Down Expand Up @@ -593,6 +611,24 @@ pub fn delete_by_col_eq(table_id: TableId, col_id: ColId, value: &[u8]) -> Resul
unsafe { call(|out| raw::_delete_by_col_eq(table_id, col_id, value.as_ptr(), value.len(), out)) }
}

/// Deletes those rows, in the table identified by `table_id`,
/// that match any row in `relation`.
///
/// Matching is defined by first BSATN-decoding
/// the byte string pointed to at by `relation` to a `Vec<ProductValue>`
/// according to the row schema of the table
/// and then using `Ord for AlgebraicValue`.
///
/// Returns the number of rows deleted.
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - `(relation, relation_len)` doesn't decode from BSATN to a `Vec<ProductValue>`
#[inline]
pub fn delete_by_rel(table_id: TableId, relation: &[u8]) -> Result<u32, Errno> {
unsafe { call(|out| raw::_delete_by_rel(table_id, relation.as_ptr(), relation.len(), out)) }
}

/// Returns an iterator for each row, as bytes, of a table identified by `table_id`.
/// The rows can be put through an optional `filter`,
/// which is encoded in the embedded language defined by `spacetimedb_lib::filter::Expr`.
Expand Down
40 changes: 36 additions & 4 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use spacetimedb_lib::ser::Serialize;
use spacetimedb_lib::{bsatn, ColumnIndexAttribute, IndexType, PrimaryKey, ProductType, ProductValue};
use std::cell::RefCell;
use std::marker::PhantomData;
use std::slice::from_ref;
use std::{fmt, panic};

pub use spacetimedb_bindings_macro::{duration, query, spacetimedb, TableType};
Expand Down Expand Up @@ -206,14 +207,36 @@ pub fn delete_by_col_eq(table_id: TableId, col_id: u8, value: &impl Serialize) -
})
}

/*
pub fn delete_pk(table_id: u32, primary_key: &PrimaryKey) -> Result<()> {
/// Deletes those rows, in the table identified by `table_id`,
/// that match any row in `relation`.
///
/// The `relation` will be BSATN encoded to `[ProductValue]`
/// i.e., a list of product values, so each element in `relation`
/// must serialize to what a `ProductValue` would in BSATN.
///
/// Matching is then defined by first BSATN-decoding
/// the resulting bsatn to a `Vec<ProductValue>`
/// according to the row schema of the table
/// and then using `Ord for AlgebraicValue`.
///
/// Returns the number of rows deleted.
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - `(relation, relation_len)` doesn't decode from BSATN to a `Vec<ProductValue>`
///
/// Panics when serialization fails.
pub fn delete_by_rel(table_id: TableId, relation: &[impl Serialize]) -> Result<u32> {
with_row_buf(|bytes| {
primary_key.encode(bytes);
sys::delete_pk(table_id, bytes)
// Encode `value` as BSATN into `bytes` and then use that.
bsatn::to_writer(bytes, relation).unwrap();
sys::delete_by_rel(table_id, bytes)
})
}

/// A table iterator which yields values of the `TableType` corresponding to the table.
type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;

pub fn delete_filter<F: Fn(&ProductValue) -> bool>(table_id: u32, f: F) -> Result<usize> {
with_row_buf(|bytes| {
let mut count = 0;
Expand Down Expand Up @@ -418,6 +441,15 @@ pub trait TableType: SpacetimeType + DeserializeOwned + Serialize {
fn iter_filtered(filter: spacetimedb_lib::filter::Expr) -> TableIter<Self> {
table_iter(Self::table_id().0, Some(filter)).unwrap()
}

/// Deletes this row `self` from the table.
///
/// Returns `true` if the row was deleted.
fn delete(&self) -> bool {
let count = delete_by_rel(Self::table_id(), from_ref(self)).unwrap();
debug_assert!(count < 2);
count == 1
}
}

mod sealed {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl RelationalDB {
bytes: &[u8],
) -> Result<AlgebraicValue, DBError> {
let schema = self.schema_for_column(tx, table_id, col_id)?;
Ok(AlgebraicValue::decode(&schema, &mut &bytes[..])?)
Ok(AlgebraicValue::decode(&schema, &mut &*bytes)?)
}

/// Begin a transaction.
Expand Down
22 changes: 21 additions & 1 deletion crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl InstanceEnv {
/// Deletes all rows in the table identified by `table_id`
/// where the column identified by `cols` equates to `value`.
///
/// Returns an error if no columns were deleted or if the column wasn't found.
/// Returns an error if no rows were deleted or if the column wasn't found.
#[tracing::instrument(skip(self, ctx, value))]
pub fn delete_by_col_eq(
&self,
Expand All @@ -177,6 +177,26 @@ impl InstanceEnv {
NonZeroU32::new(count).ok_or(NodesError::ColumnValueNotFound)
}

/// Deletes all rows in the table identified by `table_id`
/// where the rows match one in `relation`
/// which is a bsatn encoding of `Vec<ProductValue>`.
///
/// Returns an error if no rows were deleted.
#[tracing::instrument(skip(self, relation))]
pub fn delete_by_rel(&self, table_id: TableId, relation: &[u8]) -> Result<u32, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;

// Find the row schema using it to decode a vector of product values.
let row_ty = stdb.row_schema_for_table(tx, table_id)?;
// `TableType::delete` cares about a single element
// so in that case we can avoid the allocation by using `smallvec`.
let relation = ProductValue::decode_smallvec(&row_ty, &mut &*relation).map_err(NodesError::DecodeRow)?;

// Delete them and return how many we deleted.
Ok(stdb.delete_by_rel(tx, table_id, relation))
}

/// Returns the `table_id` associated with the given `table_name`.
///
/// Errors with `TableNotFound` if the table does not exist.
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub enum AbiCall {
ConsoleLog,
CreateIndex,
DeleteByColEq,
DeleteByRel,
GetTableId,
Insert,
IterByColEq,
Expand Down
30 changes: 30 additions & 0 deletions crates/core/src/host/wasmer/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,36 @@ impl WasmInstanceEnv {
result
}

/// Deletes those rows, in the table identified by `table_id`,
/// that match any row in `relation`.
///
/// Matching is defined by first BSATN-decoding
/// the byte string pointed to at by `relation` to a `Vec<ProductValue>`
/// according to the row schema of the table
/// and then using `Ord for AlgebraicValue`.
///
/// The number of rows deleted is written to the WASM pointer `out`.
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - `(relation, relation_len)` doesn't decode from BSATN to a `Vec<ProductValue>`
/// according to the `ProductValue` that the table's schema specifies for rows.
/// - `relation + relation_len` overflows a 64-bit integer
/// - writing to `out` would overflow a 32-bit integer
#[tracing::instrument(skip_all)]
pub fn delete_by_rel(
caller: FunctionEnvMut<'_, Self>,
table_id: u32,
relation: WasmPtr<u8>,
relation_len: u32,
out: WasmPtr<u32>,
) -> RtResult<u16> {
Self::cvt_ret(caller, "delete_by_rel", AbiCall::DeleteByRel, out, |caller, mem| {
let relation = mem.read_bytes(&caller, relation, relation_len)?;
Ok(caller.data().instance_env.delete_by_rel(table_id.into(), &relation)?)
})
}

/// Queries the `table_id` associated with the given (table) `name`
/// where `name` points to a UTF-8 slice in WASM memory of `name_len` bytes.
///
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/host/wasmer/wasmer_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl WasmerModule {
env,
WasmInstanceEnv::delete_by_col_eq,
),
"_delete_by_rel" => Function::new_typed_with_env(
store,
env,
WasmInstanceEnv::delete_by_rel,
),
"_insert" => Function::new_typed_with_env(
store,
env,
Expand Down
1 change: 1 addition & 0 deletions crates/sats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hex = { workspace = true, optional = true }
itertools.workspace = true
nonempty.workspace = true
serde = { workspace = true, optional = true }
smallvec.workspace = true
thiserror.workspace = true
tracing.workspace = true

Expand Down
20 changes: 16 additions & 4 deletions crates/sats/src/bsatn.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::buffer::{BufReader, BufWriter};
use crate::de::{Deserialize, DeserializeSeed};
use crate::de::{BasicSmallVecVisitor, Deserialize, DeserializeSeed, Deserializer as _};
use crate::ser::Serialize;
use crate::Typespace;
use smallvec::SmallVec;

pub mod de;
pub mod ser;
Expand Down Expand Up @@ -49,12 +50,23 @@ macro_rules! codec_funcs {
};
(val: $ty:ty) => {
impl $ty {
/// Decode a value from `bytes` typed at `ty`.
pub fn decode<'a>(
algebraic_type: &<Self as crate::Value>::Type,
ty: &<Self as crate::Value>::Type,
bytes: &mut impl BufReader<'a>,
) -> Result<Self, DecodeError> {
crate::WithTypespace::new(&Typespace::new(Vec::new()), algebraic_type)
.deserialize(Deserializer::new(bytes))
crate::WithTypespace::new(&Typespace::new(Vec::new()), ty).deserialize(Deserializer::new(bytes))
}

/// Decode a vector of values from `bytes` with each value typed at `ty`.
pub fn decode_smallvec<'a>(
ty: &<Self as crate::Value>::Type,
bytes: &mut impl BufReader<'a>,
) -> Result<SmallVec<[Self; 1]>, DecodeError> {
Deserializer::new(bytes).deserialize_array_seed(
BasicSmallVecVisitor,
crate::WithTypespace::new(&Typespace::new(Vec::new()), ty),
)
}

pub fn encode(&self, bytes: &mut impl BufWriter) {
Expand Down
76 changes: 64 additions & 12 deletions crates/sats/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod serde;

#[doc(hidden)]
pub use impls::{visit_named_product, visit_seq_product};
use smallvec::SmallVec;

use std::borrow::Borrow;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -580,6 +581,18 @@ pub trait MapAccess<'de> {
}
}

impl<'de, A: MapAccess<'de>> ArrayAccess<'de> for A {
type Element = (A::Key, A::Value);
type Error = A::Error;

fn next_element(&mut self) -> Result<Option<Self::Element>, Self::Error> {
self.next_entry()
}
fn size_hint(&self) -> Option<usize> {
self.size_hint()
}
}

/// `DeserializeSeed` is the stateful form of the [`Deserialize`] trait.
pub trait DeserializeSeed<'de> {
/// The type produced by using this seed.
Expand Down Expand Up @@ -635,18 +648,61 @@ impl<'de, T: Deserialize<'de>> DeserializeSeed<'de> for PhantomData<T> {
}
}

/// A vector with two operations: `with_capacity` and `push`.
pub trait GrowingVec<T> {
/// Create the collection with the given capacity.
fn with_capacity(cap: usize) -> Self;

/// Push to the vector the `elem`.
fn push(&mut self, elem: T);
}

impl<T> GrowingVec<T> for Vec<T> {
fn with_capacity(cap: usize) -> Self {
Self::with_capacity(cap)
}
fn push(&mut self, elem: T) {
self.push(elem)
}
}

impl<T, const N: usize> GrowingVec<T> for SmallVec<[T; N]> {
fn with_capacity(cap: usize) -> Self {
Self::with_capacity(cap)
}
fn push(&mut self, elem: T) {
self.push(elem)
}
}

/// A basic implementation of `ArrayVisitor::visit` using the provided size hint.
pub fn array_visit<'de, A: ArrayAccess<'de>, V: GrowingVec<A::Element>>(mut access: A) -> Result<V, A::Error> {
let mut v = V::with_capacity(access.size_hint().unwrap_or(0));
while let Some(x) = access.next_element()? {
v.push(x)
}
Ok(v)
}

/// An implementation of [`ArrayVisitor<'de, T>`] where the output is a `Vec<T>`.
pub struct BasicVecVisitor;

impl<'de, T> ArrayVisitor<'de, T> for BasicVecVisitor {
type Output = Vec<T>;

fn visit<A: ArrayAccess<'de, Element = T>>(self, mut vec: A) -> Result<Self::Output, A::Error> {
let mut v = Vec::with_capacity(vec.size_hint().unwrap_or(0));
while let Some(el) = vec.next_element()? {
v.push(el)
}
Ok(v)
fn visit<A: ArrayAccess<'de, Element = T>>(self, vec: A) -> Result<Self::Output, A::Error> {
array_visit(vec)
}
}

/// An implementation of [`ArrayVisitor<'de, T>`] where the output is a `SmallVec<[T; N]>`.
pub struct BasicSmallVecVisitor<const N: usize>;

impl<'de, T, const N: usize> ArrayVisitor<'de, T> for BasicSmallVecVisitor<N> {
type Output = SmallVec<[T; N]>;

fn visit<A: ArrayAccess<'de, Element = T>>(self, vec: A) -> Result<Self::Output, A::Error> {
array_visit(vec)
}
}

Expand All @@ -656,12 +712,8 @@ pub struct BasicMapVisitor;
impl<'de, K: Ord, V> MapVisitor<'de, K, V> for BasicMapVisitor {
type Output = BTreeMap<K, V>;

fn visit<A: MapAccess<'de, Key = K, Value = V>>(self, mut map: A) -> Result<Self::Output, A::Error> {
let mut m = Vec::with_capacity(map.size_hint().unwrap_or(0));
while let Some(entry) = map.next_entry()? {
m.push(entry)
}
Ok(m.into_iter().collect())
fn visit<A: MapAccess<'de, Key = K, Value = V>>(self, map: A) -> Result<Self::Output, A::Error> {
Ok(array_visit::<_, Vec<_>>(map)?.into_iter().collect())
}
}

Expand Down

0 comments on commit d1da33d

Please sign in to comment.