Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side implementation of incremental subscription changes #2030

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
10 changes: 5 additions & 5 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
use spacetimedb::subscription::query::compile_read_only_query;
use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
Expand Down Expand Up @@ -102,7 +102,7 @@ fn eval(c: &mut Criterion) {
let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query: ExecutionSet = query.into();

b.iter(|| {
Expand Down Expand Up @@ -141,8 +141,8 @@ fn eval(c: &mut Criterion) {
let select_lhs = "select * from footprint";
let select_rhs = "select * from location";
let tx = &raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
let tx = &tx.into();

Expand All @@ -160,7 +160,7 @@ fn eval(c: &mut Criterion) {
where location.chunk_index = {chunk_index}"
);
let tx = &raw.db.begin_tx(Workload::Update);
let query = compile_read_only_query(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query: ExecutionSet = query.into();
let tx = &tx.into();

Expand Down
159 changes: 157 additions & 2 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use spacetimedb_sats::{
de::{Deserialize, Error},
impl_deserialize, impl_serialize, impl_st,
ser::{serde::SerializeWrapper, Serialize},
AlgebraicType, SpacetimeType,
u256, AlgebraicType, SpacetimeType,
};
use std::{
io::{self, Read as _, Write as _},
Expand Down Expand Up @@ -90,6 +90,10 @@ pub enum ClientMessage<Args> {
Subscribe(Subscribe),
/// Send a one-off SQL query without establishing a subscription.
OneOffQuery(OneOffQuery),
/// Register a SQL query to subscribe to updates. This does not affect other subscriptions.
SubscribeSingle(SubscribeSingle),
/// Remove a subscription to a SQL query that was added with SubscribeSingle.
Unsubscribe(Unsubscribe),
}

impl<Args> ClientMessage<Args> {
Expand All @@ -106,8 +110,10 @@ impl<Args> ClientMessage<Args> {
request_id,
flags,
}),
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
}
}
}
Expand Down Expand Up @@ -162,6 +168,19 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
});

/// Identifies a subscribed query by a hash of that query.
///
/// The server returns this ID in [`SubscribeApplied`], and the client sends it back in
/// [`Unsubscribe`] to select which query should be removed.
#[derive(SpacetimeType, Clone, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryId {
/// The hash value identifying the query.
/// NOTE(review): how the hash is derived from the query is not visible here — confirm.
pub hash: u256,
}

impl QueryId {
pub fn new(hash: u256) -> Self {
Self { hash }
}
}

/// Sent by client to database to register a set of queries, about which the client will
/// receive `TransactionUpdate`s.
///
Expand All @@ -184,6 +203,39 @@ pub struct Subscribe {
pub request_id: u32,
}

/// Sent by client to register a subscription to a single query, for which the client should
/// receive relevant `TransactionUpdate`s.
///
/// After issuing a `SubscribeSingle` message, the client will receive a single
/// `SubscribeApplied` message containing every current row which matches the query. Then, any
/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
/// containing the relevant updates.
///
/// If a client subscribes to queries with overlapping results, the client will receive
/// multiple copies of rows that appear in multiple queries.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeSingle {
/// A single SQL `SELECT` query to subscribe to.
pub query: Box<str>,
/// An identifier for a client request.
/// This should not be reused for any other subscriptions on the same connection.
/// TODO: Can we call this subscription_id? It feels odd that this request_id will be used for multiple messages (the Unsubscribe call).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's some confusion here due to a typo in the proposal. The intended design is:

  • Each request/response pair has a unique request_id, assigned by the client. It is not an error for the client to re-use request_ids, as their only purpose is to correlate a request/response pair; they have no semantic meaning to the host.
  • This means that Unsubscribe gets a unique request_id distinct from the one in SubscribeSingle.
  • The host looks at the QueryId included in the Unsubscribe message to determine which query is being removed.

It looks like at some point, someone (almost certainly me) made a copy-paste error in the proposal:

