From ea2bae45837c32e7794432ac792d44985b7f6888 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 20 Sep 2023 07:42:37 +0200 Subject: [PATCH] Cloud next (#173) * Client API changes to accomodate new cloud architecture The exact trait interfaces for `client-api` is TBD * Ensure we're not blocking when accessing the filesystem * Derive Clone for SendGridController * Add YOLO error variant to InsertDomainResult * Rebase * fixup! Rebase * Fix SpacetimeType for Address * Temporarily disable message / frame size limits for SDK WS * Remove get_database_instance_state from API trait It's an internal (worker db) thing, which does not need to be satisfied by impls. * Update indexes when updating a database It turns out that the order of the index definitions in the proposed schema may differ from those returned from the catalog, causing valid (i.e. no-op) updates to be rejected. While at it, allow updating table indexes so as long as the (column) schema remains unchanged. * Update indexes when updating a database It turns out that the order of the index definitions in the proposed schema may differ from those returned from the catalog, causing valid (i.e. no-op) updates to be rejected. While at it, allow updating table indexes so as long as the (column) schema remains unchanged. * Fix -S instead of -s for update-module smoke test -s now means "server", -S "skip clippy", changed in: a1e9984 (Multiple server configurations for CLI (#214), 2023-09-01) * Invalidate schema cache when committing a tx * Use long options in update-module.sh, fix unused warning * Add test asserting schema_for_table reflects index updates --- crates/bindings/src/impls.rs | 9 +- crates/cli/src/subcommands/dns.rs | 2 + crates/client-api/src/auth.rs | 10 +- crates/client-api/src/lib.rs | 485 +++++++++++++++++---- crates/client-api/src/routes/database.rs | 421 +++++++----------- crates/client-api/src/routes/energy.rs | 107 +++-- crates/client-api/src/routes/identity.rs | 64 ++- crates/client-api/src/routes/metrics.rs | 12 +- crates/client-api/src/routes/prometheus.rs | 19 +- crates/client-api/src/routes/subscribe.rs | 30 +- crates/client-api/src/routes/tracelog.rs | 29 +- crates/client-api/src/util.rs | 17 +- crates/core/src/control_db.rs | 102 +++-- crates/core/src/control_db/tests.rs | 52 +-- crates/core/src/sendgrid_controller.rs | 1 + crates/lib/src/address.rs | 12 + crates/lib/src/hash.rs | 2 + crates/lib/src/identity.rs | 10 +- crates/lib/src/lib.rs | 1 - crates/lib/src/name.rs | 30 +- crates/sdk/src/websocket.rs | 18 +- crates/standalone/src/energy_monitor.rs | 2 +- crates/standalone/src/lib.rs | 327 ++++++++------ crates/standalone/src/routes/mod.rs | 11 +- crates/testing/src/modules.rs | 36 +- test/tests/update-module.sh | 2 +- 26 files changed, 1080 insertions(+), 731 deletions(-) diff --git a/crates/bindings/src/impls.rs b/crates/bindings/src/impls.rs index 18b517eea1..88ce0de63f 100644 --- a/crates/bindings/src/impls.rs +++ b/crates/bindings/src/impls.rs @@ -1,4 +1,4 @@ -use spacetimedb_lib::{DataKey, Hash, Identity}; +use spacetimedb_lib::{Address, DataKey, Hash, Identity}; use super::PrimaryKey; use crate::{FilterableValue, UniqueValue}; @@ -54,3 +54,10 @@ impl UniqueValue for Identity { todo!() // idk what this is } } + +impl FilterableValue for Address {} +impl UniqueValue for Address { + fn into_primarykey(self) -> PrimaryKey { + todo!() + } +} diff --git a/crates/cli/src/subcommands/dns.rs b/crates/cli/src/subcommands/dns.rs index 3d3af2c8a2..16812e842e 100644 --- a/crates/cli/src/subcommands/dns.rs +++ b/crates/cli/src/subcommands/dns.rs @@ -159,6 +159,7 @@ pub async fn exec_set_name(mut config: Config, args: &ArgMatches) -> Result<(), let res = builder.send().await?.error_for_status()?; let bytes = res.bytes().await.unwrap(); + println!("{}", String::from_utf8_lossy(&bytes[..])); let result: InsertDomainResult = serde_json::from_slice(&bytes[..]).unwrap(); match result { InsertDomainResult::Success { domain, address } => { @@ -212,6 +213,7 @@ pub async fn exec_set_name(mut config: Config, args: &ArgMatches) -> Result<(), )), }; } + InsertDomainResult::OtherError(e) => return Err(anyhow::anyhow!(e)), } Ok(()) diff --git a/crates/client-api/src/auth.rs b/crates/client-api/src/auth.rs index da2eada2f1..0717e1ce4d 100644 --- a/crates/client-api/src/auth.rs +++ b/crates/client-api/src/auth.rs @@ -16,7 +16,7 @@ use spacetimedb::auth::identity::{ use spacetimedb::host::EnergyDiff; use spacetimedb::identity::Identity; -use crate::{log_and_500, ControlNodeDelegate}; +use crate::{log_and_500, ControlStateDelegate, NodeDelegate}; // Yes, this is using basic auth. See the below issues. // The current form is: Authorization: Basic base64("token:") @@ -75,7 +75,7 @@ pub struct TokenQueryParam { } #[async_trait::async_trait] -impl axum::extract::FromRequestParts for SpacetimeAuthHeader { +impl axum::extract::FromRequestParts for SpacetimeAuthHeader { type Rejection = AuthorizationRejection; async fn from_request_parts(parts: &mut request::Parts, state: &S) -> Result { match ( @@ -168,8 +168,8 @@ enum AuthorizationRejectionReason { } impl SpacetimeAuth { - pub async fn alloc(ctx: &(impl ControlNodeDelegate + ?Sized)) -> axum::response::Result { - let identity = ctx.alloc_spacetime_identity().await.map_err(log_and_500)?; + pub async fn alloc(ctx: &(impl NodeDelegate + ControlStateDelegate + ?Sized)) -> axum::response::Result { + let identity = ctx.create_identity().await.map_err(log_and_500)?; let creds = SpacetimeCreds::encode_token(ctx.private_key(), identity).map_err(log_and_500)?; Ok(Self { creds, identity }) } @@ -192,7 +192,7 @@ impl SpacetimeAuthHeader { /// If there is no JWT in the auth header we will create a new identity and token and return it. pub async fn get_or_create( self, - ctx: &(impl ControlNodeDelegate + ?Sized), + ctx: &(impl NodeDelegate + ControlStateDelegate + ?Sized), ) -> axum::response::Result { match self.get() { Some(auth) => Ok(auth), diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 9761555e71..e42e63835f 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -1,116 +1,169 @@ +use std::sync::Arc; + use async_trait::async_trait; -use axum::extract::FromRef; use http::StatusCode; + use spacetimedb::address::Address; use spacetimedb::auth::identity::{DecodingKey, EncodingKey}; use spacetimedb::client::ClientActorIndex; -use spacetimedb::control_db::ControlDb; use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController; -use spacetimedb::hash::Hash; use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::{EnergyQuanta, HostController}; use spacetimedb::identity::Identity; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Node}; -use spacetimedb::messages::worker_db::DatabaseInstanceState; +use spacetimedb::messages::control_db::{Database, DatabaseInstance, IdentityEmail, Node}; use spacetimedb::module_host_context::ModuleHostContext; -use spacetimedb::object_db::ObjectDb; use spacetimedb::sendgrid_controller::SendGridController; -use spacetimedb_lib::name::DomainName; -mod auth; +use spacetimedb_lib::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld}; +use spacetimedb_lib::recovery::RecoveryCode; + +pub mod auth; pub mod routes; pub mod util; -use std::sync::Arc; +/// Defines the state / environment of a SpacetimeDB node from the PoV of the +/// client API. +/// +/// Types returned here should be considered internal state and **never** be +/// surfaced to the API. #[async_trait] -pub trait WorkerCtx: ControlNodeDelegate + ControlStateDelegate + Send + Sync { +pub trait NodeDelegate: Send + Sync { fn gather_metrics(&self) -> Vec; fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController; - async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result; fn host_controller(&self) -> &Arc; fn client_actor_index(&self) -> &ClientActorIndex; -} + fn sendgrid_controller(&self) -> Option<&SendGridController>; -#[async_trait] -pub trait ControlStateDelegate: Send + Sync { - async fn get_node_id(&self) -> Result, anyhow::Error>; + /// Return a JWT decoding key for verifying credentials. + fn public_key(&self) -> &DecodingKey; - async fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result>; + /// Return the public key used to verify JWTs, as the bytes of a PEM public key file. + /// + /// The `/identity/public-key` route calls this method to return the public key to callers. + fn public_key_bytes(&self) -> &[u8]; - async fn get_nodes(&self) -> spacetimedb::control_db::Result>; + /// Return a JWT encoding key for signing credentials. + fn private_key(&self) -> &EncodingKey; - async fn get_database_instance_state( - &self, - database_instance_id: u64, - ) -> Result, anyhow::Error>; + /// Load the [`ModuleHostContext`] for instance `instance_id` of + /// [`Database`] `db`. + /// + /// This method is defined as `async`, as that obliges the implementer to + /// ensure that any necessary blocking I/O is made async-safe. In other + /// words, it is the responsibility of the implementation to make use of + /// `spawn_blocking` or `block_in_place` as appropriate, while the + /// `client-api` assumes that `await`ing the method never blocks. + async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result; +} - async fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result>; +/// Parameters for publishing a database. +/// +/// See [`ControlStateDelegate::publish_database`]. +pub struct DatabaseDef { + /// The [`Address`] the database shall have. + /// + /// Addresses are allocated via [`ControlStateDelegate::create_address`]. + pub address: Address, + /// The compiled program of the database module. + pub program_bytes: Vec, + /// The desired number of replicas the database shall have. + pub num_replicas: u32, +} - async fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result>; +/// API of the SpacetimeDB control plane. +/// +/// The trait is the composition of [`ControlStateReadAccess`] and +/// [`ControlStateWriteAccess`] to reflect the consistency model of SpacetimeDB +/// as of this writing: +/// +/// The control plane state represents the _desired_ state of an ensemble of +/// SpacetimeDB nodes. As such, this state can be read from a local (in-memory) +/// representation, which is guaranteed to be "prefix consistent" across all +/// nodes of a cluster. Prefix consistency means that the state being examined +/// is consistent, but reads may not return the most recently written values. +/// +/// As a consequence, implementations are not currently required to guarantee +/// read-after-write consistency. In the future, however, write operations may +/// be required to return the observed state after completing. As this may +/// require them to suspend themselves while waiting for the writes to propagate, +/// [`ControlStateWriteAccess`] methods are marked `async` today already. +#[async_trait] +pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync {} - async fn get_databases(&self) -> spacetimedb::control_db::Result>; +impl ControlStateDelegate for T {} - async fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result>; +/// Query API of the SpacetimeDB control plane. +pub trait ControlStateReadAccess { + // Nodes + fn get_node_id(&self) -> Option; + fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result>; + fn get_nodes(&self) -> spacetimedb::control_db::Result>; - async fn get_database_instances(&self) -> spacetimedb::control_db::Result>; + // Databases + fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result>; + fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result>; + fn get_databases(&self) -> spacetimedb::control_db::Result>; - async fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option; + // Database instances + fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result>; + fn get_database_instances(&self) -> spacetimedb::control_db::Result>; + fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option; + + // Identities + fn get_identities_for_email(&self, email: &str) -> spacetimedb::control_db::Result>; + fn get_recovery_codes(&self, email: &str) -> spacetimedb::control_db::Result>; + + // Energy + fn get_energy_balance(&self, identity: &Identity) -> spacetimedb::control_db::Result>; + + // DNS + fn lookup_address(&self, domain: &DomainName) -> spacetimedb::control_db::Result>; + fn reverse_lookup(&self, address: &Address) -> spacetimedb::control_db::Result>; } +/// Write operations on the SpacetimeDB control plane. #[async_trait] -pub trait ControlCtx: ControlNodeDelegate + Send + Sync { - #[allow(clippy::too_many_arguments)] - async fn insert_database( - &self, - address: &Address, - identity: &Identity, - program_bytes_address: &Hash, - host_type: HostType, - num_replicas: u32, - force: bool, - ) -> Result<(), anyhow::Error>; +pub trait ControlStateWriteAccess: Send + Sync { + // Databases + async fn create_address(&self) -> spacetimedb::control_db::Result
; - async fn update_database( + /// Publish a database acc. to [`DatabaseDef`]. + /// + /// If the database with the given address was successfully published before, + /// it is updated acc. to the module lifecycle conventions. `Some` result is + /// returned in that case. + /// + /// Otherwise, `None` is returned meaning that the database was freshly + /// initialized. + async fn publish_database( &self, - address: &Address, - program_bytes_address: &Hash, - num_replicas: u32, - ) -> Result, anyhow::Error>; + identity: &Identity, + spec: DatabaseDef, + ) -> spacetimedb::control_db::Result>; - async fn delete_database(&self, address: &Address) -> Result<(), anyhow::Error>; + async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()>; - fn object_db(&self) -> &ObjectDb; - fn control_db(&self) -> &ControlDb; - fn sendgrid_controller(&self) -> Option<&SendGridController>; -} + // Identities + async fn create_identity(&self) -> spacetimedb::control_db::Result; + async fn add_email(&self, identity: &Identity, email: &str) -> spacetimedb::control_db::Result<()>; + async fn insert_recovery_code( + &self, + identity: &Identity, + email: &str, + code: RecoveryCode, + ) -> spacetimedb::control_db::Result<()>; -#[async_trait] -/// Access to the SpacetimeDB control plane. -/// -/// Implementors of this trait are able to delegate requests to a `ControlCtx` -/// through some unspecified method. -/// In SpacetimeDB-standalone, this manifests as a direct in-program call to the control node. -/// In SpacetimeDB-cloud, worker nodes' `ControlNodeDelegate` implementations -/// may make network requests to the control node to handle some of these methods. -pub trait ControlNodeDelegate: Send + Sync { - /// Resolve a database name to an address. - async fn spacetime_dns(&self, domain: &DomainName) -> spacetimedb::control_db::Result>; - - /// Create a new, unique `Identity`. - async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result; - - /// Subtract `amount` from the energy balance of `identity`. + // Energy + async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()>; async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()>; - /// Return a JWT decoding key for verifying credentials. - fn public_key(&self) -> &DecodingKey; - - /// Return a JWT encoding key for signing credentials. - fn private_key(&self) -> &EncodingKey; - - /// Return the public key used to verify JWTs, as the bytes of a PEM public key file. - /// - /// The `/identity/public-key` route calls this method to return the public key to callers. - fn public_key_bytes(&self) -> &[u8]; + // DNS + async fn register_tld(&self, identity: &Identity, tld: Tld) -> spacetimedb::control_db::Result; + async fn create_dns_record( + &self, + identity: &Identity, + domain: &DomainName, + address: &Address, + ) -> spacetimedb::control_db::Result; } pub struct ArcEnv(pub Arc); @@ -120,65 +173,307 @@ impl Clone for ArcEnv { } } -impl FromRef> for Arc { - fn from_ref(env: &ArcEnv) -> Self { - env.0.clone() +impl ControlStateReadAccess for ArcEnv { + // Nodes + fn get_node_id(&self) -> Option { + self.0.get_node_id() + } + fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result> { + self.0.get_node_by_id(node_id) + } + fn get_nodes(&self) -> spacetimedb::control_db::Result> { + self.0.get_nodes() + } + + // Databases + fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + self.0.get_database_by_id(id) + } + fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result> { + self.0.get_database_by_address(address) + } + fn get_databases(&self) -> spacetimedb::control_db::Result> { + self.0.get_databases() + } + + // Database instances + fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + self.0.get_database_instance_by_id(id) + } + fn get_database_instances(&self) -> spacetimedb::control_db::Result> { + self.0.get_database_instances() + } + fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { + self.0.get_leader_database_instance_by_database(database_id) + } + + // Identities + fn get_identities_for_email(&self, email: &str) -> spacetimedb::control_db::Result> { + self.0.get_identities_for_email(email) + } + fn get_recovery_codes(&self, email: &str) -> spacetimedb::control_db::Result> { + self.0.get_recovery_codes(email) } -} -impl FromRef> for Arc { - fn from_ref(env: &ArcEnv) -> Self { - env.0.clone() + // Energy + fn get_energy_balance(&self, identity: &Identity) -> spacetimedb::control_db::Result> { + self.0.get_energy_balance(identity) + } + + // DNS + fn lookup_address(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { + self.0.lookup_address(domain) + } + + fn reverse_lookup(&self, address: &Address) -> spacetimedb::control_db::Result> { + self.0.reverse_lookup(address) } } #[async_trait] -impl ControlNodeDelegate for ArcEnv { - async fn spacetime_dns(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { - self.0.spacetime_dns(domain).await +impl ControlStateWriteAccess for ArcEnv { + async fn create_address(&self) -> spacetimedb::control_db::Result
{ + self.0.create_address().await + } + + async fn publish_database( + &self, + identity: &Identity, + spec: DatabaseDef, + ) -> spacetimedb::control_db::Result> { + self.0.publish_database(identity, spec).await + } + + async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()> { + self.0.delete_database(identity, address).await + } + + async fn create_identity(&self) -> spacetimedb::control_db::Result { + self.0.create_identity().await } - async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result { - self.0.alloc_spacetime_identity().await + async fn add_email(&self, identity: &Identity, email: &str) -> spacetimedb::control_db::Result<()> { + self.0.add_email(identity, email).await } + async fn insert_recovery_code( + &self, + identity: &Identity, + email: &str, + code: RecoveryCode, + ) -> spacetimedb::control_db::Result<()> { + self.0.insert_recovery_code(identity, email, code).await + } + + async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + self.0.add_energy(identity, amount).await + } async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { self.0.withdraw_energy(identity, amount).await } + async fn register_tld(&self, identity: &Identity, tld: Tld) -> spacetimedb::control_db::Result { + self.0.register_tld(identity, tld).await + } + + async fn create_dns_record( + &self, + identity: &Identity, + domain: &DomainName, + address: &Address, + ) -> spacetimedb::control_db::Result { + self.0.create_dns_record(identity, domain, address).await + } +} + +#[async_trait] +impl NodeDelegate for ArcEnv { + fn gather_metrics(&self) -> Vec { + self.0.gather_metrics() + } + + fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController { + self.0.database_instance_context_controller() + } + + fn host_controller(&self) -> &Arc { + self.0.host_controller() + } + + fn client_actor_index(&self) -> &ClientActorIndex { + self.0.client_actor_index() + } + fn public_key(&self) -> &DecodingKey { self.0.public_key() } + + fn public_key_bytes(&self) -> &[u8] { + self.0.public_key_bytes() + } + fn private_key(&self) -> &EncodingKey { self.0.private_key() } - fn public_key_bytes(&self) -> &[u8] { - self.0.public_key_bytes() + + fn sendgrid_controller(&self) -> Option<&SendGridController> { + self.0.sendgrid_controller() + } + + async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result { + self.0.load_module_host_context(db, instance_id).await + } +} + +impl ControlStateReadAccess for Arc { + // Nodes + fn get_node_id(&self) -> Option { + (**self).get_node_id() + } + fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result> { + (**self).get_node_by_id(node_id) + } + fn get_nodes(&self) -> spacetimedb::control_db::Result> { + (**self).get_nodes() + } + + // Databases + fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + (**self).get_database_by_id(id) + } + fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result> { + (**self).get_database_by_address(address) + } + fn get_databases(&self) -> spacetimedb::control_db::Result> { + (**self).get_databases() + } + + // Database instances + fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + (**self).get_database_instance_by_id(id) + } + fn get_database_instances(&self) -> spacetimedb::control_db::Result> { + (**self).get_database_instances() + } + fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { + (**self).get_leader_database_instance_by_database(database_id) + } + + // Identities + fn get_identities_for_email(&self, email: &str) -> spacetimedb::control_db::Result> { + (**self).get_identities_for_email(email) + } + fn get_recovery_codes(&self, email: &str) -> spacetimedb::control_db::Result> { + (**self).get_recovery_codes(email) + } + + // Energy + fn get_energy_balance(&self, identity: &Identity) -> spacetimedb::control_db::Result> { + (**self).get_energy_balance(identity) + } + + // DNS + fn lookup_address(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { + (**self).lookup_address(domain) + } + + fn reverse_lookup(&self, address: &Address) -> spacetimedb::control_db::Result> { + (**self).reverse_lookup(address) } } #[async_trait] -impl ControlNodeDelegate for Arc { - async fn spacetime_dns(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { - (**self).spacetime_dns(domain).await +impl ControlStateWriteAccess for Arc { + async fn create_address(&self) -> spacetimedb::control_db::Result
{ + (**self).create_address().await + } + + async fn publish_database( + &self, + identity: &Identity, + spec: DatabaseDef, + ) -> spacetimedb::control_db::Result> { + (**self).publish_database(identity, spec).await + } + + async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()> { + (**self).delete_database(identity, address).await + } + + async fn create_identity(&self) -> spacetimedb::control_db::Result { + (**self).create_identity().await + } + + async fn add_email(&self, identity: &Identity, email: &str) -> spacetimedb::control_db::Result<()> { + (**self).add_email(identity, email).await } - async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result { - (**self).alloc_spacetime_identity().await + async fn insert_recovery_code( + &self, + identity: &Identity, + email: &str, + code: RecoveryCode, + ) -> spacetimedb::control_db::Result<()> { + (**self).insert_recovery_code(identity, email, code).await } + async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + (**self).add_energy(identity, amount).await + } async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { (**self).withdraw_energy(identity, amount).await } + async fn register_tld(&self, identity: &Identity, tld: Tld) -> spacetimedb::control_db::Result { + (**self).register_tld(identity, tld).await + } + + async fn create_dns_record( + &self, + identity: &Identity, + domain: &DomainName, + address: &Address, + ) -> spacetimedb::control_db::Result { + (**self).create_dns_record(identity, domain, address).await + } +} + +#[async_trait] +impl NodeDelegate for Arc { + fn gather_metrics(&self) -> Vec { + (**self).gather_metrics() + } + + fn database_instance_context_controller(&self) -> &DatabaseInstanceContextController { + (**self).database_instance_context_controller() + } + + fn host_controller(&self) -> &Arc { + (**self).host_controller() + } + + fn client_actor_index(&self) -> &ClientActorIndex { + (**self).client_actor_index() + } + fn public_key(&self) -> &DecodingKey { (**self).public_key() } + + fn public_key_bytes(&self) -> &[u8] { + (**self).public_key_bytes() + } + fn private_key(&self) -> &EncodingKey { (**self).private_key() } - fn public_key_bytes(&self) -> &[u8] { - (**self).public_key_bytes() + + fn sendgrid_controller(&self) -> Option<&SendGridController> { + (**self).sendgrid_controller() + } + + async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result { + (**self).load_module_host_context(db, instance_id).await } } diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index b270372ac3..3df5791317 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -1,5 +1,5 @@ use axum::body::Bytes; -use axum::extract::{DefaultBodyLimit, FromRef, Path, Query, State}; +use axum::extract::{DefaultBodyLimit, Path, Query, State}; use axum::response::{ErrorResponse, IntoResponse}; use axum::{headers, TypedHeader}; use chrono::Utc; @@ -22,14 +22,11 @@ use spacetimedb::json::client_api::StmtResultJson; use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType}; use spacetimedb::sql::execute::execute; use spacetimedb_lib::identity::AuthCtx; -use spacetimedb_lib::name::{ - self, DnsLookupResponse, DomainName, DomainParsingError, InsertDomainResult, PublishOp, PublishResult, -}; +use spacetimedb_lib::name::{self, DnsLookupResponse, DomainName, DomainParsingError, PublishOp, PublishResult}; use spacetimedb_lib::recovery::{RecoveryCode, RecoveryCodeResponse}; use spacetimedb_lib::sats::WithTypespace; use std::collections::HashMap; use std::convert::From; -use std::sync::Arc; use super::identity::IdentityForUrl; use crate::auth::{ @@ -37,7 +34,7 @@ use crate::auth::{ SpacetimeIdentityToken, }; use crate::util::{ByteStringBody, NameOrAddress}; -use crate::{log_and_500, ControlCtx, ControlNodeDelegate, WorkerCtx}; +use crate::{log_and_500, ControlStateDelegate, DatabaseDef, NodeDelegate}; #[derive(derive_more::From)] pub(crate) struct DomainParsingRejection(pub(crate) DomainParsingError); @@ -54,8 +51,8 @@ pub struct CallParams { reducer: String, } -pub async fn call( - State(worker_ctx): State>, +pub async fn call( + State(worker_ctx): State, auth: SpacetimeAuthHeader, Path(CallParams { name_or_address, @@ -66,19 +63,18 @@ pub async fn call( let SpacetimeAuth { identity: caller_identity, creds: caller_identity_token, - } = auth.get_or_create(&*worker_ctx).await?; + } = auth.get_or_create(&worker_ctx).await?; let args = ReducerArgs::Json(body); - let address = name_or_address.resolve(&*worker_ctx).await?.into(); - let database = worker_ctx_find_database(&*worker_ctx, &address).await?.ok_or_else(|| { + let address = name_or_address.resolve(&worker_ctx).await?.into(); + let database = worker_ctx_find_database(&worker_ctx, &address).await?.ok_or_else(|| { log::error!("Could not find database: {}", address.to_hex()); (StatusCode::NOT_FOUND, "No such database.") })?; let identity = database.identity; let database_instance = worker_ctx .get_leader_database_instance_by_database(database.id) - .await .ok_or(( StatusCode::NOT_FOUND, "Database instance not scheduled to this node yet.", @@ -166,7 +162,7 @@ pub struct DatabaseInformation { /// where credentials are required (e.g. publish), so for now we're just going to keep this as is, but we're /// going to generate a new set of credentials if you don't provide them. async fn extract_db_call_info( - ctx: &dyn WorkerCtx, + ctx: &(impl ControlStateDelegate + NodeDelegate + ?Sized), auth: SpacetimeAuthHeader, address: &Address, ) -> Result { @@ -177,7 +173,7 @@ async fn extract_db_call_info( (StatusCode::NOT_FOUND, "No such database.") })?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id).await.ok_or(( + let database_instance = ctx.get_leader_database_instance_by_database(database.id).ok_or(( StatusCode::NOT_FOUND, "Database instance not scheduled to this node yet.", ))?; @@ -230,8 +226,8 @@ pub struct DescribeQueryParams { expand: Option, } -pub async fn describe( - State(worker_ctx): State>, +pub async fn describe( + State(worker_ctx): State, Path(DescribeParams { name_or_address, entity_type, @@ -239,13 +235,16 @@ pub async fn describe( }): Path, Query(DescribeQueryParams { expand }): Query, auth: SpacetimeAuthHeader, -) -> axum::response::Result { - let address = name_or_address.resolve(&*worker_ctx).await?.into(); - let database = worker_ctx_find_database(&*worker_ctx, &address) +) -> axum::response::Result +where + S: ControlStateDelegate + NodeDelegate, +{ + let address = name_or_address.resolve(&worker_ctx).await?.into(); + let database = worker_ctx_find_database(&worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let call_info = extract_db_call_info(&*worker_ctx, auth, &address).await?; + let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?; let instance_id = call_info.database_instance.id; let host = worker_ctx.host_controller(); @@ -288,18 +287,21 @@ pub async fn describe( pub struct CatalogParams { name_or_address: NameOrAddress, } -pub async fn catalog( - State(worker_ctx): State>, +pub async fn catalog( + State(worker_ctx): State, Path(CatalogParams { name_or_address }): Path, Query(DescribeQueryParams { expand }): Query, auth: SpacetimeAuthHeader, -) -> axum::response::Result { - let address = name_or_address.resolve(&*worker_ctx).await?.into(); - let database = worker_ctx_find_database(&*worker_ctx, &address) +) -> axum::response::Result +where + S: ControlStateDelegate + NodeDelegate, +{ + let address = name_or_address.resolve(&worker_ctx).await?.into(); + let database = worker_ctx_find_database(&worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let call_info = extract_db_call_info(&*worker_ctx, auth, &address).await?; + let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?; let instance_id = call_info.database_instance.id; let host = worker_ctx.host_controller(); @@ -336,14 +338,14 @@ pub async fn catalog( pub struct InfoParams { name_or_address: NameOrAddress, } -pub async fn info( - State(worker_ctx): State>, +pub async fn info( + State(worker_ctx): State, Path(InfoParams { name_or_address }): Path, ) -> axum::response::Result { log::trace!("Trying to resolve address: {:?}", name_or_address); - let address = name_or_address.resolve(&*worker_ctx).await?.into(); + let address = name_or_address.resolve(&worker_ctx).await?.into(); log::trace!("Resolved address to: {address:?}"); - let database = worker_ctx_find_database(&*worker_ctx, &address) + let database = worker_ctx_find_database(&worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; log::trace!("Fetched database from the worker db for address: {address:?}"); @@ -378,12 +380,15 @@ fn auth_or_unauth(auth: SpacetimeAuthHeader) -> axum::response::Result>, +pub async fn logs( + State(worker_ctx): State, Path(LogsParams { name_or_address }): Path, Query(LogsQuery { num_lines, follow }): Query, auth: SpacetimeAuthHeader, -) -> axum::response::Result { +) -> axum::response::Result +where + S: ControlStateDelegate + NodeDelegate, +{ // You should not be able to read the logs from a database that you do not own // so, unless you are the owner, this will fail. // TODO: This returns `UNAUTHORIZED` on failure, @@ -392,8 +397,8 @@ pub async fn logs( // Should all the others change? let auth = auth_or_unauth(auth)?; - let address = name_or_address.resolve(&*worker_ctx).await?.into(); - let database = worker_ctx_find_database(&*worker_ctx, &address) + let address = name_or_address.resolve(&worker_ctx).await?.into(); + let database = worker_ctx_find_database(&worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; @@ -411,7 +416,6 @@ pub async fn logs( let database_instance = worker_ctx .get_leader_database_instance_by_database(database.id) - .await .ok_or(( StatusCode::NOT_FOUND, "Database instance not scheduled to this node yet.", @@ -467,10 +471,10 @@ fn mime_ndjson() -> mime::Mime { } async fn worker_ctx_find_database( - worker_ctx: &dyn WorkerCtx, + worker_ctx: &(impl ControlStateDelegate + ?Sized), address: &Address, ) -> Result, StatusCode> { - worker_ctx.get_database_by_address(address).await.map_err(log_and_500) + worker_ctx.get_database_by_address(address).map_err(log_and_500) } #[derive(Deserialize)] @@ -481,19 +485,22 @@ pub struct SqlParams { #[derive(Deserialize)] pub struct SqlQueryParams {} -pub async fn sql( - State(worker_ctx): State>, +pub async fn sql( + State(worker_ctx): State, Path(SqlParams { name_or_address }): Path, Query(SqlQueryParams {}): Query, auth: SpacetimeAuthHeader, body: String, -) -> axum::response::Result { +) -> axum::response::Result +where + S: NodeDelegate + ControlStateDelegate, +{ // Anyone is authorized to execute SQL queries. The SQL engine will determine // which queries this identity is allowed to execute against the database. - let auth = auth.get_or_create(&*worker_ctx).await?; + let auth = auth.get().ok_or((StatusCode::UNAUTHORIZED, "Invalid credentials."))?; - let address = name_or_address.resolve(&*worker_ctx).await?.into(); - let database = worker_ctx_find_database(&*worker_ctx, &address) + let address = name_or_address.resolve(&worker_ctx).await?.into(); + let database = worker_ctx_find_database(&worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; @@ -501,7 +508,6 @@ pub async fn sql( log::debug!("auth: {auth:?}"); let database_instance = worker_ctx .get_leader_database_instance_by_database(database.id) - .await .ok_or(( StatusCode::NOT_FOUND, "Database instance not scheduled to this node yet.", @@ -563,13 +569,13 @@ pub struct ReverseDNSParams { #[derive(Deserialize)] pub struct DNSQueryParams {} -pub async fn dns( - State(ctx): State>, +pub async fn dns( + State(ctx): State, Path(DNSParams { database_name }): Path, Query(DNSQueryParams {}): Query, ) -> axum::response::Result { let domain = database_name.parse().map_err(DomainParsingRejection)?; - let address = ctx.control_db().spacetime_dns(&domain).await.map_err(log_and_500)?; + let address = ctx.lookup_address(&domain).map_err(log_and_500)?; let response = if let Some(address) = address { DnsLookupResponse::Success { domain, @@ -582,15 +588,11 @@ pub async fn dns( Ok(axum::Json(response)) } -pub async fn reverse_dns( - State(ctx): State>, +pub async fn reverse_dns( + State(ctx): State, Path(ReverseDNSParams { database_address }): Path, ) -> axum::response::Result { - let names = ctx - .control_db() - .spacetime_reverse_dns(&database_address) - .await - .map_err(log_and_500)?; + let names = ctx.reverse_lookup(&database_address).map_err(log_and_500)?; let response = name::ReverseDNSResponse { names }; Ok(axum::Json(response)) @@ -606,8 +608,8 @@ fn auth_or_bad_request(auth: SpacetimeAuthHeader) -> axum::response::Result>, +pub async fn register_tld( + State(ctx): State, Query(RegisterTldParams { tld }): Query, auth: SpacetimeAuthHeader, ) -> axum::response::Result { @@ -616,11 +618,7 @@ pub async fn register_tld( let auth = auth_or_bad_request(auth)?; let tld = tld.parse::().map_err(DomainParsingRejection)?.into(); - let result = ctx - .control_db() - .spacetime_register_tld(tld, auth.identity) - .await - .map_err(log_and_500)?; + let result = ctx.register_tld(&auth.identity, tld).await.map_err(log_and_500)?; Ok(axum::Json(result)) } @@ -633,8 +631,8 @@ pub struct RequestRecoveryCodeParams { identity: IdentityForUrl, } -pub async fn request_recovery_code( - State(ctx): State>, +pub async fn request_recovery_code( + State(ctx): State, Query(RequestRecoveryCodeParams { link, email, identity }): Query, ) -> axum::response::Result { let identity = Identity::from(identity); @@ -644,7 +642,6 @@ pub async fn request_recovery_code( }; if !ctx - .control_db() .get_identities_for_email(email.as_str()) .map_err(log_and_500)? .iter() @@ -664,8 +661,7 @@ pub async fn request_recovery_code( generation_time: Utc::now(), identity: identity.to_hex(), }; - ctx.control_db() - .spacetime_insert_recovery_code(email.as_str(), recovery_code) + ctx.insert_recovery_code(&identity, email.as_str(), recovery_code) .await .map_err(log_and_500)?; @@ -687,16 +683,16 @@ pub struct ConfirmRecoveryCodeParams { /// we are providing a login token to the user initiating the request. We want to make /// sure there aren't any logical issues in here that would allow a user to request a token /// for an identity that they don't have authority over. -pub async fn confirm_recovery_code( - State(ctx): State>, +pub async fn confirm_recovery_code( + State(ctx): State, Query(ConfirmRecoveryCodeParams { email, identity, code }): Query, ) -> axum::response::Result { let identity = Identity::from(identity); - let recovery_code = ctx - .control_db() - .spacetime_get_recovery_code(email.as_str(), code.as_str()) - .await - .map_err(log_and_500)? + let recovery_codes = ctx.get_recovery_codes(email.as_str()).map_err(log_and_500)?; + + let recovery_code = recovery_codes + .into_iter() + .find(|rc| rc.code == code.as_str()) .ok_or((StatusCode::BAD_REQUEST, "Recovery code not found."))?; let duration = Utc::now() - recovery_code.generation_time; @@ -714,7 +710,6 @@ pub async fn confirm_recovery_code( } if !ctx - .control_db() .get_identities_for_email(email.as_str()) .map_err(log_and_500)? .iter() @@ -734,171 +729,103 @@ pub async fn confirm_recovery_code( Ok(axum::Json(result)) } -async fn control_ctx_find_database(ctx: &dyn ControlCtx, address: &Address) -> Result, StatusCode> { - ctx.control_db() - .get_database_by_address(address) - .await - .map_err(log_and_500) -} - #[derive(Deserialize)] pub struct PublishDatabaseParams {} #[derive(Deserialize)] pub struct PublishDatabaseQueryParams { - host_type: Option, #[serde(default)] clear: bool, name_or_address: Option, - #[serde(default)] - register_tld: bool, } -pub async fn publish( - State(ctx): State>, +pub async fn publish( + State(ctx): State, Path(PublishDatabaseParams {}): Path, Query(query_params): Query, auth: SpacetimeAuthHeader, body: Bytes, ) -> axum::response::Result> { - let PublishDatabaseQueryParams { - name_or_address, - host_type, - clear, - register_tld, - } = query_params; + let PublishDatabaseQueryParams { name_or_address, clear } = query_params; // You should not be able to publish to a database that you do not own // so, unless you are the owner, this will fail. let auth = auth_or_bad_request(auth)?; - let specified_address = matches!(name_or_address, Some(NameOrAddress::Address(_))); - - // Parse the address or convert the name to a usable address - let db_address = if let Some(name_or_address) = name_or_address.clone() { - match name_or_address.try_resolve(&*ctx).await? { + let (db_addr, db_name) = match name_or_address { + Some(noa) => match noa.try_resolve(&ctx).await? { Ok(resolved) => resolved.into(), Err(domain) => { - // Client specified a name which doesn't yet exist - // Create a new DNS record and a new address to assign to it - let address = ctx.control_db().alloc_spacetime_address().await.map_err(log_and_500)?; - let result = ctx - .control_db() - .spacetime_insert_domain(&address, domain, auth.identity, register_tld) + // `name_or_address` was a `NameOrAddress::Name`, but no record + // exists yet. Create it now with a fresh address. + let addr = ctx.create_address().await.map_err(log_and_500)?; + ctx.create_dns_record(&auth.identity, &domain, &addr) .await .map_err(log_and_500)?; - match result { - InsertDomainResult::Success { .. } => {} - InsertDomainResult::TldNotRegistered { domain } => { - return Ok(axum::Json(PublishResult::TldNotRegistered { domain })) - } - InsertDomainResult::PermissionDenied { domain } => { - return Ok(axum::Json(PublishResult::PermissionDenied { domain })) - } - } - - address + (addr, Some(domain)) } + }, + None => { + let addr = ctx.create_address().await.map_err(log_and_500)?; + (addr, None) } - } else { - // No domain or address was specified, create a new one - ctx.control_db().alloc_spacetime_address().await.map_err(log_and_500)? }; - log::trace!("Publishing to the address: {}", db_address.to_hex()); + log::trace!("Publishing to the address: {}", db_addr.to_hex()); - let host_type = match host_type { - None => HostType::Wasmer, - Some(ht) => ht - .parse() - .map_err(|_| (StatusCode::BAD_REQUEST, format!("unknown host type {ht}")))?, - }; + let op = { + let exists = ctx.get_database_by_address(&db_addr).map_err(log_and_500)?.is_some(); - let program_bytes_addr = ctx.object_db().insert_object(body.into()).unwrap(); + if clear && exists { + ctx.delete_database(&auth.identity, &db_addr) + .await + .map_err(log_and_500)?; + } - let num_replicas = 1; + if exists { + PublishOp::Updated + } else { + PublishOp::Created + } + }; - let op = match control_ctx_find_database(&*ctx, &db_address).await? { - Some(db) => { - if db.identity != auth.identity { - return Err((StatusCode::BAD_REQUEST, "Identity does not own this database.").into()); - } + let maybe_updated = ctx + .publish_database( + &auth.identity, + DatabaseDef { + address: db_addr, + program_bytes: body.into(), + num_replicas: 1, + }, + ) + .await + .map_err(log_and_500)?; - if clear { - ctx.insert_database( - &db_address, - &auth.identity, - &program_bytes_addr, - host_type, - num_replicas, - clear, - ) - .await - .map_err(log_and_500)?; - PublishOp::Created - } else { - let res = ctx - .update_database(&db_address, &program_bytes_addr, num_replicas) - .await - .map_err(log_and_500)?; - if let Some(res) = res { - let success = match res { - Ok(success) => success, - Err(e) => { - return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {e}")).into()); - } - }; - if let UpdateDatabaseSuccess { - update_result: Some(update_result), - migrate_results: _, - } = success - { - match reducer_outcome_response(&auth.identity, "update", update_result.outcome) { - (StatusCode::OK, _) => {} - (status, body) => return Err((status, body).into()), - } + if let Some(updated) = maybe_updated { + match updated { + Ok(success) => { + if let UpdateDatabaseSuccess { + // An update reducer was defined, and it was run + update_result: Some(update_result), + // Not yet implemented + migrate_results: _, + } = success + { + let ror = reducer_outcome_response(&auth.identity, "update", update_result.outcome); + if !matches!(ror, (StatusCode::OK, _)) { + return Err(ror.into()); } } - - log::debug!("Updated database {}", db_address.to_hex()); - PublishOp::Updated } + Err(e) => return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {e}")).into()), } - None if specified_address => { - return Err(( - StatusCode::NOT_FOUND, - format!("Failed to find database at address: {}", db_address.to_hex()), - ) - .into()) - } - None => { - ctx.insert_database( - &db_address, - &auth.identity, - &program_bytes_addr, - host_type, - num_replicas, - false, - ) - .await - .map_err(log_and_500)?; - PublishOp::Created - } - }; + } - let response = PublishResult::Success { - domain: name_or_address.and_then(|noa| match noa { - NameOrAddress::Address(_) => None, - NameOrAddress::Name(name) => Some(name), - }), - address: db_address.to_hex(), + Ok(axum::Json(PublishResult::Success { + domain: db_name.as_ref().map(ToString::to_string), + address: db_addr.to_hex(), op, - }; - - //TODO(tyler): Eventually we want it to be possible to publish a database - // which no one has the credentials to. In that case we wouldn't want to - // return a token. - Ok(axum::Json(response)) + })) } #[derive(Deserialize)] @@ -906,107 +833,91 @@ pub struct DeleteDatabaseParams { address: Address, } -pub async fn delete_database( - State(ctx): State>, +pub async fn delete_database( + State(ctx): State, Path(DeleteDatabaseParams { address }): Path, auth: SpacetimeAuthHeader, ) -> axum::response::Result { let auth = auth_or_bad_request(auth)?; - match control_ctx_find_database(&*ctx, &address).await? { - Some(db) => { - if db.identity != auth.identity { - Err((StatusCode::BAD_REQUEST, "Identity does not own this database.").into()) - } else { - ctx.delete_database(&address) - .await - .map_err(log_and_500) - .map_err(Into::into) - } - } - None => Ok(()), - } + ctx.delete_database(&auth.identity, &address) + .await + .map_err(log_and_500)?; + + Ok(()) } #[derive(Deserialize)] pub struct SetNameQueryParams { domain: String, address: Address, - #[serde(default)] - register_tld: bool, } -pub async fn set_name( - State(ctx): State>, - Query(SetNameQueryParams { - domain, - address, - register_tld, - }): Query, +pub async fn set_name( + State(ctx): State, + Query(SetNameQueryParams { domain, address }): Query, auth: SpacetimeAuthHeader, ) -> axum::response::Result { let auth = auth_or_bad_request(auth)?; let database = ctx - .control_db() .get_database_by_address(&address) - .await .map_err(log_and_500)? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; if database.identity != auth.identity { - return Err((StatusCode::BAD_REQUEST, "Identity does not own database.").into()); + return Err((StatusCode::UNAUTHORIZED, "Identity does not own database.").into()); } let domain = domain.parse().map_err(DomainParsingRejection)?; let response = ctx - .control_db() - .spacetime_insert_domain(&address, domain, auth.identity, register_tld) + .create_dns_record(&auth.identity, &domain, &address) .await - .map_err(log_and_500)?; + .map_err(|err| match err { + spacetimedb::control_db::Error::RecordAlreadyExists(_) => StatusCode::CONFLICT, + _ => log_and_500(err), + })?; Ok(axum::Json(response)) } /// This API call is just designed to allow clients to determine whether or not they can /// establish a connection to SpacetimeDB. This API call doesn't actually do anything. -pub async fn ping( - State(_ctx): State>, - _auth: SpacetimeAuthHeader, -) -> axum::response::Result { +pub async fn ping(State(_ctx): State, _auth: SpacetimeAuthHeader) -> axum::response::Result { Ok(()) } pub fn control_routes() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: NodeDelegate + ControlStateDelegate + Clone + 'static, { use axum::routing::{get, post}; axum::Router::new() - .route("/dns/:database_name", get(dns)) - .route("/reverse_dns/:database_address", get(reverse_dns)) - .route("/set_name", get(set_name)) - .route("/ping", get(ping)) - .route("/register_tld", get(register_tld)) - .route("/request_recovery_code", get(request_recovery_code)) - .route("/confirm_recovery_code", get(confirm_recovery_code)) - .route("/publish", post(publish).layer(DefaultBodyLimit::disable())) - .route("/delete/:address", post(delete_database)) + .route("/dns/:database_name", get(dns::)) + .route("/reverse_dns/:database_address", get(reverse_dns::)) + .route("/set_name", get(set_name::)) + .route("/ping", get(ping::)) + .route("/register_tld", get(register_tld::)) + .route("/request_recovery_code", get(request_recovery_code::)) + .route("/confirm_recovery_code", get(confirm_recovery_code::)) + .route("/publish", post(publish::).layer(DefaultBodyLimit::disable())) + .route("/delete/:address", post(delete_database::)) } pub fn worker_routes() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: NodeDelegate + ControlStateDelegate + Clone + 'static, { use axum::routing::{get, post}; axum::Router::new() - .route("/subscribe/:name_or_address", get(super::subscribe::handle_websocket)) - .route("/call/:name_or_address/:reducer", post(call)) - .route("/schema/:name_or_address/:entity_type/:entity", get(describe)) - .route("/schema/:name_or_address", get(catalog)) - .route("/info/:name_or_address", get(info)) - .route("/logs/:name_or_address", get(logs)) - .route("/sql/:name_or_address", post(sql)) + .route( + "/subscribe/:name_or_address", + get(super::subscribe::handle_websocket::), + ) + .route("/call/:name_or_address/:reducer", post(call::)) + .route("/schema/:name_or_address/:entity_type/:entity", get(describe::)) + .route("/schema/:name_or_address", get(catalog::)) + .route("/info/:name_or_address", get(info::)) + .route("/logs/:name_or_address", get(logs::)) + .route("/sql/:name_or_address", post(sql::)) } diff --git a/crates/client-api/src/routes/energy.rs b/crates/client-api/src/routes/energy.rs index d640628a0a..c3d3517190 100644 --- a/crates/client-api/src/routes/energy.rs +++ b/crates/client-api/src/routes/energy.rs @@ -1,6 +1,4 @@ -use std::sync::Arc; - -use axum::extract::{FromRef, Path, Query, State}; +use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use http::StatusCode; use serde::Deserialize; @@ -10,7 +8,7 @@ use spacetimedb::host::EnergyQuanta; use spacetimedb_lib::Identity; use crate::auth::SpacetimeAuthHeader; -use crate::{log_and_500, ControlCtx, ControlNodeDelegate}; +use crate::{log_and_500, ControlStateDelegate, NodeDelegate}; use super::identity::IdentityForUrl; @@ -19,22 +17,63 @@ pub struct IdentityParams { identity: IdentityForUrl, } -pub async fn get_energy_balance( - State(ctx): State>, +pub async fn get_energy_balance( + State(ctx): State, Path(IdentityParams { identity }): Path, ) -> axum::response::Result { let identity = Identity::from(identity); + get_budget_inner(ctx, &identity) +} + +#[derive(Deserialize)] +pub struct AddEnergyQueryParams { + amount: Option, +} +pub async fn add_energy( + State(ctx): State, + Query(AddEnergyQueryParams { amount }): Query, + auth: SpacetimeAuthHeader, +) -> axum::response::Result { + let Some(auth) = auth.auth else { + return Err(StatusCode::UNAUTHORIZED.into()); + }; + // Nb.: Negative amount withdraws + let amount = amount.map(|s| s.parse::()).transpose().map_err(|e| { + log::error!("Failed to parse amount: {e:?}"); + StatusCode::BAD_REQUEST + })?; + + let mut balance = ctx + .get_energy_balance(&auth.identity) + .map_err(log_and_500)? + .map(|quanta| quanta.0) + .unwrap_or(0); + + if let Some(satoshi) = amount { + ctx.add_energy(&auth.identity, EnergyQuanta(satoshi)) + .await + .map_err(log_and_500)?; + balance += satoshi; + } - // Note: Consult the write-through cache on control_budget, not the control_db directly. + let response_json = json!({ + // Note: balance must be returned as a string to avoid truncation. + "balance": balance.to_string(), + }); + + Ok(axum::Json(response_json)) +} + +fn get_budget_inner(ctx: impl ControlStateDelegate, identity: &Identity) -> axum::response::Result { let balance = ctx - .control_db() - .get_energy_balance(&identity) + .get_energy_balance(identity) .map_err(log_and_500)? - .unwrap_or(EnergyQuanta(0)); + .map(|quanta| quanta.0) + .unwrap_or(0); let response_json = json!({ // Note: balance must be returned as a string to avoid truncation. - "balance": balance.0.to_string(), + "balance": balance.to_string(), }); Ok(axum::Json(response_json)) @@ -44,8 +83,8 @@ pub async fn get_energy_balance( pub struct SetEnergyBalanceQueryParams { balance: Option, } -pub async fn set_energy_balance( - State(ctx): State>, +pub async fn set_energy_balance( + State(ctx): State, Path(IdentityParams { identity }): Path, Query(SetEnergyBalanceQueryParams { balance }): Query, auth: SpacetimeAuthHeader, @@ -64,23 +103,37 @@ pub async fn set_energy_balance( let identity = Identity::from(identity); - let balance = balance + let desired_balance = balance .map(|balance| balance.parse::()) .transpose() .map_err(|err| { log::error!("Failed to parse balance: {:?}", err); StatusCode::BAD_REQUEST - })?; - let balance = EnergyQuanta(balance.unwrap_or(0)); - - ctx.control_db() - .set_energy_balance(identity, balance) - .await - .map_err(log_and_500)?; + })? + .unwrap_or(0); + let current_balance = ctx + .get_energy_balance(&identity) + .map_err(log_and_500)? + .map(|quanta| quanta.0) + .unwrap_or(0); + + let balance: i128 = if desired_balance > current_balance { + let delta = desired_balance - current_balance; + ctx.add_energy(&identity, EnergyQuanta(delta)) + .await + .map_err(log_and_500)?; + delta + } else { + let delta = current_balance - desired_balance; + ctx.withdraw_energy(&identity, EnergyQuanta(delta)) + .await + .map_err(log_and_500)?; + delta + }; let response_json = json!({ // Note: balance must be returned as a string to avoid truncation. - "balance": balance.0.to_string(), + "balance": balance.to_string(), }); Ok(axum::Json(response_json)) @@ -88,11 +141,11 @@ pub async fn set_energy_balance( pub fn router() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: NodeDelegate + ControlStateDelegate + Clone + 'static, { - use axum::routing::{get, post}; + use axum::routing::{get, post, put}; axum::Router::new() - .route("/:identity", get(get_energy_balance)) - .route("/:identity", post(set_energy_balance)) + .route("/:identity", get(get_energy_balance::)) + .route("/:identity", post(set_energy_balance::)) + .route("/:identity", put(add_energy::)) } diff --git a/crates/client-api/src/routes/identity.rs b/crates/client-api/src/routes/identity.rs index 8deaf36527..e1ca795baa 100644 --- a/crates/client-api/src/routes/identity.rs +++ b/crates/client-api/src/routes/identity.rs @@ -1,15 +1,14 @@ -use std::sync::Arc; - -use axum::extract::{FromRef, Path, Query, State}; +use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; use http::StatusCode; use serde::{Deserialize, Serialize}; + use spacetimedb::auth::identity::encode_token_with_expiry; use spacetimedb_lib::de::serde::DeserializeWrapper; use spacetimedb_lib::Identity; -pub use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader}; -use crate::{log_and_500, ControlCtx, ControlNodeDelegate}; +use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader}; +use crate::{log_and_500, ControlStateDelegate, ControlStateWriteAccess, NodeDelegate}; #[derive(Deserialize)] pub struct CreateIdentityQueryParams { @@ -22,16 +21,15 @@ pub struct CreateIdentityResponse { token: String, } -pub async fn create_identity( - State(ctx): State>, +pub async fn create_identity( + State(ctx): State, Query(CreateIdentityQueryParams { email }): Query, ) -> axum::response::Result { - let auth = SpacetimeAuth::alloc(&*ctx).await?; + let auth = SpacetimeAuth::alloc(&ctx).await?; if let Some(email) = email { - ctx.control_db() - .associate_email_spacetime_identity(auth.identity, email.as_str()) + ctx.add_email(&auth.identity, email.as_str()) .await - .unwrap(); + .map_err(log_and_500)?; } let identity_response = CreateIdentityResponse { @@ -56,17 +54,14 @@ pub struct GetIdentityResponseEntry { pub struct GetIdentityQueryParams { email: Option, } -pub async fn get_identity( - State(ctx): State>, +pub async fn get_identity( + State(ctx): State, Query(GetIdentityQueryParams { email }): Query, ) -> axum::response::Result { let lookup = match email { None => None, Some(email) => { - let identities = ctx - .control_db() - .get_identities_for_email(email.as_str()) - .map_err(log_and_500)?; + let identities = ctx.get_identities_for_email(email.as_str()).map_err(log_and_500)?; if identities.is_empty() { None } else { @@ -118,8 +113,8 @@ pub struct SetEmailQueryParams { email: email_address::EmailAddress, } -pub async fn set_email( - State(ctx): State>, +pub async fn set_email( + State(ctx): State, Path(SetEmailParams { identity }): Path, Query(SetEmailQueryParams { email }): Query, auth: SpacetimeAuthHeader, @@ -130,11 +125,7 @@ pub async fn set_email( if auth.identity != identity { return Err(StatusCode::UNAUTHORIZED.into()); } - - ctx.control_db() - .associate_email_spacetime_identity(identity, email.as_str()) - .await - .unwrap(); + ctx.add_email(&identity, email.as_str()).await.map_err(log_and_500)?; Ok(()) } @@ -149,13 +140,13 @@ pub struct GetDatabasesResponse { addresses: Vec, } -pub async fn get_databases( - State(ctx): State>, +pub async fn get_databases( + State(ctx): State, Path(GetDatabasesParams { identity }): Path, ) -> axum::response::Result { let identity = identity.into(); // Linear scan for all databases that have this identity, and return their addresses - let all_dbs = ctx.control_db().get_databases().await.map_err(|e| { + let all_dbs = ctx.get_databases().map_err(|e| { log::error!("Failure when retrieving databases for search: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -172,8 +163,8 @@ pub struct WebsocketTokenResponse { pub token: String, } -pub async fn create_websocket_token( - State(ctx): State>, +pub async fn create_websocket_token( + State(ctx): State, auth: SpacetimeAuthHeader, ) -> axum::response::Result { match auth.auth { @@ -206,7 +197,7 @@ pub async fn validate_token( } } -pub async fn get_public_key(State(ctx): State>) -> axum::response::Result { +pub async fn get_public_key(State(ctx): State) -> axum::response::Result { let res = Response::builder() .header("Content-Type", "application/pem-certificate-chain") .body(()) @@ -216,15 +207,14 @@ pub async fn get_public_key(State(ctx): State>) -> axum::res pub fn router() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: NodeDelegate + ControlStateDelegate + Clone + 'static, { use axum::routing::{get, post}; axum::Router::new() - .route("/", get(get_identity).post(create_identity)) - .route("/public-key", get(get_public_key)) - .route("/websocket_token", post(create_websocket_token)) + .route("/", get(get_identity::).post(create_identity::)) + .route("/public-key", get(get_public_key::)) + .route("/websocket_token", post(create_websocket_token::)) .route("/:identity/verify", get(validate_token)) - .route("/:identity/set-email", post(set_email)) - .route("/:identity/databases", get(get_databases)) + .route("/:identity/set-email", post(set_email::)) + .route("/:identity/databases", get(get_databases::)) } diff --git a/crates/client-api/src/routes/metrics.rs b/crates/client-api/src/routes/metrics.rs index 3ed63ee359..15737be587 100644 --- a/crates/client-api/src/routes/metrics.rs +++ b/crates/client-api/src/routes/metrics.rs @@ -1,8 +1,7 @@ -use axum::extract::{FromRef, State}; +use axum::extract::State; use axum::response::IntoResponse; -use std::sync::Arc; -use crate::{ControlNodeDelegate, WorkerCtx}; +use crate::NodeDelegate; // #[derive(Clone, NewMiddleware)] // pub struct MetricsAuthMiddleware; @@ -16,7 +15,7 @@ use crate::{ControlNodeDelegate, WorkerCtx}; // } // } -pub async fn metrics(State(ctx): State>) -> axum::response::Result { +pub async fn metrics(State(ctx): State) -> axum::response::Result { let mut buf = String::new(); let mut encode_to_buffer = |mfs: &[_]| { @@ -33,11 +32,10 @@ pub async fn metrics(State(ctx): State>) -> axum::response::R pub fn router() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: NodeDelegate + Clone + 'static, { use axum::routing::get; - axum::Router::new().route("/", get(metrics)) + axum::Router::new().route("/", get(metrics::)) // TODO: // .layer(MetricsAuthMiddleware) } diff --git a/crates/client-api/src/routes/prometheus.rs b/crates/client-api/src/routes/prometheus.rs index 5e887d54cf..84aa75f4e9 100644 --- a/crates/client-api/src/routes/prometheus.rs +++ b/crates/client-api/src/routes/prometheus.rs @@ -1,10 +1,10 @@ -use axum::extract::{FromRef, State}; +use std::collections::HashMap; + +use axum::extract::State; use axum::response::IntoResponse; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use crate::{log_and_500, ControlCtx, ControlNodeDelegate}; +use crate::{log_and_500, ControlStateReadAccess}; #[derive(Serialize, Deserialize)] struct SDConfig { @@ -12,9 +12,11 @@ struct SDConfig { labels: HashMap, } -pub async fn get_sd_config(State(ctx): State>) -> axum::response::Result { +pub async fn get_sd_config( + State(ctx): State, +) -> axum::response::Result { // TODO(cloutiertyler): security - let nodes = ctx.control_db().get_nodes().await.map_err(log_and_500)?; + let nodes = ctx.get_nodes().map_err(log_and_500)?; let mut targets = Vec::new(); let labels = HashMap::new(); @@ -30,9 +32,8 @@ pub async fn get_sd_config(State(ctx): State>) -> axum::resp pub fn router() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: ControlStateReadAccess + Clone + Send + Sync + 'static, { use axum::routing::get; - axum::Router::new().route("/sd_config", get(get_sd_config)) + axum::Router::new().route("/sd_config", get(get_sd_config::)) } diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index da3e8ad89f..9cabbce367 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1,6 +1,5 @@ use std::mem; use std::pin::pin; -use std::sync::Arc; use std::time::Duration; use axum::extract::{Path, State}; @@ -20,7 +19,7 @@ use crate::util::websocket::{ CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade, }; use crate::util::{NameOrAddress, XForwardedFor}; -use crate::{log_and_500, WorkerCtx}; +use crate::{log_and_500, ControlStateDelegate, NodeDelegate}; #[allow(clippy::declare_interior_mutable_const)] pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static("v1.text.spacetimedb"); @@ -32,16 +31,19 @@ pub struct SubscribeParams { pub name_or_address: NameOrAddress, } -pub async fn handle_websocket( - State(worker_ctx): State>, +pub async fn handle_websocket( + State(ctx): State, Path(SubscribeParams { name_or_address }): Path, forwarded_for: Option>, auth: SpacetimeAuthHeader, ws: WebSocketUpgrade, -) -> axum::response::Result { - let auth = auth.get_or_create(&*worker_ctx).await?; +) -> axum::response::Result +where + S: NodeDelegate + ControlStateDelegate, +{ + let auth = auth.get_or_create(&ctx).await?; - let address = name_or_address.resolve(&*worker_ctx).await?.into(); + let address = name_or_address.resolve(&ctx).await?.into(); let (res, ws_upgrade, protocol) = ws.select_protocol([(BIN_PROTOCOL, Protocol::Binary), (TEXT_PROTOCOL, Protocol::Text)]); @@ -50,24 +52,24 @@ pub async fn handle_websocket( // TODO: Should also maybe refactor the code and the protocol to allow a single websocket // to connect to multiple modules - let database = worker_ctx + let database = ctx .get_database_by_address(&address) - .await .unwrap() .ok_or(StatusCode::BAD_REQUEST)?; - let database_instance = worker_ctx + let database_instance = ctx .get_leader_database_instance_by_database(database.id) - .await .ok_or(StatusCode::BAD_REQUEST)?; let instance_id = database_instance.id; let identity_token = auth.creds.token().to_owned(); - let host = worker_ctx.host_controller(); + let host = ctx.host_controller(); let module = match host.get_module_host(instance_id) { Ok(m) => m, Err(_) => { - let dbic = worker_ctx + // TODO(kim): probably wrong -- check if instance node id matches ours + log::debug!("creating fresh module host"); + let dbic = ctx .load_module_host_context(database, instance_id) .await .map_err(log_and_500)?; @@ -77,7 +79,7 @@ pub async fn handle_websocket( let client_id = ClientActorId { identity: auth.identity, - name: worker_ctx.client_actor_index().next_client_name(), + name: ctx.client_actor_index().next_client_name(), }; let ws_config = WebSocketConfig { diff --git a/crates/client-api/src/routes/tracelog.rs b/crates/client-api/src/routes/tracelog.rs index ad997ac68e..4fde2ff3da 100644 --- a/crates/client-api/src/routes/tracelog.rs +++ b/crates/client-api/src/routes/tracelog.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; - use axum::body::Bytes; -use axum::extract::{FromRef, Path, State}; +use axum::extract::{Path, State}; use axum::response::IntoResponse; use http::StatusCode; use serde::Deserialize; -use spacetimedb_lib::Identity; use tempdir::TempDir; use spacetimedb::address::Address; @@ -15,23 +12,23 @@ use spacetimedb::hash::hash_bytes; use spacetimedb::host::instance_env::InstanceEnv; use spacetimedb::host::scheduler::Scheduler; use spacetimedb::host::tracelog::replay::replay_report; +use spacetimedb_lib::Identity; -use crate::{log_and_500, ControlNodeDelegate, WorkerCtx}; +use crate::{log_and_500, ControlStateReadAccess, NodeDelegate}; #[derive(Deserialize)] pub struct GetTraceParams { address: Address, } -pub async fn get_tracelog( - State(ctx): State>, +pub async fn get_tracelog( + State(ctx): State, Path(GetTraceParams { address }): Path, ) -> axum::response::Result { let database = ctx .get_database_by_address(&address) - .await .map_err(log_and_500)? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id).await; + let database_instance = ctx.get_leader_database_instance_by_database(database.id); let instance_id = database_instance.unwrap().id; let host = ctx.host_controller(); @@ -49,16 +46,15 @@ pub async fn get_tracelog( pub struct StopTraceParams { address: Address, } -pub async fn stop_tracelog( - State(ctx): State>, +pub async fn stop_tracelog( + State(ctx): State, Path(StopTraceParams { address }): Path, ) -> axum::response::Result { let database = ctx .get_database_by_address(&address) - .await .map_err(log_and_500)? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id).await; + let database_instance = ctx.get_leader_database_instance_by_database(database.id); let instance_id = database_instance.unwrap().id; let host = ctx.host_controller(); @@ -101,12 +97,11 @@ pub async fn perform_tracelog_replay(body: Bytes) -> axum::response::Result() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, + S: ControlStateReadAccess + NodeDelegate + Clone + 'static, { use axum::routing::{get, post}; axum::Router::new() - .route("/database/:address", get(get_tracelog)) - .route("/database/:address/stop", post(stop_tracelog)) + .route("/database/:address", get(get_tracelog::)) + .route("/database/:address/stop", post(stop_tracelog::)) .route("/replay", post(perform_tracelog_replay)) } diff --git a/crates/client-api/src/util.rs b/crates/client-api/src/util.rs index d30635c188..a11d1e37b2 100644 --- a/crates/client-api/src/util.rs +++ b/crates/client-api/src/util.rs @@ -15,7 +15,7 @@ use spacetimedb::address::Address; use spacetimedb_lib::name::DomainName; use crate::routes::database::DomainParsingRejection; -use crate::{log_and_500, ControlNodeDelegate}; +use crate::{log_and_500, ControlStateReadAccess}; pub struct ByteStringBody(pub ByteString); @@ -94,7 +94,7 @@ impl NameOrAddress { /// i.e. no corresponding [`Address`] exists. pub async fn try_resolve( &self, - ctx: &(impl ControlNodeDelegate + ?Sized), + ctx: &(impl ControlStateReadAccess + ?Sized), ) -> axum::response::Result> { Ok(match self { Self::Address(addr) => Ok(ResolvedAddress { @@ -103,7 +103,7 @@ impl NameOrAddress { }), Self::Name(name) => { let domain = name.parse().map_err(DomainParsingRejection)?; - let address = ctx.spacetime_dns(&domain).await.map_err(log_and_500)?; + let address = ctx.lookup_address(&domain).map_err(log_and_500)?; match address { Some(address) => Ok(ResolvedAddress { address, @@ -118,7 +118,10 @@ impl NameOrAddress { /// A variant of [`Self::try_resolve()`] which maps to a 400 (Bad Request) /// response if `self` is a [`NameOrAddress::Name`] for which no /// corresponding [`Address`] is found in the SpacetimeDB DNS. - pub async fn resolve(&self, ctx: &(impl ControlNodeDelegate + ?Sized)) -> axum::response::Result { + pub async fn resolve( + &self, + ctx: &(impl ControlStateReadAccess + ?Sized), + ) -> axum::response::Result { self.try_resolve(ctx).await?.map_err(|_| StatusCode::BAD_REQUEST.into()) } } @@ -170,3 +173,9 @@ impl From for Address { value.address } } + +impl From for (Address, Option) { + fn from(ResolvedAddress { address, domain }: ResolvedAddress) -> Self { + (address, domain) + } +} diff --git a/crates/core/src/control_db.rs b/crates/core/src/control_db.rs index 1d88f67647..9ac0508bc2 100644 --- a/crates/core/src/control_db.rs +++ b/crates/core/src/control_db.rs @@ -39,6 +39,10 @@ pub enum Error { ConnectionError(), #[error(transparent)] JSONDeserializationError(#[from] serde_json::Error), + #[error(transparent)] + Task(#[from] tokio::task::JoinError), + #[error(transparent)] + Other(#[from] anyhow::Error), } impl From for Error { @@ -72,7 +76,7 @@ impl ControlDb { } impl ControlDb { - pub async fn spacetime_dns(&self, domain: &DomainName) -> Result> { + pub fn spacetime_dns(&self, domain: &DomainName) -> Result> { let tree = self.db.open_tree("dns")?; let value = tree.get(domain.to_lowercase().as_bytes())?; if let Some(value) = value { @@ -81,7 +85,7 @@ impl ControlDb { Ok(None) } - pub async fn spacetime_reverse_dns(&self, address: &Address) -> Result> { + pub fn spacetime_reverse_dns(&self, address: &Address) -> Result> { let tree = self.db.open_tree("reverse_dns")?; let value = tree.get(address.as_slice())?; if let Some(value) = value { @@ -104,17 +108,18 @@ impl ControlDb { /// * `address` - The address the database name should point to /// * `database_name` - The database name to register /// * `owner_identity` - The identity that is publishing the database name - pub async fn spacetime_insert_domain( + pub fn spacetime_insert_domain( &self, address: &Address, domain: DomainName, owner_identity: Identity, try_register_tld: bool, ) -> Result { - if self.spacetime_dns(&domain).await?.is_some() { + if self.spacetime_dns(&domain)?.is_some() { return Err(Error::RecordAlreadyExists(domain)); } - match self.spacetime_lookup_tld(domain.tld()).await? { + let tld = domain.tld(); + match self.spacetime_lookup_tld(tld)? { Some(owner) => { if owner != owner_identity { return Ok(InsertDomainResult::PermissionDenied { domain }); @@ -123,7 +128,7 @@ impl ControlDb { None => { if try_register_tld { // Let's try to automatically register this TLD for the identity - let result = self.spacetime_register_tld(domain.to_tld(), owner_identity).await?; + let result = self.spacetime_register_tld(tld.to_owned(), owner_identity)?; if let RegisterTldResult::Success { .. } = result { // This identity now owns this TLD } else { @@ -163,7 +168,7 @@ impl ControlDb { /// /// * `domain` - The domain name to register /// * `owner_identity` - The identity that should own this domain name. - pub async fn spacetime_register_tld(&self, tld: Tld, owner_identity: Identity) -> Result { + pub fn spacetime_register_tld(&self, tld: Tld, owner_identity: Identity) -> Result { let tree = self.db.open_tree("top_level_domains")?; let key = tld.to_lowercase(); let current_owner = tree.get(&key)?; @@ -185,7 +190,7 @@ impl ControlDb { /// Starts a recovery code request /// /// * `email` - The email to send the recovery code to - pub async fn spacetime_insert_recovery_code(&self, email: &str, new_code: RecoveryCode) -> Result<()> { + pub fn spacetime_insert_recovery_code(&self, email: &str, new_code: RecoveryCode) -> Result<()> { // TODO(jdetter): This function should take an identity instead of an email let tree = self.db.open_tree("recovery_codes")?; let current_requests = tree.get(email.as_bytes())?; @@ -203,29 +208,32 @@ impl ControlDb { Ok(()) } - pub async fn spacetime_get_recovery_code(&self, email: &str, code: &str) -> Result> { + pub fn spacetime_get_recovery_codes(&self, email: &str) -> Result> { let tree = self.db.open_tree("recovery_codes")?; let current_requests = tree.get(email.as_bytes())?; - match current_requests { - None => Ok(None), - Some(codes_bytes) => { - let codes: Vec = serde_json::from_slice(&codes_bytes[..])?; - for recovery_code in codes { - if recovery_code.code == code { - return Ok(Some(recovery_code)); - } - } - - Ok(None) + current_requests + .map(|bytes| { + let codes: Vec = serde_json::from_slice(&bytes[..])?; + Ok(codes) + }) + .unwrap_or(Ok(vec![])) + } + + pub fn spacetime_get_recovery_code(&self, email: &str, code: &str) -> Result> { + for recovery_code in self.spacetime_get_recovery_codes(email)? { + if recovery_code.code == code { + return Ok(Some(recovery_code)); } } + + Ok(None) } /// Returns the owner (or `None` if there is no owner) of the domain. /// /// # Arguments /// * `domain` - The domain to lookup - pub async fn spacetime_lookup_tld(&self, domain: impl AsRef) -> Result> { + pub fn spacetime_lookup_tld(&self, domain: impl AsRef) -> Result> { let tree = self.db.open_tree("top_level_domains")?; match tree.get(domain.as_ref().to_lowercase().as_bytes())? { Some(owner) => Ok(Some(Identity::from_slice(&owner[..]))), @@ -233,7 +241,7 @@ impl ControlDb { } } - pub async fn alloc_spacetime_identity(&self) -> Result { + pub fn alloc_spacetime_identity(&self) -> Result { // TODO: this really doesn't need to be a single global count let id = self.db.generate_id()?; let bytes: &[u8] = &id.to_le_bytes(); @@ -243,7 +251,7 @@ impl ControlDb { Ok(hash) } - pub async fn alloc_spacetime_address(&self) -> Result
{ + pub fn alloc_spacetime_address(&self) -> Result
{ // TODO: this really doesn't need to be a single global count // We could do something more intelligent for addresses... // A. generating them randomly @@ -281,7 +289,7 @@ impl ControlDb { Ok(result) } - pub async fn get_databases(&self) -> Result> { + pub fn get_databases(&self) -> Result> { let tree = self.db.open_tree("database")?; let mut databases = Vec::new(); let scan_key: &[u8] = b""; @@ -293,8 +301,8 @@ impl ControlDb { Ok(databases) } - pub async fn get_database_by_id(&self, id: u64) -> Result> { - for database in self.get_databases().await? { + pub fn get_database_by_id(&self, id: u64) -> Result> { + for database in self.get_databases()? { if database.id == id { return Ok(Some(database)); } @@ -302,7 +310,7 @@ impl ControlDb { Ok(None) } - pub async fn get_database_by_address(&self, address: &Address) -> Result> { + pub fn get_database_by_address(&self, address: &Address) -> Result> { let tree = self.db.open_tree("database_by_address")?; let key = address.to_hex(); let value = tree.get(key.as_bytes())?; @@ -313,7 +321,7 @@ impl ControlDb { Ok(None) } - pub async fn insert_database(&self, mut database: Database) -> Result { + pub fn insert_database(&self, mut database: Database) -> Result { let id = self.db.generate_id()?; let tree = self.db.open_tree("database_by_address")?; @@ -334,7 +342,7 @@ impl ControlDb { Ok(id) } - pub async fn update_database(&self, database: Database) -> Result<()> { + pub fn update_database(&self, database: Database) -> Result<()> { let tree = self.db.open_tree("database")?; let tree_by_address = self.db.open_tree("database_by_address")?; let key = database.address.to_hex(); @@ -358,7 +366,7 @@ impl ControlDb { Ok(()) } - pub async fn delete_database(&self, id: u64) -> Result> { + pub fn delete_database(&self, id: u64) -> Result> { let tree = self.db.open_tree("database")?; let tree_by_address = self.db.open_tree("database_by_address")?; @@ -374,7 +382,7 @@ impl ControlDb { Ok(None) } - pub async fn get_database_instances(&self) -> Result> { + pub fn get_database_instances(&self) -> Result> { let tree = self.db.open_tree("database_instance")?; let mut database_instances = Vec::new(); let scan_key: &[u8] = b""; @@ -386,8 +394,8 @@ impl ControlDb { Ok(database_instances) } - pub async fn get_database_instance_by_id(&self, database_instance_id: u64) -> Result> { - for di in self.get_database_instances().await? { + pub fn get_database_instance_by_id(&self, database_instance_id: u64) -> Result> { + for di in self.get_database_instances()? { if di.id == database_instance_id { return Ok(Some(di)); } @@ -395,15 +403,14 @@ impl ControlDb { Ok(None) } - pub async fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { + pub fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { self.get_database_instances() - .await .unwrap() .into_iter() .find(|instance| instance.database_id == database_id && instance.leader) } - pub async fn get_database_instances_by_database(&self, database_id: u64) -> Result> { + pub fn get_database_instances_by_database(&self, database_id: u64) -> Result> { // TODO: because we don't have foreign key constraints it's actually possible to have // instances in here with no database. Although we'd be in a bit of a corrupted state // in that case @@ -414,8 +421,7 @@ impl ControlDb { // } // let database_instances = self - .get_database_instances() - .await? + .get_database_instances()? .iter() .filter(|instance| instance.database_id == database_id) .cloned() @@ -423,7 +429,7 @@ impl ControlDb { Ok(database_instances) } - pub async fn insert_database_instance(&self, mut database_instance: DatabaseInstance) -> Result { + pub fn insert_database_instance(&self, mut database_instance: DatabaseInstance) -> Result { let tree = self.db.open_tree("database_instance")?; let id = self.db.generate_id()?; @@ -436,7 +442,7 @@ impl ControlDb { Ok(id) } - pub async fn update_database_instance(&self, database_instance: DatabaseInstance) -> Result<()> { + pub fn update_database_instance(&self, database_instance: DatabaseInstance) -> Result<()> { let tree = self.db.open_tree("database_instance")?; let buf = bsatn::to_vec(&database_instance).unwrap(); @@ -445,13 +451,13 @@ impl ControlDb { Ok(()) } - pub async fn delete_database_instance(&self, id: u64) -> Result<()> { + pub fn delete_database_instance(&self, id: u64) -> Result<()> { let tree = self.db.open_tree("database_instance")?; tree.remove(id.to_be_bytes())?; Ok(()) } - pub async fn get_nodes(&self) -> Result> { + pub fn get_nodes(&self) -> Result> { let tree = self.db.open_tree("node")?; let mut nodes = Vec::new(); let scan_key: &[u8] = b""; @@ -463,7 +469,7 @@ impl ControlDb { Ok(nodes) } - pub async fn get_node(&self, id: u64) -> Result> { + pub fn get_node(&self, id: u64) -> Result> { let tree = self.db.open_tree("node")?; let value = tree.get(id.to_be_bytes())?; @@ -475,7 +481,7 @@ impl ControlDb { } } - pub async fn insert_node(&self, mut node: Node) -> Result { + pub fn insert_node(&self, mut node: Node) -> Result { let tree = self.db.open_tree("node")?; let id = self.db.generate_id()?; @@ -488,7 +494,7 @@ impl ControlDb { Ok(id) } - pub async fn update_node(&self, node: Node) -> Result<()> { + pub fn update_node(&self, node: Node) -> Result<()> { let tree = self.db.open_tree("node")?; let buf = bsatn::to_vec(&node).unwrap(); @@ -497,7 +503,7 @@ impl ControlDb { Ok(()) } - pub async fn _delete_node(&self, id: u64) -> Result<()> { + pub fn _delete_node(&self, id: u64) -> Result<()> { let tree = self.db.open_tree("node")?; tree.remove(id.to_be_bytes())?; Ok(()) @@ -506,7 +512,7 @@ impl ControlDb { /// Return the current budget for all identities as stored in the db. /// Note: this function is for the stored budget only and should *only* be called by functions in /// `control_budget`, where a cached copy is stored along with business logic for managing it. - pub async fn get_energy_balances(&self) -> Result> { + pub fn get_energy_balances(&self) -> Result> { let mut balances = vec![]; let tree = self.db.open_tree("energy_budget")?; for balance_entry in tree.iter() { @@ -550,7 +556,7 @@ impl ControlDb { /// Update the stored current budget for a identity. /// Note: this function is for the stored budget only and should *only* be called by functions in /// `control_budget`, where a cached copy is stored along with business logic for managing it. - pub async fn set_energy_balance(&self, identity: Identity, energy_balance: EnergyQuanta) -> Result<()> { + pub fn set_energy_balance(&self, identity: Identity, energy_balance: EnergyQuanta) -> Result<()> { let tree = self.db.open_tree("energy_budget")?; tree.insert(identity.as_bytes(), &energy_balance.0.to_be_bytes())?; diff --git a/crates/core/src/control_db/tests.rs b/crates/core/src/control_db/tests.rs index 9178c52a56..6e6390ce75 100644 --- a/crates/core/src/control_db/tests.rs +++ b/crates/core/src/control_db/tests.rs @@ -8,30 +8,26 @@ use super::*; static ALICE: Lazy = Lazy::new(|| Identity::from_hashing_bytes("alice")); static BOB: Lazy = Lazy::new(|| Identity::from_hashing_bytes("bob")); -#[tokio::test] -async fn test_register_tld() -> anyhow::Result<()> { +#[test] +fn test_register_tld() -> anyhow::Result<()> { let tmp = TempDir::new("register-tld")?; let domain: DomainName = "amaze".parse()?; - let cdb = tokio::task::spawn_blocking({ - let path = tmp.path().to_path_buf(); - move || ControlDb::at(path) - }) - .await??; - - cdb.spacetime_register_tld(domain.to_tld(), *ALICE).await?; - let owner = cdb.spacetime_lookup_tld(domain.tld()).await?; + let cdb = ControlDb::at(tmp.path())?; + + cdb.spacetime_register_tld(domain.to_tld(), *ALICE)?; + let owner = cdb.spacetime_lookup_tld(domain.tld())?; assert_eq!(owner, Some(*ALICE)); - let unauthorized = cdb.spacetime_register_tld(domain.to_tld(), *BOB).await?; + let unauthorized = cdb.spacetime_register_tld(domain.to_tld(), *BOB)?; assert!(matches!(unauthorized, RegisterTldResult::Unauthorized { .. })); - let already_registered = cdb.spacetime_register_tld(domain.to_tld(), *ALICE).await?; + let already_registered = cdb.spacetime_register_tld(domain.to_tld(), *ALICE)?; assert!(matches!( already_registered, RegisterTldResult::AlreadyRegistered { .. } )); let domain = DomainName::from_str("amAZe")?; - let already_registered = cdb.spacetime_register_tld(domain.to_tld(), *ALICE).await?; + let already_registered = cdb.spacetime_register_tld(domain.to_tld(), *ALICE)?; assert!(matches!( already_registered, RegisterTldResult::AlreadyRegistered { .. } @@ -41,48 +37,42 @@ async fn test_register_tld() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] -async fn test_domain() -> anyhow::Result<()> { +#[test] +fn test_domain() -> anyhow::Result<()> { let tmp = TempDir::new("insert-domain")?; let domain: DomainName = "this/hASmiXed/case".parse()?; let domain_lower: DomainName = domain.to_lowercase().parse()?; - let cdb = tokio::task::spawn_blocking({ - let path = tmp.path().to_path_buf(); - move || ControlDb::at(path) - }) - .await??; + let cdb = ControlDb::at(tmp.path())?; let addr = Address::zero(); - let res = cdb.spacetime_insert_domain(&addr, domain.clone(), *ALICE, true).await?; + let res = cdb.spacetime_insert_domain(&addr, domain.clone(), *ALICE, true)?; assert!(matches!(res, InsertDomainResult::Success { .. })); // Check Alice owns TLD let unauthorized = cdb - .spacetime_insert_domain(&addr, domain.to_tld().into(), *BOB, true) - .await?; + .spacetime_insert_domain(&addr, "this/is/bob".parse()?, *BOB, true) + .unwrap(); assert!(matches!(unauthorized, InsertDomainResult::PermissionDenied { .. })); - let already_registered = cdb.spacetime_insert_domain(&addr, domain.clone(), *ALICE, true).await; + let already_registered = cdb.spacetime_insert_domain(&addr, domain.clone(), *ALICE, true); assert!(matches!(already_registered, Err(Error::RecordAlreadyExists(_)))); // Cannot register lowercase - let already_registered = cdb - .spacetime_insert_domain(&addr, domain_lower.clone(), *ALICE, true) - .await; + let already_registered = cdb.spacetime_insert_domain(&addr, domain_lower.clone(), *ALICE, true); assert!(matches!(already_registered, Err(Error::RecordAlreadyExists(_)))); - let tld_owner = cdb.spacetime_lookup_tld(domain.tld()).await?; + let tld_owner = cdb.spacetime_lookup_tld(domain.tld())?; assert_eq!(tld_owner, Some(*ALICE)); - let registered_addr = cdb.spacetime_dns(&domain).await?; + let registered_addr = cdb.spacetime_dns(&domain)?; assert_eq!(registered_addr, Some(addr)); // Try lowercase, too - let registered_addr = cdb.spacetime_dns(&domain_lower).await?; + let registered_addr = cdb.spacetime_dns(&domain_lower)?; assert_eq!(registered_addr, Some(addr)); // Reverse should yield the original domain (in mixed-case) - let reverse_lookup = cdb.spacetime_reverse_dns(&addr).await?; + let reverse_lookup = cdb.spacetime_reverse_dns(&addr)?; assert_eq!( reverse_lookup.first().map(ToString::to_string), Some(domain.to_string()) diff --git a/crates/core/src/sendgrid_controller.rs b/crates/core/src/sendgrid_controller.rs index 98e2bd065f..763e22cf13 100644 --- a/crates/core/src/sendgrid_controller.rs +++ b/crates/core/src/sendgrid_controller.rs @@ -1,5 +1,6 @@ use sendgrid::v3::{Email, Personalization, SGMap}; +#[derive(Clone)] pub struct SendGridController { sender: sendgrid::v3::Sender, sendgrid_sender: Email, diff --git a/crates/lib/src/address.rs b/crates/lib/src/address.rs index a4c1fdf1dd..a67aa97f06 100644 --- a/crates/lib/src/address.rs +++ b/crates/lib/src/address.rs @@ -24,6 +24,8 @@ impl Display for Address { impl Address { const ABBREVIATION_LEN: usize = 16; + pub const ZERO: Self = Self(0); + pub fn from_arr(arr: &[u8; 16]) -> Self { Self(u128::from_be_bytes(*arr)) } @@ -66,6 +68,16 @@ impl Address { pub fn to_ipv6_string(self) -> String { self.to_ipv6().to_string() } + + pub fn to_u128(&self) -> u128 { + self.0 + } +} + +impl From for Address { + fn from(value: u128) -> Self { + Self(value) + } } impl_serialize!([] Address, (self, ser) => self.0.to_be_bytes().serialize(ser)); diff --git a/crates/lib/src/hash.rs b/crates/lib/src/hash.rs index cb5047f677..216bdb522b 100644 --- a/crates/lib/src/hash.rs +++ b/crates/lib/src/hash.rs @@ -16,6 +16,8 @@ impl_deserialize!([] Hash, de => Ok(Self { data: <_>::deserialize(de)? })); impl Hash { const ABBREVIATION_LEN: usize = 16; + pub const ZERO: Self = Self { data: [0; HASH_SIZE] }; + pub fn from_arr(arr: &[u8; HASH_SIZE]) -> Self { Self { data: *arr } } diff --git a/crates/lib/src/identity.rs b/crates/lib/src/identity.rs index ff7dbd9b13..64cd6d787d 100644 --- a/crates/lib/src/identity.rs +++ b/crates/lib/src/identity.rs @@ -1,6 +1,6 @@ use spacetimedb_bindings_macro::{Deserialize, Serialize}; use spacetimedb_sats::{impl_st, AlgebraicType, ProductTypeElement}; -use std::fmt; +use std::{fmt, str::FromStr}; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct AuthCtx { @@ -101,6 +101,14 @@ impl hex::FromHex for Identity { } } +impl FromStr for Identity { + type Err = ::Error; + + fn from_str(s: &str) -> Result { + Self::from_hex(s) + } +} + #[cfg(feature = "serde")] impl serde::Serialize for Identity { fn serialize(&self, serializer: S) -> Result { diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index bd8a15fe2e..7c9c561301 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -9,7 +9,6 @@ pub mod identity; pub use spacetimedb_sats::de; pub mod error; pub mod hash; -#[cfg(feature = "serde")] pub mod name; pub mod operator; pub mod primary_key; diff --git a/crates/lib/src/name.rs b/crates/lib/src/name.rs index 334a3dc5a0..a495a882dd 100644 --- a/crates/lib/src/name.rs +++ b/crates/lib/src/name.rs @@ -1,11 +1,11 @@ -use serde::{Deserialize, Serialize}; use spacetimedb_sats::{impl_deserialize, impl_serialize, impl_st}; use std::{borrow::Borrow, fmt, ops::Deref, str::FromStr}; #[cfg(test)] mod tests; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum InsertDomainResult { Success { domain: DomainName, @@ -32,16 +32,24 @@ pub enum InsertDomainResult { PermissionDenied { domain: DomainName, }, + + /// Some unspecified error occurred. + OtherError(String), } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] +#[derive(Clone, Copy, Debug)] +#[cfg_attr( + feature = "serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "lowercase") +)] pub enum PublishOp { Created, Updated, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum PublishResult { Success { /// `Some` if publish was given a domain name to operate on, `None` @@ -59,6 +67,7 @@ pub enum PublishResult { op: PublishOp, }, + // TODO: below variants are obsolete with control db module /// The top level domain for the database name is not registered. For example: /// /// - `clockworklabs/bitcraft` @@ -77,7 +86,8 @@ pub enum PublishResult { PermissionDenied { domain: DomainName }, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum DnsLookupResponse { /// The lookup was successful and the domain and address are returned. Success { domain: DomainName, address: String }, @@ -86,7 +96,8 @@ pub enum DnsLookupResponse { Failure { domain: DomainName }, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum RegisterTldResult { Success { domain: Tld, @@ -102,7 +113,8 @@ pub enum RegisterTldResult { // TODO(jdetter): Insufficient funds error here } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum SetDefaultDomainResult { Success { domain: DomainName, @@ -273,7 +285,7 @@ impl fmt::Display for TldRef { /// /// To construct a valid [`DomainName`], use [`parse_domain_name`] or the /// [`FromStr`] impl. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct DomainName { // Iff there is a subdomain, next char in `domain_name` is '/'. tld_offset: usize, diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs index bf767ede4c..d2ae20186b 100644 --- a/crates/sdk/src/websocket.rs +++ b/crates/sdk/src/websocket.rs @@ -8,7 +8,9 @@ use spacetimedb_client_api_messages::client_api::Message; use tokio::task::JoinHandle; use tokio::{net::TcpStream, runtime}; use tokio_tungstenite::{ - connect_async, tungstenite::client::IntoClientRequest, tungstenite::protocol::Message as WebSocketMessage, + connect_async_with_config, + tungstenite::client::IntoClientRequest, + tungstenite::protocol::{Message as WebSocketMessage, WebSocketConfig}, MaybeTlsStream, WebSocketStream, }; @@ -119,7 +121,19 @@ impl DbConnection { >::Error: std::error::Error + Send + Sync + 'static, { let req = make_request(host, db_name, credentials)?; - let (sock, _): (WebSocketStream>, _) = connect_async(req).await?; + let (sock, _): (WebSocketStream>, _) = connect_async_with_config( + req, + // TODO(kim): In order to be able to replicate module WASM blobs, + // `cloud-next` cannot have message / frame size limits. That's + // obviously a bad default for all other clients, though. + Some(WebSocketConfig { + max_frame_size: None, + max_message_size: None, + ..WebSocketConfig::default() + }), + false, + ) + .await?; Ok(DbConnection { sock }) } diff --git a/crates/standalone/src/energy_monitor.rs b/crates/standalone/src/energy_monitor.rs index 638849b341..639a73ae74 100644 --- a/crates/standalone/src/energy_monitor.rs +++ b/crates/standalone/src/energy_monitor.rs @@ -1,6 +1,6 @@ use crate::StandaloneEnv; use spacetimedb::host::{EnergyDiff, EnergyMonitor, EnergyMonitorFingerprint, EnergyQuanta}; -use spacetimedb_client_api::ControlNodeDelegate; +use spacetimedb_client_api::ControlStateWriteAccess; use std::{ sync::{Arc, Mutex, Weak}, time::Duration, diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 7c71308949..3c583cf8b3 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -6,7 +6,8 @@ mod worker_db; use crate::subcommands::start::ProgramMode; use crate::subcommands::{start, version}; -use anyhow::Context; +use anyhow::{anyhow, Context}; +use async_trait::async_trait; use clap::{ArgMatches, Command}; use energy_monitor::StandaloneEnergyMonitor; use openssl::ec::{EcGroup, EcKey}; @@ -19,18 +20,19 @@ use spacetimedb::control_db::ControlDb; use spacetimedb::database_instance_context::DatabaseInstanceContext; use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController; use spacetimedb::db::{db_metrics, Config}; -use spacetimedb::hash::Hash; +use spacetimedb::host::EnergyQuanta; +use spacetimedb::host::UpdateDatabaseResult; use spacetimedb::host::UpdateOutcome; use spacetimedb::host::{scheduler::Scheduler, HostController}; -use spacetimedb::host::{EnergyQuanta, UpdateDatabaseResult}; use spacetimedb::identity::Identity; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, Node}; +use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, IdentityEmail, Node}; use spacetimedb::messages::worker_db::DatabaseInstanceState; use spacetimedb::module_host_context::ModuleHostContext; use spacetimedb::object_db::ObjectDb; use spacetimedb::sendgrid_controller::SendGridController; use spacetimedb::{stdb_path, worker_metrics}; -use spacetimedb_lib::name::DomainName; +use spacetimedb_lib::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld}; +use spacetimedb_lib::recovery::RecoveryCode; use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; @@ -155,8 +157,8 @@ fn get_key_path(env: &str) -> Option { Some(path) } -#[async_trait::async_trait] -impl spacetimedb_client_api::WorkerCtx for StandaloneEnv { +#[async_trait] +impl spacetimedb_client_api::NodeDelegate for StandaloneEnv { fn gather_metrics(&self) -> Vec { let mut metric_families = worker_metrics::REGISTRY.gather(); metric_families.extend(db_metrics::REGISTRY.gather()); @@ -167,25 +169,44 @@ impl spacetimedb_client_api::WorkerCtx for StandaloneEnv { &self.db_inst_ctx_controller } - async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result { - self.load_module_host_context_inner(db, instance_id).await - } - fn host_controller(&self) -> &Arc { &self.host_controller } + fn client_actor_index(&self) -> &ClientActorIndex { &self.client_actor_index } + + fn public_key(&self) -> &DecodingKey { + &self.public_key + } + + fn public_key_bytes(&self) -> &[u8] { + &self.public_key_bytes + } + + fn private_key(&self) -> &EncodingKey { + &self.private_key + } + + /// Standalone SpacetimeDB does not support SendGrid as a means to + /// reissue authentication tokens. + fn sendgrid_controller(&self) -> Option<&SendGridController> { + None + } + + async fn load_module_host_context(&self, db: Database, instance_id: u64) -> anyhow::Result { + self.load_module_host_context_inner(db, instance_id).await + } } -#[async_trait::async_trait] -impl spacetimedb_client_api::ControlStateDelegate for StandaloneEnv { - async fn get_node_id(&self) -> Result, anyhow::Error> { - Ok(Some(0)) +impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv { + // Nodes + fn get_node_id(&self) -> Option { + Some(0) } - async fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result> { + fn get_node_by_id(&self, node_id: u64) -> spacetimedb::control_db::Result> { if node_id == 0 { return Ok(Some(Node { id: 0, @@ -196,163 +217,195 @@ impl spacetimedb_client_api::ControlStateDelegate for StandaloneEnv { Ok(None) } - async fn get_nodes(&self) -> spacetimedb::control_db::Result> { - Ok(vec![self.get_node_by_id(0).await?.unwrap()]) + fn get_nodes(&self) -> spacetimedb::control_db::Result> { + Ok(vec![self.get_node_by_id(0)?.unwrap()]) } - async fn get_database_instance_state( - &self, - database_instance_id: u64, - ) -> Result, anyhow::Error> { - self.worker_db.get_database_instance_state(database_instance_id) + // Databases + fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + self.control_db.get_database_by_id(id) } - async fn get_database_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { - self.control_db.get_database_by_id(id).await + fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result> { + self.control_db.get_database_by_address(address) } - async fn get_database_by_address(&self, address: &Address) -> spacetimedb::control_db::Result> { - self.control_db.get_database_by_address(address).await + fn get_databases(&self) -> spacetimedb::control_db::Result> { + self.control_db.get_databases() } - async fn get_databases(&self) -> spacetimedb::control_db::Result> { - self.control_db.get_databases().await + // Database instances + fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { + self.control_db.get_database_instance_by_id(id) } - async fn get_database_instance_by_id(&self, id: u64) -> spacetimedb::control_db::Result> { - self.control_db.get_database_instance_by_id(id).await + fn get_database_instances(&self) -> spacetimedb::control_db::Result> { + self.control_db.get_database_instances() } - async fn get_database_instances(&self) -> spacetimedb::control_db::Result> { - self.control_db.get_database_instances().await + fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { + self.control_db.get_leader_database_instance_by_database(database_id) } - async fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { - self.control_db - .get_leader_database_instance_by_database(database_id) - .await + // Identities + fn get_identities_for_email(&self, email: &str) -> spacetimedb::control_db::Result> { + self.control_db.get_identities_for_email(email) } -} -#[async_trait::async_trait] -impl spacetimedb_client_api::ControlCtx for StandaloneEnv { - async fn insert_database( - &self, - address: &Address, - identity: &Identity, - program_bytes_address: &Hash, - host_type: HostType, - num_replicas: u32, - force: bool, - ) -> Result<(), anyhow::Error> { - let database = Database { - id: 0, - address: *address, - identity: *identity, - host_type, - num_replicas, - program_bytes_address: *program_bytes_address, - }; + fn get_recovery_codes(&self, email: &str) -> spacetimedb::control_db::Result> { + self.control_db.spacetime_get_recovery_codes(email) + } - if force { - self.delete_database(address).await?; - } + // Energy + fn get_energy_balance(&self, identity: &Identity) -> spacetimedb::control_db::Result> { + self.control_db.get_energy_balance(identity) + } - let mut new_database = database.clone(); - let id = self.control_db.insert_database(database).await?; - new_database.id = id; + // DNS + fn lookup_address(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { + self.control_db.spacetime_dns(domain) + } - self.schedule_database(Some(new_database), None).await?; - Ok(()) + fn reverse_lookup(&self, address: &Address) -> spacetimedb::control_db::Result> { + self.control_db.spacetime_reverse_dns(address) + } +} + +#[async_trait] +impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { + async fn create_address(&self) -> spacetimedb::control_db::Result
{ + self.control_db.alloc_spacetime_address() } - async fn update_database( + async fn publish_database( &self, - address: &Address, - program_bytes_address: &Hash, - num_replicas: u32, - ) -> Result, anyhow::Error> { - let database = self.control_db.get_database_by_address(address).await?; - let mut database = match database { - Some(database) => database, - None => return Ok(None), + identity: &Identity, + spec: spacetimedb_client_api::DatabaseDef, + ) -> spacetimedb::control_db::Result> { + let existing_db = self.control_db.get_database_by_address(&spec.address)?; + let program_bytes_address = self.object_db.insert_object(spec.program_bytes)?; + let mut database = match existing_db.as_ref() { + Some(existing) => Database { + address: spec.address, + num_replicas: spec.num_replicas, + program_bytes_address, + ..existing.clone() + }, + None => Database { + id: 0, + address: spec.address, + identity: *identity, + host_type: HostType::Wasmer, + num_replicas: spec.num_replicas, + program_bytes_address, + }, }; - let old_database = database.clone(); + if let Some(existing) = existing_db.as_ref() { + if &existing.identity != identity { + return Err(anyhow!( + "Permission denied: `{}` does not own database `{}`", + identity.to_hex(), + spec.address.to_abbreviated_hex() + ) + .into()); + } + self.control_db.update_database(database.clone())?; + } else { + let id = self.control_db.insert_database(database.clone())?; + database.id = id; + } - database.program_bytes_address = *program_bytes_address; - database.num_replicas = num_replicas; let database_id = database.id; - let new_database = database.clone(); - self.control_db.update_database(database).await?; + let should_update_instances = existing_db.is_some(); - self.schedule_database(Some(new_database), Some(old_database)).await?; - self.update_database_instances(database_id) - .await - // TODO(kim): this should really only run on the leader instance - .map(|mut res| res.pop().flatten()) + self.schedule_database(Some(database), existing_db).await?; + + if should_update_instances { + let leader = self + .control_db + .get_leader_database_instance_by_database(database_id) + .ok_or_else(|| anyhow!("Not found: leader instance for database {database_id}"))?; + Ok(self.update_database_instance(leader).await?) + } else { + Ok(None) + } } - async fn delete_database(&self, address: &Address) -> Result<(), anyhow::Error> { - let Some(database) = self.control_db.get_database_by_address(address).await? else { + async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()> { + let Some(database) = self.control_db.get_database_by_address(address)? else { return Ok(()); }; - self.control_db.delete_database(database.id).await?; + if &database.identity != identity { + return Err(anyhow!( + "Permission denied: `{}` does not own database `{}`", + identity.to_hex(), + address.to_abbreviated_hex() + ) + .into()); + } + + self.control_db.delete_database(database.id)?; self.schedule_database(None, Some(database)).await?; + Ok(()) } - fn object_db(&self) -> &ObjectDb { - &self.object_db + async fn create_identity(&self) -> spacetimedb::control_db::Result { + self.control_db.alloc_spacetime_identity() } - fn control_db(&self) -> &ControlDb { - &self.control_db + async fn add_email(&self, identity: &Identity, email: &str) -> spacetimedb::control_db::Result<()> { + self.control_db + .associate_email_spacetime_identity(*identity, email) + .await } - /// Standalone SpacetimeDB does not support SendGrid as a means to - /// reissue authentication tokens. - fn sendgrid_controller(&self) -> Option<&SendGridController> { - None + async fn insert_recovery_code( + &self, + _identity: &Identity, + email: &str, + code: RecoveryCode, + ) -> spacetimedb::control_db::Result<()> { + self.control_db.spacetime_insert_recovery_code(email, code) } -} -#[async_trait::async_trait] -impl spacetimedb_client_api::ControlNodeDelegate for StandaloneEnv { - async fn spacetime_dns(&self, domain: &DomainName) -> spacetimedb::control_db::Result> { - self.control_db.spacetime_dns(domain).await - } + async fn add_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { + let mut balance = ::get_energy_balance(self, identity)? + .map(|quanta| quanta.0) + .unwrap_or(0); + balance = balance.saturating_add(amount.0); - async fn alloc_spacetime_identity(&self) -> spacetimedb::control_db::Result { - self.control_db.alloc_spacetime_identity().await + self.control_db.set_energy_balance(*identity, EnergyQuanta(balance)) } - async fn withdraw_energy(&self, identity: &Identity, amount: EnergyQuanta) -> spacetimedb::control_db::Result<()> { let energy_balance = self.control_db.get_energy_balance(identity)?; let energy_balance = energy_balance.unwrap_or(EnergyQuanta(0)); log::trace!("Withdrawing {} energy from {}", amount.0, identity); log::trace!("Old balance: {}", energy_balance.0); let new_balance = energy_balance - amount; - self.control_db - .set_energy_balance(*identity, new_balance.as_quanta()) - .await + self.control_db.set_energy_balance(*identity, new_balance.as_quanta()) } - fn public_key(&self) -> &DecodingKey { - &self.public_key - } - fn private_key(&self) -> &EncodingKey { - &self.private_key + async fn register_tld(&self, identity: &Identity, tld: Tld) -> spacetimedb::control_db::Result { + self.control_db.spacetime_register_tld(tld, *identity) } - fn public_key_bytes(&self) -> &[u8] { - &self.public_key_bytes + + async fn create_dns_record( + &self, + identity: &Identity, + domain: &DomainName, + address: &Address, + ) -> spacetimedb::control_db::Result { + self.control_db + .spacetime_insert_domain(address, domain.clone(), *identity, true) } } impl StandaloneEnv { async fn insert_database_instance(&self, database_instance: DatabaseInstance) -> Result<(), anyhow::Error> { let mut new_database_instance = database_instance.clone(); - let id = self.control_db.insert_database_instance(database_instance).await?; + let id = self.control_db.insert_database_instance(database_instance)?; new_database_instance.id = id; self.on_insert_database_instance(&new_database_instance).await?; @@ -367,16 +420,12 @@ impl StandaloneEnv { &self, database_instance: DatabaseInstance, ) -> Result, anyhow::Error> { - self.control_db - .update_database_instance(database_instance.clone()) - .await?; - + self.control_db.update_database_instance(database_instance.clone())?; self.on_update_database_instance(&database_instance).await } async fn delete_database_instance(&self, database_instance_id: u64) -> Result<(), anyhow::Error> { - self.control_db.delete_database_instance(database_instance_id).await?; - + self.control_db.delete_database_instance(database_instance_id)?; self.on_delete_database_instance(database_instance_id).await; Ok(()) @@ -426,25 +475,9 @@ impl StandaloneEnv { Ok(()) } - // TODO(kim): update should only run on the leader instance, and this - // method should return a single result - async fn update_database_instances( - &self, - database_id: u64, - ) -> Result>, anyhow::Error> { - let instances = self.control_db.get_database_instances_by_database(database_id).await?; - let mut results = Vec::with_capacity(instances.len()); - for instance in instances { - let res = self.update_database_instance(instance).await?; - results.push(res); - } - - Ok(results) - } - async fn deschedule_replicas(&self, database_id: u64, num_replicas: u32) -> Result<(), anyhow::Error> { for _ in 0..num_replicas { - let instances = self.control_db.get_database_instances_by_database(database_id).await?; + let instances = self.control_db.get_database_instances_by_database(database_id)?; let Some(instance) = instances.last() else { return Ok(()); }; @@ -517,7 +550,7 @@ impl StandaloneEnv { database_id: u64, instance_id: u64, ) -> Result { - let database = if let Some(database) = self.control_db.get_database_by_id(database_id).await? { + let database = if let Some(database) = self.control_db.get_database_by_id(database_id)? { database } else { return Err(anyhow::anyhow!( @@ -542,9 +575,19 @@ impl StandaloneEnv { if let Some((dbic, scheduler)) = self.db_inst_ctx_controller.get(instance_id) { (dbic, scheduler.new_with_same_db()) } else { - let dbic = - DatabaseInstanceContext::from_database(self.config, &database, instance_id, root_db_path.clone()); - let (scheduler, scheduler_starter) = Scheduler::open(dbic.scheduler_db_path(root_db_path))?; + // `spawn_blocking` because we're accessing the filesystem + let (dbic, (scheduler, scheduler_starter)) = tokio::task::spawn_blocking({ + let database = database.clone(); + let path = root_db_path.clone(); + let config = self.config; + move || -> anyhow::Result<_> { + let dbic = DatabaseInstanceContext::from_database(config, &database, instance_id, path); + let sched = Scheduler::open(dbic.scheduler_db_path(root_db_path))?; + Ok((dbic, sched)) + } + }) + .await??; + self.db_inst_ctx_controller.insert(dbic.clone(), scheduler.clone()); (dbic, (scheduler, scheduler_starter)) }; diff --git a/crates/standalone/src/routes/mod.rs b/crates/standalone/src/routes/mod.rs index 81825a934a..5be504c9a1 100644 --- a/crates/standalone/src/routes/mod.rs +++ b/crates/standalone/src/routes/mod.rs @@ -1,18 +1,15 @@ -use axum::extract::FromRef; use http::header::{ACCEPT, AUTHORIZATION}; +use tower_http::cors::{Any, CorsLayer}; + use spacetimedb_client_api::{ routes::{database, energy, identity, metrics, prometheus}, - ControlCtx, ControlNodeDelegate, WorkerCtx, + ControlStateDelegate, NodeDelegate, }; -use std::sync::Arc; -use tower_http::cors::{Any, CorsLayer}; #[allow(clippy::let_and_return)] pub fn router() -> axum::Router where - S: ControlNodeDelegate + Clone + 'static, - Arc: FromRef, - Arc: FromRef, + S: NodeDelegate + ControlStateDelegate + Clone + 'static, { let router = axum::Router::new() .nest("/database", database::control_routes().merge(database::worker_routes())) diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index c90da58a21..f22a51b622 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -3,16 +3,15 @@ use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::Arc; +use tokio::runtime::{Builder, Runtime}; + use spacetimedb::address::Address; use spacetimedb::client::{ClientActorId, ClientConnection, Protocol}; +use spacetimedb::config::{FilesLocal, SpacetimeDbFiles}; use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::db::{Config, FsyncPolicy, Storage}; - -use spacetimedb::config::{FilesLocal, SpacetimeDbFiles}; -use spacetimedb::messages::control_db::HostType; -use spacetimedb_client_api::{ControlCtx, ControlStateDelegate, WorkerCtx}; +use spacetimedb_client_api::{ControlStateReadAccess, ControlStateWriteAccess, DatabaseDef, NodeDelegate}; use spacetimedb_standalone::StandaloneEnv; -use tokio::runtime::{Builder, Runtime}; fn start_runtime() -> Runtime { Builder::new_multi_thread().enable_all().build().unwrap() @@ -88,7 +87,6 @@ impl CompiledModule { with_runtime(move |runtime| { runtime.block_on(async { let module = self.load_module().await; - routine(module).await; }); }); @@ -119,23 +117,27 @@ impl CompiledModule { crate::set_key_env_vars(&paths); let env = spacetimedb_standalone::StandaloneEnv::init(config).await.unwrap(); - let identity = env.control_db().alloc_spacetime_identity().await.unwrap(); - let address = env.control_db().alloc_spacetime_address().await.unwrap(); + let identity = env.create_identity().await.unwrap(); + let address = env.create_address().await.unwrap(); let program_bytes = self .program_bytes .get_or_init(|| std::fs::read(&self.path).unwrap()) .clone(); - let program_bytes_addr = env.object_db().insert_object(program_bytes).unwrap(); - - let host_type = HostType::Wasmer; - - env.insert_database(&address, &identity, &program_bytes_addr, host_type, 1, true) - .await - .unwrap(); - let database = env.get_database_by_address(&address).await.unwrap().unwrap(); - let instance = env.get_leader_database_instance_by_database(database.id).await.unwrap(); + env.publish_database( + &identity, + DatabaseDef { + address, + program_bytes, + num_replicas: 1, + }, + ) + .await + .unwrap(); + + let database = env.get_database_by_address(&address).unwrap().unwrap(); + let instance = env.get_leader_database_instance_by_database(database.id).unwrap(); let client_id = ClientActorId { identity, diff --git a/test/tests/update-module.sh b/test/tests/update-module.sh index 9712b678eb..6977f80def 100644 --- a/test/tests/update-module.sh +++ b/test/tests/update-module.sh @@ -22,7 +22,7 @@ pub struct Person { #[spacetimedb(reducer)] pub fn add(name: String) { - Person::insert(Person { id: 0, name }).unwrap(); + Person::insert(Person { id: 0, name }); } #[spacetimedb(reducer)]