1
- use std:: { fmt :: Error , sync:: Arc } ;
1
+ use std:: sync:: Arc ;
2
2
3
3
use super :: {
4
4
query:: compile_read_only_query,
@@ -18,7 +18,7 @@ use crate::{
18
18
db:: relational_db:: Tx ,
19
19
host:: module_host:: { EventStatus , ModuleEvent } ,
20
20
} ;
21
- use futures:: stream:: FuturesUnordered ;
21
+ use futures:: { stream:: FuturesUnordered , FutureExt , StreamExt } ;
22
22
use parking_lot:: RwLock ;
23
23
use spacetimedb_lib:: identity:: AuthCtx ;
24
24
use spacetimedb_lib:: Identity ;
@@ -99,12 +99,11 @@ impl ModuleSubscriptionManager {
99
99
}
100
100
}
101
101
102
- type SubscriptionVecRw = Vec < Arc < RwLock < Subscription > > > ;
102
+ type SubscriptionRw = Arc < RwLock < Subscription > > ;
103
103
104
- // Hold subscription lock always after initiating tx.
105
104
struct ModuleSubscriptionActor {
106
105
relational_db : Arc < RelationalDB > ,
107
- subscriptions : SubscriptionVecRw ,
106
+ subscriptions : Vec < SubscriptionRw > ,
108
107
owner_identity : Identity ,
109
108
}
110
109
@@ -192,54 +191,61 @@ impl ModuleSubscriptionActor {
192
191
}
193
192
194
193
async fn broadcast_commit_event ( & mut self , event : ModuleEvent ) -> Result < ( ) , DBError > {
194
+ async fn _broadcast_commit_event (
195
+ auth : AuthCtx ,
196
+ mut event : ModuleEvent ,
197
+ subscription : SubscriptionRw ,
198
+ relational_db : Arc < RelationalDB > ,
199
+ ) -> Result < ( ) , DBError > {
200
+ let ctx = ExecutionContext :: incremental_update ( relational_db. address ( ) ) ;
201
+ let mut tx = relational_db. begin_tx ( ) ;
202
+ let database_update = event. status . database_update ( ) . unwrap ( ) ;
203
+ let subscription = subscription. read_arc ( ) ;
204
+ let futures = FuturesUnordered :: new ( ) ;
205
+
206
+ let incr = subscription
207
+ . queries
208
+ . eval_incr ( & relational_db, & mut tx, database_update, auth) ?;
209
+
210
+ if incr. tables . is_empty ( ) {
211
+ return Ok ( ( ) ) ;
212
+ }
213
+
214
+ let message = TransactionUpdateMessage {
215
+ event : & mut event,
216
+ database_update : incr,
217
+ } ;
218
+ let mut message = CachedMessage :: new ( message) ;
219
+
220
+ for subscriber in subscription. subscribers ( ) {
221
+ // rustc realllly doesn't like subscriber.send_message(message) here for weird
222
+ // lifetime reasons, even though it would be sound
223
+ let message = message. serialize ( subscriber. protocol ) ;
224
+ futures. push ( subscriber. send ( message) . map ( drop) )
225
+ }
226
+ futures. collect :: < ( ) > ( ) . await ;
227
+
228
+ relational_db. release_tx ( & ctx, tx) ;
229
+ Ok ( ( ) )
230
+ }
231
+
195
232
let auth = AuthCtx :: new ( self . owner_identity , event. caller_identity ) ;
196
233
let relational_db = self . relational_db . clone ( ) ;
197
- let futures: FuturesUnordered < tokio:: task:: JoinHandle < Result < ( ) , Error > > > = FuturesUnordered :: new ( ) ;
234
+ let futures: FuturesUnordered < tokio:: task:: JoinHandle < Result < ( ) , DBError > > > = FuturesUnordered :: new ( ) ;
198
235
199
236
for subscription in & self . subscriptions {
200
- let subscription = subscription. clone ( ) ;
201
- let relational_db = relational_db. clone ( ) ; // Clone for each subscription
202
- let mut event_clone = event. clone ( ) ; // Clone for each subscription
203
- let ctx = ExecutionContext :: incremental_update ( relational_db. address ( ) ) ;
204
- let mut tx = relational_db. begin_tx ( ) ;
205
- let auth_clone = auth. clone ( ) ; // Clone for each subscription
206
- let future: JoinHandle < Result < ( ) , _ > > = tokio:: spawn ( async move {
207
- let database_update = event_clone. status . database_update ( ) . unwrap ( ) ;
208
- let subscription = subscription. read_arc ( ) ;
209
-
210
- match subscription
211
- . queries
212
- . eval_incr ( & relational_db, & mut tx, database_update, auth_clone)
213
- {
214
- Ok ( incr) => {
215
- if incr. tables . is_empty ( ) {
216
- return Ok ( ( ) ) ;
217
- }
218
-
219
- let message = TransactionUpdateMessage {
220
- event : & mut event_clone,
221
- database_update : incr,
222
- } ;
223
- let mut message = CachedMessage :: new ( message) ;
224
-
225
- for subscriber in subscription. subscribers ( ) {
226
- let message = message. serialize ( subscriber. protocol ) ;
227
- if let Err ( e) = subscriber. send ( message) . await {
228
- log:: error!( "Error sending message to subscriber: {:?}" , e) ;
229
- }
230
- }
231
- }
232
- Err ( e) => log:: error!( "Error occurred in broadcasting commit event: {:?}" , e) ,
233
- }
234
-
235
- relational_db. release_tx ( & ctx, tx) ;
236
- Ok ( ( ) )
237
- } ) ;
237
+ let future: JoinHandle < Result < ( ) , _ > > = tokio:: spawn ( _broadcast_commit_event (
238
+ auth,
239
+ event. clone ( ) ,
240
+ subscription. clone ( ) ,
241
+ relational_db. clone ( ) ,
242
+ ) ) ;
238
243
239
244
futures. push ( future) ;
240
245
}
241
246
242
- // futures.collect::<Vec<_>>().await; // Collect into Vec to handle results
247
+ // waiting for for all subscription query sets to process
248
+ futures. collect :: < Vec < _ > > ( ) . await ;
243
249
244
250
Ok ( ( ) )
245
251
}
0 commit comments