/// Server response to a client `Unsubscribe` request.
struct UnsubscribeApplied {
    /// Provided by the client via the `Subscribe` message.
    request_id: u32,

This should say, "Provided by the client via the Unsubscribe message." That is, the host treats it as an opaque nonce, and does not retain it or recognize it after sending its response.

Sorry for the confusion!

pub request_id: u32,
}

/// Client request for removing a query from a subscription.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Unsubscribe {
/// An identifier for a client request.
pub request_id: u32,
/// The ID returned in the [`SubscribeApplied`] message.
/// An optimization to avoid reparsing and normalizing the query string.
/// TODO: I assume this is just used for debugging?
pub query_id: QueryId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment. This query_id is (supposed to be) the thing you want to call subscription_id.

}

/// A one-off query submission.
///
/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
Expand Down Expand Up @@ -213,6 +265,7 @@ pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
#[sats(crate = spacetimedb_lib)]
pub enum ServerMessage<F: WebsocketFormat> {
/// Informs of changes to subscribed rows.
/// This will be removed when we switch to `SubscribeSingle`.
InitialSubscription(InitialSubscription<F>),
/// Upon reducer run.
TransactionUpdate(TransactionUpdate<F>),
Expand All @@ -222,6 +275,99 @@ pub enum ServerMessage<F: WebsocketFormat> {
IdentityToken(IdentityToken),
/// Return results to a one off SQL query.
OneOffQueryResponse(OneOffQueryResponse<F>),
/// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
SubscribeApplied(SubscribeApplied<F>),
/// Sent in response to an `Unsubscribe` message. This contains the matching rows.
UnsubscribeApplied(UnsubscribeApplied<F>),
/// Communicate an error in the subscription lifecycle.
SubscriptionError(SubscriptionError),
/// Informs of changes to subscribed rows.
SubscriptionUpdate(SubscriptionUpdate<F>),
jsdt marked this conversation as resolved.
Show resolved Hide resolved
}

/// The matching rows of a subscription query, together with the table they belong to.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeRows<F: WebsocketFormat> {
/// The table ID of the query.
pub table_id: TableId,
/// The table name of the query.
pub table_name: Box<str>,
/// The row values, carried as a [`TableUpdate`] in the websocket format `F`.
/// NOTE(review): the previous comment said "BSATN", but the type is generic over `F` — confirm.
pub table_rows: TableUpdate<F>,
}

/// Response to [`SubscribeSingle`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeApplied<F: WebsocketFormat> {
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response,
/// in microseconds.
pub total_host_execution_duration_micros: u64,
/// An identifier for the subscribed query, allocated by the server.
/// The client passes it back in [`Unsubscribe`] to remove this query.
pub query_id: QueryId,
/// The matching rows for this query.
pub rows: SubscribeRows<F>,
}

/// Server response to a client [`Unsubscribe`] request.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeApplied<F: WebsocketFormat> {
/// Provided by the client via the `Unsubscribe` message.
/// TODO: switch to subscription id?
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
pub query_id: QueryId,
/// The matching rows for this query.
/// TODO: Sending row ids could reduce bandwidth here.
jsdt marked this conversation as resolved.
Show resolved Hide resolved
pub rows: SubscribeRows<F>,
}

/// Server response to an error at any point of the subscription lifecycle.
/// TODO: How are we supposed to handle errors without a request_id?
/// Should we drop all subscriptions?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the SDK receives a SubscriptionError message:

  1. The client state is updated such that all SubscriptionHandles are ended.
  2. All rows are removed from the client cache.
  3. on_delete callbacks are invoked.
  4. on_error callbacks are invoked.

NOTE: If a SubscriptionError is returned in response to a Subscribe,
only that particular SubscriptionHandle is ended.
That is, all other active queries shall remain active.
No on_delete callbacks are invoked.

