Skip to content

Commit

Permalink
Simplify/deabstract module_host_actor and wasmer_module
Browse files Browse the repository at this point in the history
  • Loading branch information
coolreader18 committed Oct 25, 2023
1 parent bc54b73 commit dd11a77
Show file tree
Hide file tree
Showing 19 changed files with 625 additions and 595 deletions.
6 changes: 3 additions & 3 deletions crates/bindings-csharp/Runtime/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ private static byte[] DescribeModule()
// Note: this is accessed by C bindings.
private static string? CallReducer(
uint id,
byte[] sender_identity,
byte[] sender_address,
byte[] caller_identity,
byte[] caller_address,
ulong timestamp,
byte[] args
)
Expand All @@ -200,7 +200,7 @@ byte[] args
{
using var stream = new MemoryStream(args);
using var reader = new BinaryReader(stream);
reducers[(int)id].Invoke(reader, new(sender_identity, sender_address, timestamp));
reducers[(int)id].Invoke(reader, new(caller_identity, caller_address, timestamp));
if (stream.Position != stream.Length)
{
throw new Exception("Unrecognised extra bytes in the reducer arguments");
Expand Down
26 changes: 13 additions & 13 deletions crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,37 +437,37 @@ static Buffer return_result_buf(MonoObject* str) {

__attribute__((export_name("__call_reducer__"))) Buffer __call_reducer__(
uint32_t id,
Buffer sender_id_,
Buffer sender_address_,
Buffer caller_identity_,
Buffer caller_address_,
uint64_t timestamp,
Buffer args_) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);
MonoArray* args = stdb_buffer_consume(args_);

return return_result_buf(INVOKE_DOTNET_METHOD(
"SpacetimeDB.Runtime.dll", "SpacetimeDB.Module", "FFI", "CallReducer",
NULL, &id, sender_id, sender_address, &timestamp, args));
NULL, &id, caller_identity, caller_address, &timestamp, args));
}

__attribute__((export_name("__identity_connected__"))) Buffer
__identity_connected__(Buffer sender_id_, Buffer sender_address_, uint64_t timestamp) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
__identity_connected__(Buffer caller_identity_, Buffer caller_address_, uint64_t timestamp) {
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);

return return_result_buf(
INVOKE_DOTNET_METHOD("SpacetimeDB.Runtime.dll", "SpacetimeDB", "Runtime",
"IdentityConnected", NULL, sender_id, sender_address, &timestamp));
"IdentityConnected", NULL, caller_identity, caller_address, &timestamp));
}

__attribute__((export_name("__identity_disconnected__"))) Buffer
__identity_disconnected__(Buffer sender_id_, Buffer sender_address_, uint64_t timestamp) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
__identity_disconnected__(Buffer caller_identity_, Buffer caller_address_, uint64_t timestamp) {
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);

return return_result_buf(
INVOKE_DOTNET_METHOD("SpacetimeDB.Runtime.dll", "SpacetimeDB", "Runtime",
"IdentityDisconnected", NULL, sender_id, sender_address, &timestamp));
"IdentityDisconnected", NULL, caller_identity, caller_address, &timestamp));
}

// Shims to avoid dependency on WASI in the generated Wasm file.
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn assemble_context(sender: Buffer, timestamp: u64, client_address: Buffer) -> R

let address = Address::from_arr(&client_address.read_array::<16>());

