From 1d8e2ca2fb76d432f111c693fd6ad3bf73373009 Mon Sep 17 00:00:00 2001 From: Jeremie Pelletier Date: Fri, 15 Nov 2024 11:56:28 -0500 Subject: [PATCH] Subscriptions impl, server side --- crates/cli/src/subcommands/subscribe.rs | 12 +- crates/client-api-messages/src/websocket.rs | 135 +++++++++++-- crates/core/src/client/client_connection.rs | 17 +- crates/core/src/client/message_handlers.rs | 8 + crates/core/src/client/messages.rs | 157 +++++++++++++- crates/core/src/error.rs | 4 +- crates/core/src/execution_context.rs | 9 + crates/core/src/host/module_host.rs | 2 +- .../core/src/subscription/execution_unit.rs | 29 ++- .../subscription/module_subscription_actor.rs | 191 ++++++++++-------- .../module_subscription_manager.rs | 106 +++++++--- crates/core/src/subscription/query.rs | 120 +++-------- crates/core/src/subscription/subscription.rs | 24 +-- 13 files changed, 542 insertions(+), 272 deletions(-) diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs index ca67d8d0e62..f5709a3802e 100644 --- a/crates/cli/src/subcommands/subscribe.rs +++ b/crates/cli/src/subcommands/subscribe.rs @@ -121,7 +121,7 @@ struct SubscriptionTable { } pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> { - let queries = args.get_many::("query").unwrap(); + let query = args.get_one::("query").unwrap(); let num = args.get_one::("num-updates").copied(); let timeout = args.get_one::("timeout").copied(); let print_initial_update = args.get_flag("print_initial_update"); @@ -152,7 +152,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error let (mut ws, _) = tokio_tungstenite::connect_async(req).await?; let task = async { - subscribe(&mut ws, queries.cloned().map(Into::into).collect()).await?; + subscribe(&mut ws, query.clone().into_boxed_str()).await?; await_initial_update(&mut ws, print_initial_update.then_some(&module_def)).await?; consume_transaction_updates(&mut ws, num, &module_def).await }; @@ -175,13 +175,13 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error } /// Send the subscribe message. -async fn subscribe(ws: &mut S, query_strings: Box<[Box]>) -> Result<(), S::Error> +async fn subscribe(ws: &mut S, query: Box) -> Result<(), S::Error> where S: Sink + Unpin, { let msg = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe( ws::Subscribe { - query_strings, + query, request_id: 0, }, ))) @@ -201,7 +201,7 @@ where while let Some(msg) = ws.try_next().await? { let Some(msg) = parse_msg_json(&msg) else { continue }; match msg { - ws::ServerMessage::InitialSubscription(sub) => { + ws::ServerMessage::SubscriptionUpdate(sub) => { if let Some(module_def) = module_def { let formatted = reformat_update(&sub.database_update, module_def)?; let output = serde_json::to_string(&formatted)? + "\n"; @@ -247,7 +247,7 @@ where let Some(msg) = parse_msg_json(&msg) else { continue }; match msg { - ws::ServerMessage::InitialSubscription(_) => { + ws::ServerMessage::SubscriptionUpdate(_) => { anyhow::bail!("protocol error: received a second initial subscription update") } ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { update, .. 
}) diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index dcd87313543..d1d14016f1d 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -31,7 +31,7 @@ use spacetimedb_sats::{ de::{Deserialize, Error}, impl_deserialize, impl_serialize, impl_st, ser::{serde::SerializeWrapper, Serialize}, - AlgebraicType, SpacetimeType, + AlgebraicType, SpacetimeType, u256 }; use std::{ io::{self, Read as _, Write as _}, @@ -88,6 +88,8 @@ pub enum ClientMessage { CallReducer(CallReducer), /// Register SQL queries on which to receive updates. Subscribe(Subscribe), + /// Unregister SQL queries which are receiving updates. + Unsubscribe(Unsubscribe), /// Send a one-off SQL query without establishing a subscription. OneOffQuery(OneOffQuery), } @@ -107,6 +109,7 @@ impl ClientMessage { flags, }), ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x), + ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x), ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x), } } @@ -162,6 +165,12 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? { x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))), }); +#[derive(SpacetimeType, Clone, Debug)] +#[sats(crate = spacetimedb_lib)] +pub struct QueryId { + pub hash: u256, +} + /// Sent by client to database to register a set of queries, about which the client will /// receive `TransactionUpdate`s. /// @@ -179,9 +188,21 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? { #[derive(SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct Subscribe { - /// A sequence of SQL queries. - pub query_strings: Box<[Box]>, + /// A single SQL `SELECT` query to subscribe to. + pub query: Box, + /// An identifier for a client request. + pub request_id: u32, +} + +/// Client request for removing a query from a subscription. +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct Unsubscribe { + /// The ID returned in the [`SubscribeApplied`] message. + /// An optimization to avoid reparsing and normalizing the query string. pub request_id: u32, + /// An identifier for a client request. + pub query_id: QueryId, } /// A one-off query submission. @@ -212,31 +233,24 @@ pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2; #[derive(SpacetimeType, derive_more::From)] #[sats(crate = spacetimedb_lib)] pub enum ServerMessage { + /// After connecting, to inform client of its identity. + IdentityToken(IdentityToken), + /// Request a stream of changes to subscribed rows. + SubscribeApplied(SubscribeApplied), + /// Cancel a stream of changes from subscribed rows. + UnsubscribeApplied(UnsubscribeApplied), + /// Communicate an error in the subscription lifecycle. + SubscriptionError(SubscriptionError), /// Informs of changes to subscribed rows. - InitialSubscription(InitialSubscription), + SubscriptionUpdate(SubscriptionUpdate), /// Upon reducer run. TransactionUpdate(TransactionUpdate), /// Upon reducer run, but limited to just the table updates. TransactionUpdateLight(TransactionUpdateLight), - /// After connecting, to inform client of its identity. - IdentityToken(IdentityToken), /// Return results to a one off SQL query. OneOffQueryResponse(OneOffQueryResponse), } -/// Response to [`Subscribe`] containing the initial matching rows. 
-#[derive(SpacetimeType)] -#[sats(crate = spacetimedb_lib)] -pub struct InitialSubscription { - /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries. - pub database_update: DatabaseUpdate, - /// An identifier sent by the client in requests. - /// The server will include the same request_id in the response. - pub request_id: u32, - /// The overall time between the server receiving a request and sending the response. - pub total_host_execution_duration_micros: u64, -} - /// Received by database from client to inform of user's identity, token and client address. /// /// The database will always send an `IdentityToken` message @@ -253,6 +267,87 @@ pub struct IdentityToken { pub address: Address, } +/// The matching rows of a subscription query. +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct SubscribeRows { + /// The table ID of the query. + pub table_id: TableId, + /// The table name of the query. + pub table_name: Box, + /// The BSATN row values. + pub table_rows: TableUpdate, +} + +/// Response to [`Subscribe`] containing the initial matching rows. +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct SubscribeApplied { + /// An identifier sent by the client in requests. + /// The server will include the same request_id in the response. + pub request_id: u32, + /// The overall time between the server receiving a request and sending the response. + pub total_host_execution_duration_micros: u64, + /// An identifier for the subscribed query, allocated by the server. + pub query_id: QueryId, + /// The matching rows for this query. + pub rows: SubscribeRows, +} + +/// Server response to a client [`Unsubscribe`] request. +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct UnsubscribeApplied { + /// Provided by the client via the `Subscribe` message. + pub request_id: u32, + /// The overall time between the server receiving a request and sending the response. + pub total_host_execution_duration_micros: u64, + /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages. + pub query_id: QueryId, + /// The matching rows for this query. + pub rows: SubscribeRows, +} + +/// Server response to an error at any point of the subscription lifecycle. +#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct SubscriptionError { + /// The overall time between the server receiving a request and sending the response. + pub total_host_execution_duration_micros: u64, + /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message. + /// [`None`] if this occurred as the result of a [`TransactionUpdate`]. + pub request_id: Option, + /// The return table of the query in question. + /// The server is not required to set this field. + /// It has been added to avoid a breaking change post 1.0. + /// + /// If unset, an error results in the entire subscription being dropped. + /// Otherwise only queries of this table type must be dropped. + pub table_id: Option, + /// An error message describing the failure. + /// + /// This should reference specific fragments of the query where applicable, + /// but should not include the full text of the query, + /// as the client can retrieve that from the `request_id`. + /// + /// This is intended for diagnostic purposes. + /// It need not have a predictable/parseable format. + pub error: Box, +} + +/// Response to [`Subscribe`] containing the initial matching rows. 
+#[derive(SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct SubscriptionUpdate { + /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries. + pub database_update: DatabaseUpdate, + /// An identifier sent by the client in requests. + /// The server will include the same request_id in the response. + pub request_id: u32, + /// The overall time between the server receiving a request and sending the response. + pub total_host_execution_duration_micros: u64, +} + // TODO: Evaluate if it makes sense for this to also include the // address of the database this is calling @@ -626,7 +721,7 @@ pub fn brotli_compress(bytes: &[u8], out: &mut Vec) { encoder .read_to_end(out) - .expect("Failed to Brotli compress `SubscriptionUpdateMessage`"); + .expect("Failed to Brotli compress `bytes`"); } pub fn brotli_decompress(bytes: &[u8]) -> Result, io::Error> { diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index fb646581a79..8e6d5eb7e68 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -7,7 +7,7 @@ use super::messages::{OneOffQueryResponseMessage, SerializableMessage}; use super::{message_handlers, ClientActorId, MessageHandleError}; use crate::error::DBError; use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult}; -use crate::messages::websocket::Subscribe; +use crate::messages::websocket::{Subscribe, Unsubscribe}; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; use derive_more::From; @@ -283,12 +283,23 @@ impl ClientConnection { .await } - pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> { + pub async fn subscribe(&self, request: Subscribe, timer: Instant) -> Result<(), DBError> { let me = self.clone(); tokio::task::spawn_blocking(move || { me.module .subscriptions() - .add_subscriber(me.sender, subscription, timer, None) + .add_subscriber(me.sender, request, timer, None) + }) + .await + .unwrap() + } + + pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> { + let me = self.clone(); + tokio::task::spawn_blocking(move || { + me.module + .subscriptions() + .remove_subscriber(me.sender, request, timer) }) .await .unwrap() diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index af59b101602..53b276c3b6b 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -87,6 +87,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } + ClientMessage::Unsubscribe(request) => { + let res = client.unsubscribe(request, timer).await; + WORKER_METRICS + .request_round_trip + .with_label_values(&WorkloadType::Unsubscribe, &address, "") + .observe(timer.elapsed().as_secs_f64()); + res.map_err(|e| (None, None, e.into())) + } ClientMessage::OneOffQuery(OneOffQuery { query_string: query, message_id, diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index eacc330ebf4..71a7bb9ae1a 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -11,7 +11,8 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::ser::serde::SerializeWrapper; use spacetimedb_lib::Address; 
-use spacetimedb_sats::bsatn; +use spacetimedb_primitives::TableId; +use spacetimedb_sats::{bsatn, u256}; use spacetimedb_vm::relation::MemTable; use std::sync::Arc; use std::time::Instant; @@ -65,7 +66,7 @@ pub fn serialize(msg: impl ToProtocol, config: pub enum SerializableMessage { Query(OneOffQueryResponseMessage), Identity(IdentityTokenMessage), - Subscribe(SubscriptionUpdateMessage), + Subscribe(SubscriptionMessage), TxUpdate(TransactionUpdateMessage), } @@ -82,7 +83,11 @@ impl SerializableMessage { pub fn workload(&self) -> Option { match self { Self::Query(_) => Some(WorkloadType::Sql), - Self::Subscribe(_) => Some(WorkloadType::Subscribe), + Self::Subscribe(x) => match x.result { + SubscriptionResult::Subscribe(_) => Some(WorkloadType::Subscribe), + SubscriptionResult::Unsubscribe(_) => Some(WorkloadType::Unsubscribe), + _ => None, + }, Self::TxUpdate(_) => Some(WorkloadType::Update), Self::Identity(_) => None, } @@ -225,23 +230,161 @@ impl ToProtocol for SubscriptionUpdateMessage { protocol.assert_matches_format_switch(&self.database_update); match self.database_update { FormatSwitch::Bsatn(database_update) => { - FormatSwitch::Bsatn(ws::ServerMessage::InitialSubscription(ws::InitialSubscription { + FormatSwitch::Bsatn(ws::SubscriptionUpdate { database_update, request_id, total_host_execution_duration_micros, - })) + }.into()) } FormatSwitch::Json(database_update) => { - FormatSwitch::Json(ws::ServerMessage::InitialSubscription(ws::InitialSubscription { + FormatSwitch::Json(ws::SubscriptionUpdate { database_update, request_id, total_host_execution_duration_micros, - })) + }.into()) } } } } +#[derive(Debug, Clone)] +pub struct SubscriptionRows { + pub table_id: TableId, + pub table_name: Box, + pub table_rows: FormatSwitch, ws::TableUpdate>, +} + +impl ToProtocol for SubscriptionRows { + type Encoded = FormatSwitch, ws::SubscribeRows>; + fn to_protocol(self, protocol: Protocol) -> Self::Encoded { + protocol.assert_matches_format_switch(&self.table_rows); + match self.table_rows { + FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::SubscribeRows { + table_id: self.table_id, + table_name: self.table_name, + table_rows + }.into()), + FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::SubscribeRows { + table_id: self.table_id, + table_name: self.table_name, + table_rows + }.into()), + } + } +} + +#[derive(Debug, Clone)] +pub struct SubscriptionError { + pub table_id: Option, + pub message: Box, +} + +#[derive(Debug, Clone)] +pub enum SubscriptionResult { + Subscribe(SubscriptionRows), + Unsubscribe(SubscriptionRows), + Error(SubscriptionError), +} + +#[derive(Debug, Clone)] +pub struct SubscriptionMessage { + pub timer: Option, + pub request_id: Option, + pub query_id: Option, + pub result: SubscriptionResult, +} + +fn num_rows_in(rows: &SubscriptionRows) -> usize { + match &rows.table_rows { + FormatSwitch::Bsatn(x) => x.num_rows(), + FormatSwitch::Json(x) => x.num_rows(), + } +} + +impl SubscriptionMessage { + fn num_rows(&self) -> usize { + match &self.result { + SubscriptionResult::Subscribe(x) => num_rows_in(&x), + SubscriptionResult::Unsubscribe(x) => num_rows_in(&x), + _ => 0, + } + } +} + +impl ToProtocol for SubscriptionMessage { + type Encoded = SwitchedServerMessage; + fn to_protocol(self, protocol: Protocol) -> Self::Encoded { + let request_id = self.request_id.unwrap_or(0); + let query_id = self.query_id.unwrap_or(ws::QueryId { hash: u256::ZERO }); + let total_host_execution_duration_micros = self.timer.map_or(0, |t| t.elapsed().as_micros() as u64); + + 
match self.result { + SubscriptionResult::Subscribe(result) => { + protocol.assert_matches_format_switch(&result.table_rows); + match result.table_rows { + FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::SubscribeApplied { + total_host_execution_duration_micros, + request_id, + query_id, + rows: ws::SubscribeRows { + table_id: result.table_id, + table_name: result.table_name, + table_rows + }, + }.into()), + FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::SubscribeApplied { + total_host_execution_duration_micros, + request_id, + query_id, + rows: ws::SubscribeRows { + table_id: result.table_id, + table_name: result.table_name, + table_rows + }, + }.into()), + } + }, + SubscriptionResult::Unsubscribe(result) => { + protocol.assert_matches_format_switch(&result.table_rows); + match result.table_rows { + FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::UnsubscribeApplied { + total_host_execution_duration_micros, + request_id, + query_id, + rows: ws::SubscribeRows { + table_id: result.table_id, + table_name: result.table_name, + table_rows + }, + }.into()), + FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::UnsubscribeApplied { + total_host_execution_duration_micros, + request_id, + query_id, + rows: ws::SubscribeRows { + table_id: result.table_id, + table_name: result.table_name, + table_rows + }, + }.into()), + } + }, + SubscriptionResult::Error(error) => { + let msg = ws::SubscriptionError { + total_host_execution_duration_micros, + request_id: self.request_id, // Pass Option through + table_id: error.table_id, + error: error.message, + }; + match protocol { + Protocol::Binary => FormatSwitch::Bsatn(msg.into()), + Protocol::Text => FormatSwitch::Json(msg.into()), + } + }, + } + } +} + #[derive(Debug)] pub struct OneOffQueryResponseMessage { pub message_id: Vec, diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index bc5661ef89c..480411b59a5 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -99,7 +99,9 @@ pub enum SubscriptionError { NotFound(IndexId), #[error("Empty string")] Empty, - #[error("Queries with side effects not allowed: {0:?}")] + #[error("Queries over multiple tables are not supported")] + Multiple, + #[error("Queries with side effects are not allowed: {0:?}")] SideEffect(Crud), #[error("Unsupported query on subscription: {0:?}")] Unsupported(String), diff --git a/crates/core/src/execution_context.rs b/crates/core/src/execution_context.rs index fa084404b24..b0f8897add8 100644 --- a/crates/core/src/execution_context.rs +++ b/crates/core/src/execution_context.rs @@ -100,6 +100,7 @@ pub enum Workload { Reducer(ReducerContext), Sql, Subscribe, + Unsubscribe, Update, Internal, } @@ -113,6 +114,7 @@ pub enum WorkloadType { Reducer, Sql, Subscribe, + Unsubscribe, Update, Internal, } @@ -125,6 +127,7 @@ impl From for WorkloadType { Workload::Reducer(_) => Self::Reducer, Workload::Sql => Self::Sql, Workload::Subscribe => Self::Subscribe, + Workload::Unsubscribe => Self::Unsubscribe, Workload::Update => Self::Update, Workload::Internal => Self::Internal, } @@ -156,6 +159,7 @@ impl ExecutionContext { Workload::Reducer(ctx) => Self::reducer(database_identity, ctx), Workload::Sql => Self::sql(database_identity), Workload::Subscribe => Self::subscribe(database_identity), + Workload::Unsubscribe => Self::unsubscribe(database_identity), Workload::Update => Self::incremental_update(database_identity), } } @@ -175,6 +179,11 @@ impl ExecutionContext { Self::new(database, None, WorkloadType::Subscribe) } + /// Returns an 
[ExecutionContext] for an initial unsubscribe call. + pub fn unsubscribe(database: Identity) -> Self { + Self::new(database, None, WorkloadType::Unsubscribe) + } + /// Returns an [ExecutionContext] for a subscription update. pub fn incremental_update(database: Identity) -> Self { Self::new(database, None, WorkloadType::Update) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 3b0c807b37a..79669c124f6 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -540,7 +540,7 @@ impl ModuleHost { pub async fn disconnect_client(&self, client_id: ClientActorId) { let this = self.clone(); let _ = tokio::task::spawn_blocking(move || { - this.subscriptions().remove_subscriber(client_id); + this.subscriptions().remove_all_subscribers(client_id); }) .await; // ignore NoSuchModule; if the module's already closed, that's fine diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index f44df10503a..743c965cc9e 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -8,11 +8,12 @@ use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, use crate::messages::websocket::TableUpdate; use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; -use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, WebsocketFormat}; +use spacetimedb_client_api_messages::websocket::{Compression, QueryId, QueryUpdate, RowListLen as _, WebsocketFormat}; use spacetimedb_lib::db::error::AuthError; use spacetimedb_lib::relation::DbTable; use spacetimedb_lib::{Identity, ProductValue}; use spacetimedb_primitives::TableId; +use spacetimedb_sats::u256; use spacetimedb_vm::eval::IterRows; use spacetimedb_vm::expr::{AuthAccess, NoInMemUsed, Query, QueryExpr, SourceExpr, SourceId}; use spacetimedb_vm::rel_ops::RelOps; @@ -41,6 +42,20 @@ pub struct QueryHash { data: [u8; 32], } +impl Into for QueryHash { + fn into(self) -> QueryId { + let ptr = &self.data as *const [u8; 32] as *const u128; + QueryId { hash: unsafe { u256::from_words(*ptr, *ptr.wrapping_add(1)) } } + } +} + +impl Into for QueryId { + fn into(self) -> QueryHash { + let ptr = &self.hash.0 as *const [u128; 2] as *const [u8; 32]; + QueryHash { data: unsafe { *ptr } } + } +} + impl QueryHash { pub const NONE: Self = Self { data: [0; 32] }; @@ -208,7 +223,7 @@ impl ExecutionUnit { sql: &str, slow_query_threshold: Option, compression: Compression, - ) -> Option> { + ) -> TableUpdate { let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx.workload()).log_guard(); // Build & execute the query and then encode it to a row list. @@ -217,12 +232,10 @@ impl ExecutionUnit { let inserts = inserts.iter(); let (inserts, num_rows) = F::encode_list(inserts); - (!inserts.is_empty()).then(|| { - let deletes = F::List::default(); - let qu = QueryUpdate { deletes, inserts }; - let update = F::into_query_update(qu, compression); - TableUpdate::new(self.return_table(), self.return_name(), (update, num_rows)) - }) + let deletes = F::List::default(); + let qu = QueryUpdate { deletes, inserts }; + let update = F::into_query_update(qu, compression); + TableUpdate::new(self.return_table(), self.return_name(), (update, num_rows)) } /// Evaluate this execution unit against the given delta tables. 
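Note on the QueryHash <-> QueryId conversions above: they lean on unsafe pointer casts, and a [u8; 32] field carries no alignment guarantee for reading u128s through it. Below is a minimal safe sketch of the same round-trip, not the patch's implementation; it assumes it sits in the same module as QueryHash (whose `data` field is private) and the ethnum-style `from_words`/`into_words` API the patch itself relies on, and it uses From so the matching Into impls come for free.

impl From<QueryHash> for QueryId {
    fn from(hash: QueryHash) -> Self {
        // Split the 32-byte hash into two u128 words; only consistency between
        // the two directions matters, not the particular byte order chosen.
        let hi = u128::from_le_bytes(hash.data[..16].try_into().unwrap());
        let lo = u128::from_le_bytes(hash.data[16..].try_into().unwrap());
        QueryId { hash: u256::from_words(hi, lo) }
    }
}

impl From<QueryId> for QueryHash {
    fn from(id: QueryId) -> Self {
        // Reassemble the bytes in the same order they were split above.
        let (hi, lo) = id.hash.into_words();
        let mut data = [0u8; 32];
        data[..16].copy_from_slice(&hi.to_le_bytes());
        data[16..].copy_from_slice(&lo.to_le_bytes());
        QueryHash { data }
    }
}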
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 61596e560d7..7d3dbafcabf 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,9 +1,9 @@ use super::execution_unit::{ExecutionUnit, QueryHash}; use super::module_subscription_manager::SubscriptionManager; use super::query::compile_read_only_query; -use super::subscription::ExecutionSet; -use crate::client::messages::{SubscriptionUpdateMessage, TransactionUpdateMessage}; +use crate::client::messages::{SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage}; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; +use crate::db::datastore::locking_tx_datastore::tx::TxId; use crate::db::datastore::system_tables::StVarTable; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; @@ -14,8 +14,8 @@ use crate::sql::ast::SchemaViewer; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use parking_lot::RwLock; -use spacetimedb_client_api_messages::websocket::FormatSwitch; use spacetimedb_expr::check::compile_sql_sub; +use spacetimedb_client_api_messages::websocket::{BsatnFormat, FormatSwitch, JsonFormat, TableUpdate, Unsubscribe}; use spacetimedb_expr::ty::TyCtx; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::Identity; @@ -46,109 +46,82 @@ impl ModuleSubscriptions { } } - /// Add a subscriber to the module. NOTE: this function is blocking. - #[tracing::instrument(skip_all)] - pub fn add_subscriber( - &self, - sender: Arc, - subscription: Subscribe, - timer: Instant, - _assert: Option, - ) -> Result<(), DBError> { - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { - self.relational_db.release_tx(tx); - }); - let request_id = subscription.request_id; - let auth = AuthCtx::new(self.owner_identity, sender.id.identity); - let mut queries = vec![]; - - let guard = self.subscriptions.read(); - - for sql in subscription - .query_strings - .iter() - .map(|sql| super::query::WHITESPACE.replace_all(sql, " ")) - { - let sql = sql.trim(); - if sql == super::query::SUBSCRIBE_TO_ALL_QUERY { - queries.extend( - super::subscription::get_all(&self.relational_db, &tx, &auth)? - .into_iter() - .map(|query| { - let hash = QueryHash::from_string(&query.sql); - ExecutionUnit::new(query, hash).map(Arc::new) - }) - .collect::, _>>()?, - ); - continue; - } - let hash = QueryHash::from_string(sql); - if let Some(unit) = guard.query(&hash) { - queries.push(unit); - } else { - // NOTE: The following ensures compliance with the 1.0 sql api. - // Come 1.0, it will have replaced the current compilation stack. - compile_sql_sub( - &mut TyCtx::default(), - sql, - &SchemaViewer::new(&self.relational_db, &*tx, &auth), - )?; - - let mut compiled = compile_read_only_query(&self.relational_db, &auth, &tx, sql)?; - // Note that no error path is needed here. - // We know this vec only has a single element, - // since `parse_and_type_sub` guarantees it. - // This check will be removed come 1.0. 
- if compiled.len() == 1 { - queries.push(Arc::new(ExecutionUnit::new(compiled.remove(0), hash)?)); - } - } - } - - drop(guard); - - let execution_set: ExecutionSet = queries.into(); - - execution_set - .check_auth(auth.owner, auth.caller) - .map_err(ErrorVm::Auth)?; + fn run_subscriber(&self, sender: Arc, query: Arc, auth: AuthCtx, tx: &TxId) -> + Result, TableUpdate>, DBError> { + query.check_auth(auth.owner, auth.caller).map_err(ErrorVm::Auth)?; check_row_limit( - &execution_set, + &query, &self.relational_db, &tx, - |execution_set, tx| execution_set.row_estimate(tx), + |query, tx| query.row_estimate(tx), &auth, )?; let slow_query_threshold = StVarTable::sub_limit(&self.relational_db, &tx)?.map(Duration::from_millis); - let database_update = match sender.config.protocol { - Protocol::Text => FormatSwitch::Json(execution_set.eval( + Ok(match sender.config.protocol { + Protocol::Binary => FormatSwitch::Bsatn(query.eval( &self.relational_db, &tx, + &query.sql, slow_query_threshold, sender.config.compression, )), - Protocol::Binary => FormatSwitch::Bsatn(execution_set.eval( + Protocol::Text => FormatSwitch::Json(query.eval( &self.relational_db, &tx, + &query.sql, slow_query_threshold, sender.config.compression, )), + }) + } + + /// Add a subscriber to the module. NOTE: this function is blocking. + #[tracing::instrument(skip_all)] + pub fn add_subscriber( + &self, + sender: Arc, + request: Subscribe, + timer: Instant, + _assert: Option, + ) -> Result<(), DBError> { + let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { + self.relational_db.release_tx(tx); + }); + let request_id = request.request_id; + let auth = AuthCtx::new(self.owner_identity, sender.id.identity); + let guard = self.subscriptions.read(); + let query = super::query::WHITESPACE.replace_all(&request.query, " "); + let sql = query.trim(); + let hash = QueryHash::from_string(sql); + let query = if let Some(unit) = guard.query(&hash) { unit } else { + // NOTE: The following ensures compliance with the 1.0 sql api. + // Come 1.0, it will have replaced the current compilation stack. + compile_sql_sub( + &mut TyCtx::default(), + sql, + &SchemaViewer::new(&self.relational_db, &*tx, &auth), + )?; + + let compiled = compile_read_only_query(&self.relational_db, &auth, &tx, sql)?; + Arc::new(ExecutionUnit::new(compiled, hash)?) }; + drop(guard); + + let table_rows = self.run_subscriber(sender.clone(), query.clone(), auth, &tx)?; + // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently. // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here // but that should not pose an issue. let mut subscriptions = self.subscriptions.write(); - subscriptions.remove_subscription(&(sender.id.identity, sender.id.address)); - subscriptions.add_subscription(sender.clone(), execution_set.into_iter()); - let num_queries = subscriptions.num_queries(); + subscriptions.add_subscription(sender.clone(), query.clone()); WORKER_METRICS .subscription_queries .with_label_values(&self.relational_db.database_identity()) - .set(num_queries as i64); + .set(subscriptions.num_queries() as i64); #[cfg(test)] if let Some(assert) = _assert { @@ -159,21 +132,65 @@ impl ModuleSubscriptions { // thread it's possible for messages to get sent to the client out of order. 
If you do // spawn in another thread messages will need to be buffered until the state is sent out // on the wire - let _ = sender.send_message(SubscriptionUpdateMessage { - database_update, + let _ = sender.send_message(SubscriptionMessage { request_id: Some(request_id), + query_id: Some(query.hash().into()), timer: Some(timer), + result: SubscriptionResult::Subscribe(SubscriptionRows { + table_id: query.return_table(), + table_name: query.return_name(), + table_rows + }), }); Ok(()) } - pub fn remove_subscriber(&self, client_id: ClientActorId) { + pub fn remove_subscriber(&self, sender: Arc, request: Unsubscribe, timer: Instant) -> Result<(), DBError> { let mut subscriptions = self.subscriptions.write(); - subscriptions.remove_subscription(&(client_id.identity, client_id.address)); - WORKER_METRICS - .subscription_queries - .with_label_values(&self.relational_db.database_identity()) - .set(subscriptions.num_queries() as i64); + + // NOTE: See note in add_subscriber() + let _ = sender.send_message(if let Some(query) = subscriptions.remove_subscription(&(sender.id.identity, sender.id.address), request.query_id.into()) { + let auth = AuthCtx::new(self.owner_identity, sender.id.identity); + let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { + self.relational_db.release_tx(tx); + }); + let table_rows = self.run_subscriber(sender.clone(), query.clone(), auth, &tx)?; + + WORKER_METRICS + .subscription_queries + .with_label_values(&self.relational_db.database_identity()) + .set(subscriptions.num_queries() as i64); + + SubscriptionMessage { + request_id: Some(request.request_id), + query_id: Some(query.hash().into()), + timer: Some(timer), + result: SubscriptionResult::Unsubscribe(SubscriptionRows { + table_id: query.return_table(), + table_name: query.return_name(), + table_rows + }), + } + } + else { + SubscriptionMessage { + request_id: Some(request.request_id), + query_id: None, + timer: Some(timer), + result: SubscriptionResult::Error(SubscriptionError { + table_id: None, + message: "No such query".into(), + }) + } + }); + + Ok(()) + } + + pub fn remove_all_subscribers(&self, client_id: ClientActorId) { + let mut subscriptions = self.subscriptions.write(); + + subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.address)); } /// Commit a transaction and broadcast its ModuleEvent to all interested subscribers. @@ -258,7 +275,7 @@ mod tests { let module_subscriptions = ModuleSubscriptions::new(db.clone(), owner); let subscribe = Subscribe { - query_strings: [sql.into()].into(), + query: sql.into(), request_id: 0, }; module_subscriptions.add_subscriber(sender, subscribe, Instant::now(), assert) diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 3639a06589b..c79d6c42552 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -23,6 +23,12 @@ type Query = Arc; type Client = Arc; type SwitchedDbUpdate = FormatSwitch, ws::DatabaseUpdate>; +#[derive(Debug)] +pub struct ClientWithRefCount { + pub client: Client, + pub ref_count: u32, +} + /// Responsible for the efficient evaluation of subscriptions. /// It performs basic multi-query optimization, /// in that if a query has N subscribers, @@ -31,7 +37,7 @@ type SwitchedDbUpdate = FormatSwitch, ws::Databa #[derive(Debug, Default)] pub struct SubscriptionManager { // Subscriber identities and their client connections. 
- clients: HashMap, + clients: HashMap, // Queries for which there is at least one subscriber. queries: HashMap, // The subscribers for each query. @@ -42,7 +48,7 @@ pub struct SubscriptionManager { impl SubscriptionManager { pub fn client(&self, id: &Id) -> Client { - self.clients[id].clone() + self.clients[id].client.clone() } pub fn query(&self, hash: &QueryHash) -> Option { @@ -72,15 +78,62 @@ impl SubscriptionManager { /// If a query is not already indexed, /// its table ids added to the inverted index. #[tracing::instrument(skip_all)] - pub fn add_subscription(&mut self, client: Client, queries: impl IntoIterator) { - let id = (client.id.identity, client.id.address); - self.clients.insert(id, client); - for unit in queries { - let hash = unit.hash(); - self.tables.entry(unit.return_table()).or_default().insert(hash); - self.tables.entry(unit.filter_table()).or_default().insert(hash); - self.subscribers.entry(hash).or_default().insert(id); - self.queries.insert(hash, unit); + pub fn add_subscription(&mut self, client: Client, query: Query) { + let client_id = (client.id.identity, client.id.address); + self.clients.entry(client_id) + .and_modify(|e| e.ref_count += 1) + .or_insert(ClientWithRefCount {client, ref_count: 0}); + + let hash = query.hash(); + self.tables.entry(query.return_table()).or_default().insert(hash); + self.tables.entry(query.filter_table()).or_default().insert(hash); + self.subscribers.entry(hash).or_default().insert(client_id); + self.queries.insert(hash, query); + } + + /// Removes a query from the client's subscriber mapping. + /// If a query no longer has any subscribers, + /// it is removed from the index along with its table ids. + /// If a client no longer has any queries, + /// it is removed from the index. + #[tracing::instrument(skip_all)] + pub fn remove_subscription(&mut self, client_id: &Id, query: QueryHash) -> Option { + // Remove `hash` from the set of queries for `table_id`. + // When the table has no queries, cleanup the map entry altogether. + let mut remove_table_query = |table_id: TableId| { + if let Entry::Occupied(mut entry) = self.tables.entry(table_id) { + let hashes = entry.get_mut(); + if hashes.remove(&query) && hashes.is_empty() { + entry.remove(); + } + } + }; + + if let Some(client) = self.clients.get_mut(client_id) { + client.ref_count -= 1; + if client.ref_count == 0 { + self.clients.remove(client_id); + } + } + + if let Some(ids) = self.subscribers.get_mut(&query) { + ids.remove(client_id); + if ids.is_empty() { + if let Some(query) = self.queries.remove(&query) { + remove_table_query(query.return_table()); + remove_table_query(query.filter_table()); + Some(query) + } + else { + None + } + } + else { + Some(self.queries[&query].clone()) + } + } + else { + None } } @@ -88,7 +141,7 @@ impl SubscriptionManager { /// If a query no longer has any subscribers, /// it is removed from the index along with its table ids. #[tracing::instrument(skip_all)] - pub fn remove_subscription(&mut self, client: &Id) { + pub fn remove_all_subscriptions(&mut self, client: &Id) { // Remove `hash` from the set of queries for `table_id`. // When the table has no queries, cleanup the map entry altogether. 
let mut remove_table_query = |table_id: TableId, hash: &QueryHash| { @@ -178,7 +231,7 @@ impl SubscriptionManager { } self.subscribers.get(hash).into_iter().flatten().map(move |id| { - let client = &*self.clients[id]; + let client = &*self.clients[id].client; let update = match client.config.protocol { Protocol::Binary => Bsatn(memo_encode::(&delta.updates, client, &mut ops_bin)), Protocol::Text => Json(memo_encode::(&delta.updates, client, &mut ops_json)), @@ -343,7 +396,7 @@ mod tests { let client = Arc::new(client(0)); let mut subscriptions = SubscriptionManager::default(); - subscriptions.add_subscription(client, [plan]); + subscriptions.add_subscription(client, plan); assert!(subscriptions.contains_query(&hash)); assert!(subscriptions.contains_subscription(&id, &hash)); @@ -365,8 +418,8 @@ mod tests { let client = Arc::new(client(0)); let mut subscriptions = SubscriptionManager::default(); - subscriptions.add_subscription(client, [plan]); - subscriptions.remove_subscription(&id); + subscriptions.add_subscription(client, plan.clone()); + subscriptions.remove_subscription(&id, plan.hash()); assert!(!subscriptions.contains_query(&hash)); assert!(!subscriptions.contains_subscription(&id, &hash)); @@ -388,14 +441,14 @@ mod tests { let client = Arc::new(client(0)); let mut subscriptions = SubscriptionManager::default(); - subscriptions.add_subscription(client.clone(), [plan.clone()]); - subscriptions.add_subscription(client.clone(), [plan.clone()]); + subscriptions.add_subscription(client.clone(), plan.clone()); + subscriptions.add_subscription(client.clone(), plan.clone()); assert!(subscriptions.contains_query(&hash)); assert!(subscriptions.contains_subscription(&id, &hash)); assert!(subscriptions.query_reads_from_table(&hash, &table_id)); - subscriptions.remove_subscription(&id); + subscriptions.remove_subscription(&id, plan.hash()); assert!(!subscriptions.contains_query(&hash)); assert!(!subscriptions.contains_subscription(&id, &hash)); @@ -420,15 +473,15 @@ mod tests { let client1 = Arc::new(client(1)); let mut subscriptions = SubscriptionManager::default(); - subscriptions.add_subscription(client0, [plan.clone()]); - subscriptions.add_subscription(client1, [plan.clone()]); + subscriptions.add_subscription(client0, plan.clone()); + subscriptions.add_subscription(client1, plan.clone()); assert!(subscriptions.contains_query(&hash)); assert!(subscriptions.contains_subscription(&id0, &hash)); assert!(subscriptions.contains_subscription(&id1, &hash)); assert!(subscriptions.query_reads_from_table(&hash, &table_id)); - subscriptions.remove_subscription(&id0); + subscriptions.remove_subscription(&id0, plan.hash()); assert!(subscriptions.contains_query(&hash)); assert!(subscriptions.contains_subscription(&id1, &hash)); @@ -465,8 +518,10 @@ mod tests { let client1 = Arc::new(client(1)); let mut subscriptions = SubscriptionManager::default(); - subscriptions.add_subscription(client0, [plan_scan.clone(), plan_select0.clone()]); - subscriptions.add_subscription(client1, [plan_scan.clone(), plan_select1.clone()]); + subscriptions.add_subscription(client0.clone(), plan_scan.clone()); + subscriptions.add_subscription(client0.clone(), plan_select0.clone()); + subscriptions.add_subscription(client1.clone(), plan_scan.clone()); + subscriptions.add_subscription(client1.clone(), plan_select1.clone()); assert!(subscriptions.contains_query(&hash_scan)); assert!(subscriptions.contains_query(&hash_select0)); @@ -486,7 +541,8 @@ mod tests { assert!(!subscriptions.query_reads_from_table(&hash_select0, &s)); 
assert!(!subscriptions.query_reads_from_table(&hash_select1, &t)); - subscriptions.remove_subscription(&id0); + subscriptions.remove_subscription(&id0, plan_scan.hash()); + subscriptions.remove_subscription(&id0, plan_select0.hash()); assert!(subscriptions.contains_query(&hash_scan)); assert!(subscriptions.contains_query(&hash_select1)); diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index a95ffc7f541..7a02efb695c 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -8,13 +8,7 @@ use spacetimedb_lib::identity::AuthCtx; use spacetimedb_vm::expr::{self, Crud, CrudExpr, QueryExpr}; pub(crate) static WHITESPACE: Lazy = Lazy::new(|| Regex::new(r"\s+").unwrap()); -pub const SUBSCRIBE_TO_ALL_QUERY: &str = "SELECT * FROM *"; -// TODO: It's semantically wrong to `SUBSCRIBE_TO_ALL_QUERY` -// as it can only return back the changes valid for the tables in scope *right now* -// instead of **continuously updating** the db changes -// with system table modifications (add/remove tables, indexes, ...). -// /// Variant of [`compile_read_only_query`] which appends `SourceExpr`s into a given `SourceBuilder`, /// rather than returning a new `SourceSet`. /// @@ -25,7 +19,7 @@ pub fn compile_read_only_query( auth: &AuthCtx, tx: &Tx, input: &str, -) -> Result, DBError> { +) -> Result { let input = input.trim(); if input.is_empty() { return Err(SubscriptionError::Empty.into()); @@ -34,31 +28,22 @@ pub fn compile_read_only_query( // Remove redundant whitespace, and in particular newlines, for debug info. let input = WHITESPACE.replace_all(input, " "); - let compiled = compile_sql(relational_db, auth, tx, &input)?; - let mut queries = Vec::with_capacity(compiled.len()); - for q in compiled { - return Err(SubscriptionError::SideEffect(match q { - CrudExpr::Query(x) => { - queries.push(x); - continue; - } - CrudExpr::Insert { .. } => Crud::Insert, - CrudExpr::Update { .. } => Crud::Update, - CrudExpr::Delete { .. } => Crud::Delete, - CrudExpr::SetVar { .. } => Crud::Config, - CrudExpr::ReadVar { .. } => Crud::Config, - }) - .into()); - } - - if !queries.is_empty() { - Ok(queries - .into_iter() - .map(|query| SupportedQuery::new(query, input.to_string())) - .collect::>()?) - } else { - Err(SubscriptionError::Empty.into()) - } + let mut compiled = compile_sql(relational_db, auth, tx, &input)?; + match compiled.len() { + 1 => {}, + 0 => return Err(SubscriptionError::Empty.into()), + _ => return Err(SubscriptionError::Multiple.into()), + } + + Err(SubscriptionError::SideEffect(match compiled.pop().unwrap() { + CrudExpr::Query(query) => return SupportedQuery::new(query, input.to_string()), + CrudExpr::Insert { .. } => Crud::Insert, + CrudExpr::Update { .. } => Crud::Update, + CrudExpr::Delete { .. } => Crud::Delete, + CrudExpr::SetVar { .. } => Crud::Config, + CrudExpr::ReadVar { .. } => Crud::Config, + }) + .into()) } /// The kind of [`QueryExpr`] currently supported for incremental evaluation. 
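With this change, compile_read_only_query rejects empty and multi-statement input up front (SubscriptionError::Empty / Multiple) alongside the existing side-effect check, and these failures propagate out of add_subscriber as a DBError. Purely as a hedged illustration of how such a failure could instead be shaped into the new client-facing reply type, a sketch follows; the helper name and its placement are hypothetical, not part of the patch.

use std::time::Instant;
use crate::client::messages::{SubscriptionError, SubscriptionMessage, SubscriptionResult};
use crate::error::DBError;

// Hypothetical helper: shape a compilation failure into the new error reply.
// No query was registered, so there is no server-allocated query_id to echo back.
fn compile_error_reply(err: &DBError, request_id: u32, timer: Instant) -> SubscriptionMessage {
    SubscriptionMessage {
        timer: Some(timer),
        request_id: Some(request_id),
        query_id: None,
        result: SubscriptionResult::Error(SubscriptionError {
            table_id: None,
            message: err.to_string().into(),
        }),
    }
}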
@@ -97,7 +82,7 @@ mod tests { use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate}; use crate::sql::execute::collect_result; use crate::sql::execute::tests::run_for_testing; - use crate::subscription::subscription::{get_all, ExecutionSet}; + use crate::subscription::subscription::ExecutionSet; use crate::vm::tests::create_table_with_rows; use crate::vm::DbProgram; use itertools::Itertools; @@ -510,58 +495,15 @@ mod tests { AND MobileEntityState.location_z < 192000"; let tx = db.begin_tx(Workload::ForTests); - let qset = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, sql_query)?; - - for q in qset { - let result = run_query( - &db, - &tx, - q.as_expr(), - AuthCtx::for_testing(), - SourceSet::<_, 0>::empty(), - )?; - assert_eq!(result.len(), 1, "Join query did not return any rows"); - } - - Ok(()) - } - - #[test] - fn test_subscribe_all() -> ResultTest<()> { - let db = TestDB::durable()?; - - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - - let (schema_1, _, _, _) = make_inv(&db, &mut tx, StAccess::Public)?; - let (schema_2, _, _, _) = make_player(&db, &mut tx)?; - db.commit_tx(tx)?; - let row_1 = product!(1u64, "health"); - let row_2 = product!(2u64, "jhon doe"); - let tx = db.begin_tx(Workload::Subscribe); - let s = get_all(&db, &tx, &AuthCtx::for_testing())?.into(); - check_query_eval(&db, &tx, &s, 2, &[row_1.clone(), row_2.clone()])?; - - let data1 = DatabaseTableUpdate { - table_id: schema_1.table_id, - table_name: "inventory".into(), - deletes: [row_1].into(), - inserts: [].into(), - }; - - let data2 = DatabaseTableUpdate { - table_id: schema_2.table_id, - table_name: "player".into(), - deletes: [].into(), - inserts: [row_2].into(), - }; - - let update = DatabaseUpdate { - tables: vec![data1, data2], - }; - - let row_1 = product!(1u64, "health"); - let row_2 = product!(2u64, "jhon doe"); - check_query_incr(&db, &tx, &s, &update, 2, &[row_1, row_2])?; + let q = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, sql_query)?; + let result = run_query( + &db, + &tx, + q.as_expr(), + AuthCtx::for_testing(), + SourceSet::<_, 0>::empty(), + )?; + assert_eq!(result.len(), 1, "Join query did not return any rows"); Ok(()) } @@ -596,18 +538,14 @@ mod tests { "SELECT * FROM lhs WHERE id > 5", ]; for scan in scans { - let expr = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, scan)? - .pop() - .unwrap(); + let expr = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, scan)?; assert_eq!(expr.kind(), Supported::Select, "{scan}\n{expr:#?}"); } // Only index semijoins are supported let joins = ["SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE rhs.y < 10"]; for join in joins { - let expr = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, join)? 
- .pop() - .unwrap(); + let expr = compile_read_only_query(&db, &AuthCtx::for_testing(), &tx, join)?; assert_eq!(expr.kind(), Supported::Semijoin, "{join}\n{expr:#?}"); } diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index c58de78e466..50eae740482 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -33,9 +33,7 @@ use itertools::Either; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat}; use spacetimedb_data_structures::map::HashSet; -use spacetimedb_lib::db::auth::{StAccess, StTableType}; use spacetimedb_lib::db::error::AuthError; -use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::relation::DbTable; use spacetimedb_lib::{Identity, ProductValue}; use spacetimedb_primitives::TableId; @@ -44,7 +42,6 @@ use spacetimedb_vm::rel_ops::RelOps; use spacetimedb_vm::relation::{MemTable, RelValue}; use std::hash::Hash; use std::iter; -use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -521,7 +518,7 @@ impl ExecutionSet { .exec_units // if you need eval to run single-threaded for debugging, change this to .iter() .par_iter() - .filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) + .map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression)) .collect(); ws::DatabaseUpdate { tables } } @@ -599,25 +596,6 @@ impl AuthAccess for ExecutionSet { } } -/// Queries all the [`StTableType::User`] tables *right now* -/// and turns them into [`QueryExpr`], -/// the moral equivalent of `SELECT * FROM table`. -pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) -> Result, DBError> { - Ok(relational_db - .get_all_tables(tx)? - .iter() - .map(Deref::deref) - .filter(|t| { - t.table_type == StTableType::User && (auth.owner == auth.caller || t.table_access == StAccess::Public) - }) - .map(|src| SupportedQuery { - kind: query::Supported::Select, - expr: QueryExpr::new(src), - sql: format!("SELECT * FROM {}", src.table_name), - }) - .collect()) -} - #[cfg(test)] mod tests { use super::*;
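For reference, a hedged sketch of the client-side lifecycle the new message types imply, modeled on the CLI's subscribe helper earlier in this patch: send a single-query Subscribe, then cancel it with an Unsubscribe carrying the query_id echoed back in SubscribeApplied. The pairing function, the request_id values, and the assumption that `query_id` was taken from the server's reply are illustrative only.

use spacetimedb_client_api_messages::websocket as ws;
use spacetimedb_lib::ser::serde::SerializeWrapper;

// Hypothetical helper: build the JSON frames for one subscribe/unsubscribe pair
// on the text protocol, mirroring how the CLI serializes ClientMessage.
fn subscribe_then_unsubscribe(query: &str, query_id: ws::QueryId) -> serde_json::Result<(String, String)> {
    let subscribe = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe(
        ws::Subscribe { query: query.into(), request_id: 1 },
    )))?;
    // `query_id` is the identifier the server allocated for this query and
    // returned in the SubscribeApplied response.
    let unsubscribe = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Unsubscribe(
        ws::Unsubscribe { request_id: 2, query_id },
    )))?;
    Ok((subscribe, unsubscribe))
}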