#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionError {
/// The overall time between the server receiving a request and sending the response,
/// in microseconds.
pub total_host_execution_duration_micros: u64,
/// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
/// [`None`] if this occurred as the result of a [`TransactionUpdate`].
/// NOTE(review): presumably a [`SubscribeSingle`] request can also be the source — confirm.
pub request_id: Option<u32>,
/// The return table of the query in question.
/// The server is not required to set this field.
/// It has been added to avoid a breaking change post 1.0.
///
/// If unset, an error results in the entire subscription being dropped.
/// Otherwise only queries of this table type must be dropped.
pub table_id: Option<TableId>,
/// An error message describing the failure.
///
/// This should reference specific fragments of the query where applicable,
/// but should not include the full text of the query,
/// as the client can retrieve that from the `request_id`.
///
/// This is intended for diagnostic purposes.
/// It need not have a predictable/parseable format.
pub error: Box<str>,
}

/// Informs the client of changes to subscribed rows.
///
/// NOTE(review): the previous header ("Response to `Subscribe` containing the initial matching
/// rows") duplicated the doc of the following definition; confirm the intended wording.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionUpdate<F: WebsocketFormat> {
/// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
/// NOTE(review): "only inserts" may be a leftover from the initial-subscription doc — confirm.
pub database_update: DatabaseUpdate<F>,
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response,
/// in microseconds.
pub total_host_execution_duration_micros: u64,
}

/// Response to [`Subscribe`] containing the initial matching rows.
Expand Down Expand Up @@ -397,6 +543,15 @@ impl<F: WebsocketFormat> TableUpdate<F> {
}
}

/// Creates a [`TableUpdate`] for the given table that carries no updates:
/// the update list is empty and the row count is zero.
pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
    let updates = SmallVec::new();
    Self {
        updates,
        num_rows: 0,
        table_name,
        table_id,
    }
}

pub fn push(&mut self, (update, num_rows): (F::QueryUpdate, u64)) {
self.updates.push(update);
self.num_rows += num_rows;
Expand Down
24 changes: 22 additions & 2 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use spacetimedb_client_api_messages::websocket::{CallReducerFlags, Compression, FormatSwitch};
use spacetimedb_client_api_messages::websocket::{
CallReducerFlags, Compression, FormatSwitch, SubscribeSingle, Unsubscribe,
};
use spacetimedb_lib::identity::RequestId;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
Expand Down Expand Up @@ -283,12 +285,30 @@ impl ClientConnection {
.await
}

/// Adds a single-query subscription for this client.
///
/// The work is handed to `tokio::task::spawn_blocking`, where
/// `add_subscription` is called on the module's subscription manager with a
/// clone of this connection's sender.
///
/// # Panics
///
/// Panics if joining the blocking task fails, because the join result is unwrapped.
pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> {
    let conn = self.clone();
    let task = tokio::task::spawn_blocking(move || {
        let subscriptions = conn.module.subscriptions();
        subscriptions.add_subscription(conn.sender, subscription, timer, None)
    });
    // TODO: is unwrapping right here?
    task.await.unwrap()
}

/// Removes a subscription for this client.
///
/// The work is handed to `tokio::task::spawn_blocking`, where
/// `remove_subscription` is called on the module's subscription manager with a
/// clone of this connection's sender.
///
/// # Panics
///
/// Panics if joining the blocking task fails, because the join result is unwrapped.
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
    let conn = self.clone();
    let task = tokio::task::spawn_blocking(move || {
        let subscriptions = conn.module.subscriptions();
        subscriptions.remove_subscription(conn.sender, request, timer)
    });
    // TODO: is unwrapping right here?
    task.await.unwrap()
}

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.add_subscriber(me.sender, subscription, timer, None)
.add_legacy_subscriber(me.sender, subscription, timer, None)
})
.await
.unwrap()
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
)
})
}
ClientMessage::SubscribeSingle(subscription) => {
let res = client.subscribe_single(subscription, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Subscribe(subscription) => {
let res = client.subscribe(subscription, timer).await;
WORKER_METRICS
Expand Down
Loading
Loading