let address = if address == Address::__dummy() {
let address = if address == Address::__DUMMY {
None
} else {
Some(address)
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
.map(Address::from)
.unwrap_or_else(generate_random_address);

if client_address == Address::__dummy() {
if client_address == Address::__DUMMY {
Err((
StatusCode::BAD_REQUEST,
"Invalid client address: the all-zeros Address is reserved.",
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ServerMessage for TransactionUpdateMessage<'_> {
},
energy_quanta_used: event.energy_quanta_used.0,
message: errmsg,
caller_address: event.caller_address.unwrap_or(Address::ZERO),
caller_address: event.caller_address.unwrap_or(Address::__DUMMY),
};

let subscription_update = database_update.into_json();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct DatabaseInstanceContext {
pub database_id: u64,
pub identity: Identity,
pub address: Address,
pub logger: Arc<Mutex<DatabaseLogger>>,
pub logger: Arc<parking_lot::Mutex<DatabaseLogger>>,
pub relational_db: Arc<RelationalDB>,
pub publisher_address: Option<Address>,
}
Expand Down Expand Up @@ -82,7 +82,7 @@ impl DatabaseInstanceContext {
database_id,
identity,
address,
logger: Arc::new(Mutex::new(DatabaseLogger::open(log_path))),
logger: Arc::new(parking_lot::Mutex::new(DatabaseLogger::open(log_path))),
relational_db: Arc::new(
RelationalDB::open(db_path, message_log, odb, address, config.fsync != FsyncPolicy::Never).unwrap(),
),
Expand Down
38 changes: 38 additions & 0 deletions crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,42 @@ impl DatabaseLogger {

text[text.len() - off_from_end..].to_owned()
}

pub fn system_logger(&mut self) -> &mut SystemLogger {
// SAFETY: SystemLogger is repr(transparent) over DatabaseLogger
unsafe { &mut *(self as *mut DatabaseLogger as *mut SystemLogger) }
}
}

/// Somewhat ad-hoc wrapper around [`DatabaseLogger`] which allows to inject
/// "system messages" into the user-retrievable database / module log
#[repr(transparent)]
pub struct SystemLogger {
inner: DatabaseLogger,
}

impl SystemLogger {
pub fn info(&mut self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Info, &Self::record(msg), &())
}

pub fn warn(&mut self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Warn, &Self::record(msg), &())
}

pub fn error(&mut self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Error, &Self::record(msg), &())
}

fn record(message: &str) -> Record {
Record {
target: None,
filename: Some("spacetimedb"),
line_number: None,
message,
}
}
}
70 changes: 69 additions & 1 deletion crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::db::relational_db::ST_TABLES_ID;
use anyhow::Context;
use nonempty::NonEmpty;
use spacetimedb_lib::auth::{StAccess, StTableType};
use spacetimedb_lib::relation::{DbTable, FieldName, FieldOnly, Header, TableField};
use spacetimedb_lib::{ColumnIndexAttribute, DataKey, Hash};
use spacetimedb_primitives::{ColId, IndexId, SequenceId, TableId};
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue, WithTypespace};
use spacetimedb_vm::expr::SourceExpr;
use std::{borrow::Cow, ops::RangeBounds, sync::Arc};

Expand Down Expand Up @@ -291,6 +292,73 @@ pub struct TableDef {
pub(crate) table_access: StAccess,
}

impl TableDef {
pub fn from_lib_tabledef(table: WithTypespace<'_, spacetimedb_lib::TableDef>) -> anyhow::Result<Self> {
let schema = table
.map(|t| &t.data)
.resolve_refs()
.context("recursive types not yet supported")?;
let schema = schema.into_product().ok().context("table not a product type?")?;
let table = table.ty();
anyhow::ensure!(
table.column_attrs.len() == schema.elements.len(),
"mismatched number of columns"
);

let mut columns = Vec::with_capacity(schema.elements.len());
let mut indexes = Vec::new();
for (col_id, (ty, col_attr)) in std::iter::zip(&schema.elements, &table.column_attrs).enumerate() {
let col = ColumnDef {
col_name: ty.name.clone().context("column without name")?,
col_type: ty.algebraic_type.clone(),
is_autoinc: col_attr.is_autoinc(),
};

let index_for_column = table.indexes.iter().find(|index| {
// Ignore multi-column indexes
matches!(*index.col_ids, [index_col_id] if index_col_id as usize == col_id)
});

// If there's an index defined for this column already, use it,
// making sure that it is unique if the column has a unique constraint
let index_info = if let Some(index) = index_for_column {
Some((index.name.clone(), index.ty))
} else if col_attr.is_unique() {
// If you didn't find an index, but the column is unique then create a unique btree index
// anyway.
Some((
format!("{}_{}_unique", table.name, col.col_name),
spacetimedb_lib::IndexType::BTree,
))
} else {
None
};
if let Some((name, ty)) = index_info {
match ty {
spacetimedb_lib::IndexType::BTree => {}
// TODO
spacetimedb_lib::IndexType::Hash => anyhow::bail!("hash indexes not yet supported"),
}
indexes.push(IndexDef::new(
name,
TableId(0), // Will be ignored
ColId(col_id as u32),
col_attr.is_unique(),
))
}
columns.push(col);
}

Ok(TableDef {
table_name: table.name.clone(),
columns,
indexes,
table_type: table.table_type,
table_access: table.table_access,
})
}
}

impl From<ProductType> for TableDef {
fn from(value: ProductType) -> Self {
Self {
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod messages;
pub mod ostorage;
pub mod relational_db;
mod relational_operators;
pub mod update;

pub use spacetimedb_lib::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};

Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ impl RelationalDB {
pub fn with_auto_rollback<F, A, E>(&self, mut tx: MutTxId, f: F) -> Result<(MutTxId, A), E>
where
F: FnOnce(&mut MutTxId) -> Result<A, E>,
E: From<DBError>,
{
let res = f(&mut tx);
self.rollback_on_err(tx, res)
Expand Down Expand Up @@ -360,11 +359,7 @@ impl RelationalDB {

/// Roll back transaction `tx` if `res` is `Err`, otherwise return it
/// alongside the `Ok` value.
#[tracing::instrument(skip_all)]
pub fn rollback_on_err<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<(MutTxId, A), E>
where
E: From<DBError>,
{
pub fn rollback_on_err<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<(MutTxId, A), E> {
match res {
Err(e) => {
self.rollback_tx(tx);
Expand Down
Loading

0 comments on commit dd11a77

Please sign in to comment.