Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions impl, server side #1997

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/cli/src/subcommands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ struct SubscriptionTable {
}

pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
let queries = args.get_many::<String>("query").unwrap();
let query = args.get_one::<String>("query").unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case and because it is not part of the subscription proposal, I would consider not changing the CLI, as there is no way to add another query once you have added the first. Rather, you can issue N subscribe commands. We could also do this as future work if you prefer and make an issue for that.

let num = args.get_one::<u32>("num-updates").copied();
let timeout = args.get_one::<u32>("timeout").copied();
let print_initial_update = args.get_flag("print_initial_update");
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
let (mut ws, _) = tokio_tungstenite::connect_async(req).await?;

let task = async {
subscribe(&mut ws, queries.cloned().map(Into::into).collect()).await?;
subscribe(&mut ws, query.clone().into_boxed_str()).await?;
await_initial_update(&mut ws, print_initial_update.then_some(&module_def)).await?;
consume_transaction_updates(&mut ws, num, &module_def).await
};
Expand All @@ -175,13 +175,13 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
}

/// Send the subscribe message.
async fn subscribe<S>(ws: &mut S, query_strings: Box<[Box<str>]>) -> Result<(), S::Error>
async fn subscribe<S>(ws: &mut S, query: Box<str>) -> Result<(), S::Error>
where
S: Sink<WsMessage> + Unpin,
{
let msg = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe(
ws::Subscribe {
query_strings,
query,
request_id: 0,
},
)))
Expand All @@ -201,7 +201,7 @@ where
while let Some(msg) = ws.try_next().await? {
let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(sub) => {
ws::ServerMessage::SubscriptionUpdate(sub) => {
if let Some(module_def) = module_def {
let formatted = reformat_update(&sub.database_update, module_def)?;
let output = serde_json::to_string(&formatted)? + "\n";
Expand Down Expand Up @@ -247,7 +247,7 @@ where

let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(_) => {
ws::ServerMessage::SubscriptionUpdate(_) => {
anyhow::bail!("protocol error: received a second initial subscription update")
}
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { update, .. })
Expand Down
135 changes: 115 additions & 20 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use spacetimedb_sats::{
de::{Deserialize, Error},
impl_deserialize, impl_serialize, impl_st,
ser::{serde::SerializeWrapper, Serialize},
AlgebraicType, SpacetimeType,
AlgebraicType, SpacetimeType, u256
};
use std::{
io::{self, Read as _, Write as _},
Expand Down Expand Up @@ -88,6 +88,8 @@ pub enum ClientMessage<Args> {
CallReducer(CallReducer<Args>),
/// Register SQL queries on which to receive updates.
Subscribe(Subscribe),
/// Unregister SQL queries which are receiving updates.
Unsubscribe(Unsubscribe),
Comment on lines 89 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Register SQL queries on which to receive updates.
Subscribe(Subscribe),
/// Unregister SQL queries which are receiving updates.
Unsubscribe(Unsubscribe),
/// Register a SQL query on which to receive an initial and subsequent updates for.
/// The subscribed set is mutable, that is, this `Subscribe` message
/// can be followed by more, or `Unsubscribe` messages.
Subscribe(Subscribe),
/// Unregister a SQL query which the client is receiving updates for.
Unsubscribe(Unsubscribe),

/// Send a one-off SQL query without establishing a subscription.
OneOffQuery(OneOffQuery),
}
Expand All @@ -107,6 +109,7 @@ impl<Args> ClientMessage<Args> {
flags,
}),
ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
}
}
Expand Down Expand Up @@ -162,6 +165,12 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
});

#[derive(SpacetimeType, Clone, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryId {
pub hash: u256,
}

/// Sent by client to database to register a set of queries, about which the client will
/// receive `TransactionUpdate`s.
///
Expand All @@ -179,9 +188,21 @@ impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Subscribe {
/// A sequence of SQL queries.
pub query_strings: Box<[Box<str>]>,
/// A single SQL `SELECT` query to subscribe to.
pub query: Box<str>,
/// An identifier for a client request.
pub request_id: u32,
}

/// Client request for removing a query from a subscription.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Unsubscribe {
/// The ID returned in the [`SubscribeApplied`] message.
/// An optimization to avoid reparsing and normalizing the query string.
pub request_id: u32,
/// An identifier for a client request.
pub query_id: QueryId,
}

