Skip to content

Commit

Permalink
Simplify/deabstract module_host_actor and wasmer_module (#417)
Browse files Browse the repository at this point in the history
* Simplify/deabstract module_host_actor and wasmer_module

* DatabaseLogger doesn't need an external mutex
  • Loading branch information
coolreader18 authored Oct 27, 2023
1 parent 3d37f1d commit 628dac1
Show file tree
Hide file tree
Showing 25 changed files with 654 additions and 624 deletions.
6 changes: 3 additions & 3 deletions crates/bindings-csharp/Runtime/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ private static byte[] DescribeModule()
// Note: this is accessed by C bindings.
private static string? CallReducer(
uint id,
byte[] sender_identity,
byte[] sender_address,
byte[] caller_identity,
byte[] caller_address,
ulong timestamp,
byte[] args
)
Expand All @@ -200,7 +200,7 @@ byte[] args
{
using var stream = new MemoryStream(args);
using var reader = new BinaryReader(stream);
reducers[(int)id].Invoke(reader, new(sender_identity, sender_address, timestamp));
reducers[(int)id].Invoke(reader, new(caller_identity, caller_address, timestamp));
if (stream.Position != stream.Length)
{
throw new Exception("Unrecognised extra bytes in the reducer arguments");
Expand Down
26 changes: 13 additions & 13 deletions crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,37 +437,37 @@ static Buffer return_result_buf(MonoObject* str) {

__attribute__((export_name("__call_reducer__"))) Buffer __call_reducer__(
uint32_t id,
Buffer sender_id_,
Buffer sender_address_,
Buffer caller_identity_,
Buffer caller_address_,
uint64_t timestamp,
Buffer args_) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);
MonoArray* args = stdb_buffer_consume(args_);

return return_result_buf(INVOKE_DOTNET_METHOD(
"SpacetimeDB.Runtime.dll", "SpacetimeDB.Module", "FFI", "CallReducer",
NULL, &id, sender_id, sender_address, &timestamp, args));
NULL, &id, caller_identity, caller_address, &timestamp, args));
}

__attribute__((export_name("__identity_connected__"))) Buffer
__identity_connected__(Buffer sender_id_, Buffer sender_address_, uint64_t timestamp) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
__identity_connected__(Buffer caller_identity_, Buffer caller_address_, uint64_t timestamp) {
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);

return return_result_buf(
INVOKE_DOTNET_METHOD("SpacetimeDB.Runtime.dll", "SpacetimeDB", "Runtime",
"IdentityConnected", NULL, sender_id, sender_address, &timestamp));
"IdentityConnected", NULL, caller_identity, caller_address, &timestamp));
}

__attribute__((export_name("__identity_disconnected__"))) Buffer
__identity_disconnected__(Buffer sender_id_, Buffer sender_address_, uint64_t timestamp) {
MonoArray* sender_id = stdb_buffer_consume(sender_id_);
MonoArray* sender_address = stdb_buffer_consume(sender_address_);
__identity_disconnected__(Buffer caller_identity_, Buffer caller_address_, uint64_t timestamp) {
MonoArray* caller_identity = stdb_buffer_consume(caller_identity_);
MonoArray* caller_address = stdb_buffer_consume(caller_address_);

return return_result_buf(
INVOKE_DOTNET_METHOD("SpacetimeDB.Runtime.dll", "SpacetimeDB", "Runtime",
"IdentityDisconnected", NULL, sender_id, sender_address, &timestamp));
"IdentityDisconnected", NULL, caller_identity, caller_address, &timestamp));
}

// Shims to avoid dependency on WASI in the generated Wasm file.
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn assemble_context(sender: Buffer, timestamp: u64, client_address: Buffer) -> R

let address = Address::from_arr(&client_address.read_array::<16>());

