Subscriptions impl, server side #1997
(Seems to be failing tests?)
@@ -121,7 +121,7 @@ struct SubscriptionTable {
 }

 pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
-    let queries = args.get_many::<String>("query").unwrap();
+    let query = args.get_one::<String>("query").unwrap();
In this case and because it is not part of the subscription proposal, I would consider not changing the CLI, as there is no way to add another query once you have added the first. Rather, you can issue N subscribe commands. We could also do this as future work if you prefer and make an issue for that.
/// Register SQL queries on which to receive updates.
Subscribe(Subscribe),
/// Unregister SQL queries which are receiving updates.
Unsubscribe(Unsubscribe),
-/// Register SQL queries on which to receive updates.
-Subscribe(Subscribe),
-/// Unregister SQL queries which are receiving updates.
-Unsubscribe(Unsubscribe),
+/// Register a SQL query for which to receive an initial update and subsequent updates.
+/// The subscribed set is mutable; that is, this `Subscribe` message
+/// can be followed by more `Subscribe` messages or by `Unsubscribe` messages.
+Subscribe(Subscribe),
+/// Unregister a SQL query which the client is receiving updates for.
+Unsubscribe(Unsubscribe),
/// After connecting, to inform client of its identity.
IdentityToken(IdentityToken),
(This will conflict with the ids-not-names stuff.)
What is the ids-not-names stuff?
let ptr = &self.data as *const [u8; 32] as *const u128;
QueryId { hash: unsafe { u256::from_words(*ptr, *ptr.wrapping_add(1)) } }
Why do we need both a `QueryHash` and `QueryId`? Why not reuse `QueryHash = QueryId`?
Also, why not use `u256::from_le_bytes(self.data)` or some such?
impl Into<QueryHash> for QueryId {
    fn into(self) -> QueryHash {
        let ptr = &self.hash.0 as *const [u128; 2] as *const [u8; 32];
        QueryHash { data: unsafe { *ptr } }
    }
}
Same here...
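One way to address both conversions without `unsafe` is to go through explicit byte-order functions from the standard library. The sketch below is only an illustration: the `QueryHash` and `QueryId` definitions are hypothetical stand-ins (the PR's `QueryId` wraps a `u256`, which is replaced here by two `u128` words), and little-endian order is an assumption, whereas the pointer casts above use the platform's native order. It also sidesteps the alignment question, since a `[u8; 32]` is only guaranteed byte alignment and reading it through a `*const u128` may be a misaligned read.

```rust
/// Hypothetical stand-in for the PR's hash type: 32 raw bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryHash {
    pub data: [u8; 32],
}

/// Hypothetical stand-in for the PR's id type. The real type wraps a `u256`;
/// two `u128` words (low, high) are used here so the example needs no crates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryId {
    pub words: [u128; 2],
}

impl From<QueryHash> for QueryId {
    fn from(hash: QueryHash) -> Self {
        // Split the 32 bytes into two 16-byte halves and decode each half
        // as a little-endian u128 (byte order is an assumption of this sketch).
        let mut lo = [0u8; 16];
        let mut hi = [0u8; 16];
        lo.copy_from_slice(&hash.data[..16]);
        hi.copy_from_slice(&hash.data[16..]);
        QueryId {
            words: [u128::from_le_bytes(lo), u128::from_le_bytes(hi)],
        }
    }
}

impl From<QueryId> for QueryHash {
    fn from(id: QueryId) -> Self {
        // Inverse of the conversion above: encode each word back to bytes.
        let mut data = [0u8; 32];
        data[..16].copy_from_slice(&id.words[0].to_le_bytes());
        data[16..].copy_from_slice(&id.words[1].to_le_bytes());
        QueryHash { data }
    }
}
```

Implementing `From` rather than `Into` also gives the `Into` direction for free through the standard blanket impl.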
        FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::SubscribeApplied {
            total_host_execution_duration_micros,
            request_id,
            query_id,
            rows: ws::SubscribeRows {
                table_id: result.table_id,
                table_name: result.table_name,
                table_rows
            },
        }.into()),
        FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::SubscribeApplied {
            total_host_execution_duration_micros,
            request_id,
            query_id,
            rows: ws::SubscribeRows {
                table_id: result.table_id,
                table_name: result.table_name,
                table_rows
            },
        }.into()),
    }
},
SubscriptionResult::Unsubscribe(result) => {
    protocol.assert_matches_format_switch(&result.table_rows);
    match result.table_rows {
        FormatSwitch::Bsatn(table_rows) => FormatSwitch::Bsatn(ws::UnsubscribeApplied {
            total_host_execution_duration_micros,
            request_id,
            query_id,
            rows: ws::SubscribeRows {
                table_id: result.table_id,
                table_name: result.table_name,
                table_rows
            },
        }.into()),
        FormatSwitch::Json(table_rows) => FormatSwitch::Json(ws::UnsubscribeApplied {
            total_host_execution_duration_micros,
            request_id,
            query_id,
            rows: ws::SubscribeRows {
                table_id: result.table_id,
                table_name: result.table_name,
                table_rows
            },
        }.into()),
The code duplication here is rather unfortunate :(
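A possible way to reduce the duplication is to build the message body once, generically over the row encoding, and let a small `map` helper pick the variant. This is a sketch under assumptions: the `FormatSwitch`, `SubscribeRows`, and `SubscribeApplied` types below are simplified, hypothetical versions of the ones in the diff (plain `u32` ids, no duration field), and `map` and `applied` are illustrative names, not confirmed parts of the codebase.

```rust
/// Hypothetical two-format wrapper, modelled on the `FormatSwitch` in the diff.
#[derive(Debug, PartialEq)]
pub enum FormatSwitch<B, J> {
    Bsatn(B),
    Json(J),
}

impl<B, J> FormatSwitch<B, J> {
    /// Apply one closure per variant while preserving the variant.
    pub fn map<B2, J2>(
        self,
        on_bsatn: impl FnOnce(B) -> B2,
        on_json: impl FnOnce(J) -> J2,
    ) -> FormatSwitch<B2, J2> {
        match self {
            FormatSwitch::Bsatn(b) => FormatSwitch::Bsatn(on_bsatn(b)),
            FormatSwitch::Json(j) => FormatSwitch::Json(on_json(j)),
        }
    }
}

/// Simplified stand-in for `ws::SubscribeRows`, generic over the row encoding.
#[derive(Debug, PartialEq)]
pub struct SubscribeRows<R> {
    pub table_id: u32,
    pub table_name: String,
    pub table_rows: R,
}

/// Simplified stand-in for `ws::SubscribeApplied`.
#[derive(Debug, PartialEq)]
pub struct SubscribeApplied<R> {
    pub request_id: u32,
    pub query_id: u32,
    pub rows: SubscribeRows<R>,
}

/// Build the message body once, for any row encoding `R`.
fn applied<R>(request_id: u32, query_id: u32, table_id: u32, table_name: &str, table_rows: R) -> SubscribeApplied<R> {
    SubscribeApplied {
        request_id,
        query_id,
        rows: SubscribeRows { table_id, table_name: table_name.to_owned(), table_rows },
    }
}

/// Wrap the rows in a message without repeating the struct literal per format.
pub fn to_message(
    rows: FormatSwitch<Vec<u8>, String>,
    request_id: u32,
    query_id: u32,
    table_id: u32,
    table_name: &str,
) -> FormatSwitch<SubscribeApplied<Vec<u8>>, SubscribeApplied<String>> {
    rows.map(
        |r| applied(request_id, query_id, table_id, table_name, r),
        |r| applied(request_id, query_id, table_id, table_name, r),
    )
}
```

The two closures are still written out separately because each variant carries a different row type, but the field-by-field construction now lives in one place.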
let mut remove_table_query = |table_id: TableId| {
    if let Entry::Occupied(mut entry) = self.tables.entry(table_id) {
        let hashes = entry.get_mut();
        if hashes.remove(&query) && hashes.is_empty() {
            entry.remove();
        }
    }
};
This and the version below, from which it was copied, look like they could be extracted into a method.
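As a rough illustration of that extraction, the closure body can move into a method that takes the table and the query as parameters. The `SubscriptionIndex` struct and the `TableId` and `QueryHash` aliases below are hypothetical simplifications; only the `tables` map and the removal logic mirror the hunk above.

```rust
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

// Hypothetical simplified types; the real ones are richer than plain integers.
pub type TableId = u32;
pub type QueryHash = u64;

#[derive(Default)]
pub struct SubscriptionIndex {
    /// For each table, the set of queries that read from it.
    pub tables: HashMap<TableId, HashSet<QueryHash>>,
}

impl SubscriptionIndex {
    /// Remove `query` from `table_id`'s set, dropping the map entry once the
    /// set becomes empty. Does nothing if the table has no entry.
    pub fn remove_table_query(&mut self, table_id: TableId, query: &QueryHash) {
        if let Entry::Occupied(mut entry) = self.tables.entry(table_id) {
            let hashes = entry.get_mut();
            if hashes.remove(query) && hashes.is_empty() {
                entry.remove();
            }
        }
    }
}
```

Both call sites could then call `self.remove_table_query(table_id, &query)` instead of each defining its own closure.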
    }
}

if let Some(ids) = self.subscribers.get_mut(&query) {
Could use ?
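A small sketch of what using `?` might look like here, assuming the enclosing function can return an `Option` (the hunk does not show its signature, so that is a guess). The function name, the `u64` query key, and the `u32` client id are all hypothetical.

```rust
use std::collections::{HashMap, HashSet};

/// Remove `client` from the subscriber set of `query`.
/// Returns `None` if the query has no subscriber set at all, otherwise
/// `Some(removed)` where `removed` says whether the client was present.
pub fn remove_subscriber(
    subscribers: &mut HashMap<u64, HashSet<u32>>,
    query: u64,
    client: u32,
) -> Option<bool> {
    // `?` returns early with `None` instead of nesting the rest in `if let`.
    let ids = subscribers.get_mut(&query)?;
    let removed = ids.remove(&client);
    if ids.is_empty() {
        // Drop the now-empty set so the map does not accumulate dead keys.
        subscribers.remove(&query);
    }
    Some(removed)
}
```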
let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
match compiled.len() {
    1 => {},
    0 => return Err(SubscriptionError::Empty.into()),
    _ => return Err(SubscriptionError::Multiple.into()),
}

Err(SubscriptionError::SideEffect(match compiled.pop().unwrap() {
-let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
-match compiled.len() {
-    1 => {},
-    0 => return Err(SubscriptionError::Empty.into()),
-    _ => return Err(SubscriptionError::Multiple.into()),
-}
-Err(SubscriptionError::SideEffect(match compiled.pop().unwrap() {
+let mut compiled = compile_sql(relational_db, auth, tx, &input)?;
+let single = match (compiled.pop(), compiled.pop()) {
+    (None, None) => return Err(SubscriptionError::Empty.into()),
+    (Some(_), Some(_)) => return Err(SubscriptionError::Multiple.into()),
+    (Some(x), None) => x,
+    // A `Vec` cannot yield an element after `pop` has returned `None`.
+    (None, Some(_)) => unreachable!(),
+};
+Err(SubscriptionError::SideEffect(match single {
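A variant of the same idea, sketched as a standalone helper: pop once, then check whether anything is left, which keeps the match exhaustive with three arms and removes the `unwrap`. The `exactly_one` name and the trimmed-down `SubscriptionError` enum are hypothetical and only cover the two cases discussed here.

```rust
/// Hypothetical, reduced error type with just the two relevant variants.
#[derive(Debug, PartialEq)]
pub enum SubscriptionError {
    Empty,
    Multiple,
}

/// Return the only element of `items`, or an error if there are zero or
/// more than one.
pub fn exactly_one<T>(mut items: Vec<T>) -> Result<T, SubscriptionError> {
    // Tuple fields are evaluated left to right, so `is_empty` sees the
    // vector after the pop.
    match (items.pop(), items.is_empty()) {
        (None, _) => Err(SubscriptionError::Empty),
        (Some(x), true) => Ok(x),
        (Some(_), false) => Err(SubscriptionError::Multiple),
    }
}
```

A call site could then read `let single = exactly_one(compiled)?;`, provided the surrounding error type converts from `SubscriptionError`.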
@@ -208,7 +223,7 @@ impl ExecutionUnit {
         sql: &str,
         slow_query_threshold: Option<Duration>,
         compression: Compression,
-    ) -> Option<TableUpdate<F>> {
+    ) -> TableUpdate<F> {
Why this change? I noticed the `filter_map` -> `map`.
@jsdt Note there's an edge case where republishing a module with a hot-swap can remove an index, which can break an indexed semijoin subscription query, since we refuse incremental semijoins which are not indexed on both of the joined columns. The intended semantics here are that, after a hot-swap, we recompile all active subscribed queries. If any of them fails to compile, as in the removed-index case, the host sends an error message to the client and then ends that subscription, but the websocket connection is not closed. Any queries that compile successfully stay in place. Please find out whether this PR implements that behavior. If it does not, please create a ticket to fix it in a follow-up.
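The intended semantics described in this comment could be sketched roughly as follows. Everything here is hypothetical: `recompile_after_hotswap`, the `QueryId` alias, and the `compile` callback stand in for whatever the host actually uses, and sending the error to the client is left to the caller. The sketch shows only the partitioning: queries that still compile are kept, and the rest are returned with their errors so each affected subscription can be ended without closing the connection.

```rust
use std::collections::HashMap;

/// Hypothetical query identifier.
pub type QueryId = u32;

/// Recompile every active query after a hot-swap.
///
/// Returns the queries that still compile (with their new plans) and the
/// ones that failed (with the error to report to the client).
pub fn recompile_after_hotswap<P, E>(
    active: HashMap<QueryId, String>,
    compile: impl Fn(&str) -> Result<P, E>,
) -> (HashMap<QueryId, P>, Vec<(QueryId, E)>) {
    let mut kept = HashMap::new();
    let mut failed = Vec::new();
    for (id, sql) in active {
        match compile(&sql) {
            // Still valid against the new schema: the subscription stays.
            Ok(plan) => {
                kept.insert(id, plan);
            }
            // For example, an index needed by a semijoin was removed: the
            // caller reports the error and ends only this subscription.
            Err(err) => failed.push((id, err)),
        }
    }
    (kept, failed)
}
```

The caller would then send one error message per entry in `failed` and leave the websocket open for the subscriptions in `kept`.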
Closing in favor of #2030
Description of Changes
Moves subscriptions from a single batch-subscribe call to one call per subscription, each with a matching unsubscribe call.
API and ABI breaking changes
Updates to the websocket API and introduction of new messages.
Expected complexity level and risk
3: changes the way subscriptions are stored and used.
Testing
In progress