/// A one-off query submission.
Expand Down Expand Up @@ -212,31 +233,24 @@ pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
#[derive(SpacetimeType, derive_more::From)]
#[sats(crate = spacetimedb_lib)]
pub enum ServerMessage<F: WebsocketFormat> {
/// After connecting, to inform client of its identity.
IdentityToken(IdentityToken),
Comment on lines +236 to +237
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This will conflict with the ids-not-names stuff.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the ids-not-names stuff?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// Request a stream of changes to subscribed rows.
SubscribeApplied(SubscribeApplied<F>),
/// Cancel a stream of changes from subscribed rows.
UnsubscribeApplied(UnsubscribeApplied<F>),
/// Communicate an error in the subscription lifecycle.
SubscriptionError(SubscriptionError),
/// Informs of changes to subscribed rows.
InitialSubscription(InitialSubscription<F>),
SubscriptionUpdate(SubscriptionUpdate<F>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be removed? And corresponding then-dead-code?

/// Upon reducer run.
TransactionUpdate(TransactionUpdate<F>),
/// Upon reducer run, but limited to just the table updates.
TransactionUpdateLight(TransactionUpdateLight<F>),
/// After connecting, to inform client of its identity.
IdentityToken(IdentityToken),
/// Return results to a one off SQL query.
OneOffQueryResponse(OneOffQueryResponse<F>),
}

/// Response to [`Subscribe`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct InitialSubscription<F: WebsocketFormat> {
/// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
pub database_update: DatabaseUpdate<F>,
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
}

/// Received by database from client to inform of user's identity, token and client address.
///
/// The database will always send an `IdentityToken` message
Expand All @@ -253,6 +267,87 @@ pub struct IdentityToken {
pub address: Address,
}

/// The matching rows of a subscription query.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeRows<F: WebsocketFormat> {
/// The table ID of the query.
pub table_id: TableId,
/// The table name of the query.
pub table_name: Box<str>,
/// The BSATN row values.
pub table_rows: TableUpdate<F>,
}

/// Response to [`Subscribe`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeApplied<F: WebsocketFormat> {
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// An identifier for the subscribed query, allocated by the server.
pub query_id: QueryId,
/// The matching rows for this query.
pub rows: SubscribeRows<F>,
}

/// Server response to a client [`Unsubscribe`] request.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeApplied<F: WebsocketFormat> {
/// Provided by the client via the `Subscribe` message.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
pub query_id: QueryId,
/// The matching rows for this query.
pub rows: SubscribeRows<F>,
}

/// Server response to an error at any point of the subscription lifecycle.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionError {
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
/// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
/// [`None`] if this occurred as the result of a [`TransactionUpdate`].
pub request_id: Option<u32>,
/// The return table of the query in question.
/// The server is not required to set this field.
/// It has been added to avoid a breaking change post 1.0.
///
/// If unset, an error results in the entire subscription being dropped.
/// Otherwise only queries of this table type must be dropped.
pub table_id: Option<TableId>,
/// An error message describing the failure.
///
/// This should reference specific fragments of the query where applicable,
/// but should not include the full text of the query,
/// as the client can retrieve that from the `request_id`.
///
/// This is intended for diagnostic purposes.
/// It need not have a predictable/parseable format.
pub error: Box<str>,
}

/// Response to [`Subscribe`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionUpdate<F: WebsocketFormat> {
/// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
pub database_update: DatabaseUpdate<F>,
/// An identifier sent by the client in requests.
/// The server will include the same request_id in the response.
pub request_id: u32,
/// The overall time between the server receiving a request and sending the response.
pub total_host_execution_duration_micros: u64,
}

// TODO: Evaluate if it makes sense for this to also include the
// address of the database this is calling

Expand Down Expand Up @@ -626,7 +721,7 @@ pub fn brotli_compress(bytes: &[u8], out: &mut Vec<u8>) {

encoder
.read_to_end(out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
.expect("Failed to Brotli compress `bytes`");
}

pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
Expand Down
17 changes: 14 additions & 3 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::messages::{OneOffQueryResponseMessage, SerializableMessage};
use super::{message_handlers, ClientActorId, MessageHandleError};
use crate::error::DBError;
use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
use crate::messages::websocket::Subscribe;
use crate::messages::websocket::{Subscribe, Unsubscribe};
use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
Expand Down Expand Up @@ -283,12 +283,23 @@ impl ClientConnection {
.await
}

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
pub async fn subscribe(&self, request: Subscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.add_subscriber(me.sender, subscription, timer, None)
.add_subscriber(me.sender, request, timer, None)
})
.await
.unwrap()
}

pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
.subscriptions()
.remove_subscriber(me.sender, request, timer)
})
.await
.unwrap()
Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &address, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::OneOffQuery(OneOffQuery {
query_string: query,
message_id,
Expand Down
Loading
Loading