let address = if address == Address::__dummy() {
let address = if address == Address::__DUMMY {
None
} else {
Some(address)
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
.map(Address::from)
.unwrap_or_else(generate_random_address);

if client_address == Address::__dummy() {
if client_address == Address::__DUMMY {
Err((
StatusCode::BAD_REQUEST,
"Invalid client address: the all-zeros Address is reserved.",
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ServerMessage for TransactionUpdateMessage<'_> {
},
energy_quanta_used: event.energy_quanta_used.0,
message: errmsg,
caller_address: event.caller_address.unwrap_or(Address::ZERO),
caller_address: event.caller_address.unwrap_or(Address::__DUMMY),
};

let subscription_update = database_update.into_json();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct DatabaseInstanceContext {
pub database_id: u64,
pub identity: Identity,
pub address: Address,
pub logger: Arc<Mutex<DatabaseLogger>>,
pub logger: Arc<DatabaseLogger>,
pub relational_db: Arc<RelationalDB>,
pub publisher_address: Option<Address>,
}
Expand Down Expand Up @@ -82,7 +82,7 @@ impl DatabaseInstanceContext {
database_id,
identity,
address,
logger: Arc::new(Mutex::new(DatabaseLogger::open(log_path))),
logger: Arc::new(DatabaseLogger::open(log_path)),
relational_db: Arc::new(
RelationalDB::open(
db_path,
Expand Down
42 changes: 40 additions & 2 deletions crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl DatabaseLogger {
self.file.seek(SeekFrom::End(0)).unwrap();
}

pub fn write(&mut self, level: LogLevel, &record: &Record<'_>, bt: &dyn BacktraceProvider) {
pub fn write(&self, level: LogLevel, &record: &Record<'_>, bt: &dyn BacktraceProvider) {
let (trace, frames);
let event = match level {
LogLevel::Error => LogEvent::Error(record),
Expand All @@ -163,7 +163,7 @@ impl DatabaseLogger {
};
let mut buf = serde_json::to_string(&event).unwrap();
buf.push('\n');
self.file.write_all(buf.as_bytes()).unwrap();
(&self.file).write_all(buf.as_bytes()).unwrap();
let _ = self.tx.send(buf.into());
}

Expand All @@ -190,4 +190,42 @@ impl DatabaseLogger {

text[text.len() - off_from_end..].to_owned()
}

/// Returns a view of this logger usable for injecting "system messages"
/// into the log (see [`SystemLogger`]).
pub fn system_logger(&self) -> &SystemLogger {
    // SAFETY: SystemLogger is repr(transparent) over DatabaseLogger
    unsafe { &*(self as *const DatabaseLogger as *const SystemLogger) }
}
}

/// Somewhat ad-hoc wrapper around [`DatabaseLogger`] which allows injecting
/// "system messages" into the user-retrievable database / module log.
#[repr(transparent)]
pub struct SystemLogger {
    // NOTE: `repr(transparent)` is load-bearing — a `&SystemLogger` is
    // obtained by pointer-casting a `&DatabaseLogger` (see
    // `DatabaseLogger::system_logger`), which is only sound while this
    // struct has exactly this single field and stays repr(transparent).
    inner: DatabaseLogger,
}

impl SystemLogger {
pub fn info(&self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Info, &Self::record(msg), &())
}

pub fn warn(&self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Warn, &Self::record(msg), &())
}

pub fn error(&self, msg: &str) {
self.inner
.write(crate::database_logger::LogLevel::Error, &Self::record(msg), &())
}

fn record(message: &str) -> Record {
Record {
target: None,
filename: Some("spacetimedb"),
line_number: None,
message,
}
}
}
4 changes: 2 additions & 2 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl CommitLog {
let metric = rows_inserted.with_label_values(
&ctx.txn_type(),
&ctx.database_id(),
&ctx.reducer_id().unwrap_or(0),
&ctx.reducer_id().unwrap_or_default(),
&table_id,
);
metric.inc();
Expand All @@ -134,7 +134,7 @@ impl CommitLog {
let metric = rows_deleted.with_label_values(
&ctx.txn_type(),
&ctx.database_id(),
&ctx.reducer_id().unwrap_or(0),
&ctx.reducer_id().unwrap_or_default(),
&table_id,
);
metric.inc();
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,7 @@ impl Drop for Iter<'_> {
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.database_id(),
&self.ctx.reducer_id().unwrap_or(0),
&self.ctx.reducer_id().unwrap_or_default(),
&self.table_id.into(),
)
.inc_by(self.committed_rows_fetched);
Expand Down
70 changes: 69 additions & 1 deletion crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::db::relational_db::ST_TABLES_ID;
use crate::execution_context::ExecutionContext;
use anyhow::Context;
use nonempty::NonEmpty;
use spacetimedb_lib::auth::{StAccess, StTableType};
use spacetimedb_lib::relation::{DbTable, FieldName, FieldOnly, Header, TableField};
use spacetimedb_lib::{ColumnIndexAttribute, DataKey, Hash};
use spacetimedb_primitives::{ColId, IndexId, SequenceId, TableId};
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue, WithTypespace};
use spacetimedb_vm::expr::SourceExpr;
use std::{borrow::Cow, ops::RangeBounds, sync::Arc};

Expand Down Expand Up @@ -292,6 +293,73 @@ pub struct TableDef {
pub(crate) table_access: StAccess,
}

impl TableDef {
    /// Builds a datastore `TableDef` from a module-supplied
    /// [`spacetimedb_lib::TableDef`], resolving the row type against the
    /// module's typespace and deriving per-column index definitions from
    /// the declared indexes and unique constraints.
    ///
    /// # Errors
    ///
    /// Fails if the row type contains unresolvable (recursive) refs, is not
    /// a product type, if `column_attrs` disagrees with the column count,
    /// if a column has no name, or if a hash index is declared (not yet
    /// supported).
    pub fn from_lib_tabledef(table: WithTypespace<'_, spacetimedb_lib::TableDef>) -> anyhow::Result<Self> {
        // Resolve all typespace refs so the row type is self-contained.
        let schema = table
            .map(|t| &t.data)
            .resolve_refs()
            .context("recursive types not yet supported")?;
        let schema = schema.into_product().ok().context("table not a product type?")?;
        let table = table.ty();
        // One attribute entry per column is required by the zip below.
        anyhow::ensure!(
            table.column_attrs.len() == schema.elements.len(),
            "mismatched number of columns"
        );

        let mut columns = Vec::with_capacity(schema.elements.len());
        let mut indexes = Vec::new();
        for (col_id, (ty, col_attr)) in std::iter::zip(&schema.elements, &table.column_attrs).enumerate() {
            let col = ColumnDef {
                col_name: ty.name.clone().context("column without name")?,
                col_type: ty.algebraic_type.clone(),
                is_autoinc: col_attr.is_autoinc(),
            };

            // Look for a module-declared single-column index on this column.
            let index_for_column = table.indexes.iter().find(|index| {
                // Ignore multi-column indexes
                matches!(*index.col_ids, [index_col_id] if index_col_id as usize == col_id)
            });

            // If there's an index defined for this column already, use it,
            // making sure that it is unique if the column has a unique constraint
            let index_info = if let Some(index) = index_for_column {
                Some((index.name.clone(), index.ty))
            } else if col_attr.is_unique() {
                // If you didn't find an index, but the column is unique then create a unique btree index
                // anyway.
                Some((
                    format!("{}_{}_unique", table.name, col.col_name),
                    spacetimedb_lib::IndexType::BTree,
                ))
            } else {
                None
            };
            if let Some((name, ty)) = index_info {
                match ty {
                    spacetimedb_lib::IndexType::BTree => {}
                    // TODO
                    spacetimedb_lib::IndexType::Hash => anyhow::bail!("hash indexes not yet supported"),
                }
                indexes.push(IndexDef::new(
                    name,
                    TableId(0), // Will be ignored
                    ColId(col_id as u32),
                    // Unique constraint on the column makes the index unique.
                    col_attr.is_unique(),
                ))
            }
            columns.push(col);
        }

        Ok(TableDef {
            table_name: table.name.clone(),
            columns,
            indexes,
            table_type: table.table_type,
            table_access: table.table_access,
        })
    }
}

impl From<ProductType> for TableDef {
fn from(value: ProductType) -> Self {
Self {
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::host::ReducerId;
use crate::{execution_context::TransactionType, util::typed_prometheus::metrics_group};
use once_cell::sync::Lazy;
use prometheus::{Histogram, HistogramVec, IntCounterVec};
Expand Down Expand Up @@ -52,17 +53,17 @@ metrics_group!(

#[name = spacetime_num_rows_inserted_cumulative]
#[help = "The cumulative number of rows inserted into a table"]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: u64, table_id: u32)]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: ReducerId, table_id: u32)]
pub rdb_num_rows_inserted: IntCounterVec,

#[name = spacetime_num_rows_deleted_cumulative]
#[help = "The cumulative number of rows deleted from a table"]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: u64, table_id: u32)]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: ReducerId, table_id: u32)]
pub rdb_num_rows_deleted: IntCounterVec,

#[name = spacetime_num_rows_fetched_cumulative]
#[help = "The cumulative number of rows fetched from a table"]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: u64, table_id: u32)]
#[labels(txn_type: TransactionType, database_id: u64, reducer_id: ReducerId, table_id: u32)]
pub rdb_num_rows_fetched: IntCounterVec,
}
);
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod messages;
pub mod ostorage;
pub mod relational_db;
mod relational_operators;
pub mod update;

pub use spacetimedb_lib::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};

Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ impl RelationalDB {
pub fn with_auto_rollback<F, A, E>(&self, mut tx: MutTxId, f: F) -> Result<(MutTxId, A), E>
where
F: FnOnce(&mut MutTxId) -> Result<A, E>,
E: From<DBError>,
{
let res = f(&mut tx);
self.rollback_on_err(tx, res)
Expand Down Expand Up @@ -370,11 +369,7 @@ impl RelationalDB {

/// Roll back transaction `tx` if `res` is `Err`, otherwise return it
/// alongside the `Ok` value.
#[tracing::instrument(skip_all)]
pub fn rollback_on_err<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<(MutTxId, A), E>
where
E: From<DBError>,
{
pub fn rollback_on_err<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<(MutTxId, A), E> {
match res {
Err(e) => {
self.rollback_tx(tx);
Expand Down
Loading

1 comment on commit 628dac1

@github-actions
Copy link

@github-actions github-actions bot commented on 628dac1 Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.