Skip to content

Commit 27fe643

Browse files
xixixaoConvex, Inc.
authored andcommitted
React: Pause websocket manager when fetching auth token (#28422)
Wait for token fetching before subscribing to new queries / running mutations. GitOrigin-RevId: 12985c6f4b854b8fc52493c43987165c9a718a4e
1 parent a42ed28 commit 27fe643

File tree

7 files changed

+369
-94
lines changed

7 files changed

+369
-94
lines changed

src/browser/sync/authentication_manager.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ export class AuthenticationManager {
8585
private readonly authenticate: (token: string) => void;
8686
private readonly stopSocket: () => Promise<void>;
8787
private readonly restartSocket: () => void;
88+
private readonly pauseSocket: () => void;
89+
private readonly resumeSocket: () => void;
8890
// Passed down by BaseClient, sends a message to the server
8991
private readonly clearAuth: () => void;
9092
private readonly verbose: boolean;
@@ -95,12 +97,16 @@ export class AuthenticationManager {
9597
authenticate,
9698
stopSocket,
9799
restartSocket,
100+
pauseSocket,
101+
resumeSocket,
98102
clearAuth,
99103
verbose,
100104
}: {
101105
authenticate: (token: string) => void;
102106
stopSocket: () => Promise<void>;
103107
restartSocket: () => void;
108+
pauseSocket: () => void;
109+
resumeSocket: () => void;
104110
clearAuth: () => void;
105111
verbose: boolean;
106112
},
@@ -109,6 +115,8 @@ export class AuthenticationManager {
109115
this.authenticate = authenticate;
110116
this.stopSocket = stopSocket;
111117
this.restartSocket = restartSocket;
118+
this.pauseSocket = pauseSocket;
119+
this.resumeSocket = resumeSocket;
112120
this.clearAuth = clearAuth;
113121
this.verbose = verbose;
114122
}
@@ -118,6 +126,8 @@ export class AuthenticationManager {
118126
onChange: (isAuthenticated: boolean) => void,
119127
) {
120128
this.resetAuthState();
129+
this._logVerbose("pausing WS for auth token fetch");
130+
this.pauseSocket();
121131
const token = await this.fetchTokenAndGuardAgainstRace(fetchToken, {
122132
forceRefreshToken: false,
123133
});
@@ -131,6 +141,8 @@ export class AuthenticationManager {
131141
hasRetried: false,
132142
});
133143
this.authenticate(token.value);
144+
this._logVerbose("resuming WS after auth token fetch");
145+
this.resumeSocket();
134146
} else {
135147
this.setAuthState({
136148
state: "initialRefetch",
@@ -224,12 +236,11 @@ export class AuthenticationManager {
224236
},
225237
);
226238
if (token.isFromOutdatedConfig) {
227-
await this.restartSocket();
228239
return;
229240
}
230241

231242
if (token.value && this.syncState.isNewAuth(token.value)) {
232-
this.syncState.setAuth(token.value);
243+
this.authenticate(token.value);
233244
this.setAuthState({
234245
state: "waitingForServerConfirmationOfFreshToken",
235246
config: this.authState.config,
@@ -245,7 +256,7 @@ export class AuthenticationManager {
245256
}
246257
this.setAndReportAuthFailed(this.authState.config.onAuthChange);
247258
}
248-
await this.restartSocket();
259+
this.restartSocket();
249260
}
250261

251262
// Force refetch the token and schedule another refetch
@@ -288,6 +299,12 @@ export class AuthenticationManager {
288299
}
289300
this.setAndReportAuthFailed(this.authState.config.onAuthChange);
290301
}
302+
// Resuming in case this refetch was triggered
303+
// by an invalid cached token.
304+
this._logVerbose(
305+
"resuming WS after auth token fetch (if currently paused)",
306+
);
307+
this.resumeSocket();
291308
}
292309

293310
private scheduleTokenRefetch(token: string) {

src/browser/sync/client.ts

Lines changed: 96 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ export class BaseConvexClient {
218218
},
219219
stopSocket: () => this.webSocketManager.stop(),
220220
restartSocket: () => this.webSocketManager.restart(),
221+
pauseSocket: () => this.webSocketManager.pause(),
222+
resumeSocket: () => this.webSocketManager.resume(),
221223
clearAuth: () => {
222224
this.clearAuth();
223225
},
@@ -258,89 +260,107 @@ export class BaseConvexClient {
258260

259261
this.webSocketManager = new WebSocketManager(
260262
wsUri,
261-
(reconnectMetadata: ReconnectMetadata) => {
262-
// We have a new WebSocket!
263-
this.mark("convexWebSocketOpen");
264-
this.webSocketManager.sendMessage({
265-
...reconnectMetadata,
266-
type: "Connect",
267-
sessionId: this._sessionId,
268-
maxObservedTimestamp: this.maxObservedTimestamp,
269-
});
270-
271-
// Throw out our remote query, reissue queries
272-
// and outstanding mutations, and reauthenticate.
273-
const oldRemoteQueryResults = new Set(
274-
this.remoteQuerySet.remoteQueryResults().keys(),
275-
);
276-
this.remoteQuerySet = new RemoteQuerySet((queryId) =>
277-
this.state.queryPath(queryId),
278-
);
279-
const [querySetModification, authModification] = this.state.restart(
280-
oldRemoteQueryResults,
281-
);
282-
if (authModification) {
283-
this.webSocketManager.sendMessage(authModification);
284-
}
285-
this.webSocketManager.sendMessage(querySetModification);
286-
for (const message of this.requestManager.restart()) {
287-
this.webSocketManager.sendMessage(message);
288-
}
289-
},
290-
(serverMessage: ServerMessage) => {
291-
// Metrics events grow linearly with reconnection attempts so this
292-
// conditional prevents n^2 metrics reporting.
293-
if (!this.firstMessageReceived) {
294-
this.firstMessageReceived = true;
295-
this.mark("convexFirstMessageReceived");
296-
this.reportMarks();
297-
}
298-
switch (serverMessage.type) {
299-
case "Transition": {
300-
this.observedTimestamp(serverMessage.endVersion.ts);
301-
this.authenticationManager.onTransition(serverMessage);
302-
this.remoteQuerySet.transition(serverMessage);
303-
this.state.transition(serverMessage);
304-
const completedRequests = this.requestManager.removeCompleted(
305-
this.remoteQuerySet.timestamp(),
306-
);
307-
this.notifyOnQueryResultChanges(completedRequests);
308-
break;
263+
{
264+
onOpen: (reconnectMetadata: ReconnectMetadata) => {
265+
// We have a new WebSocket!
266+
this.mark("convexWebSocketOpen");
267+
this.webSocketManager.sendMessage({
268+
...reconnectMetadata,
269+
type: "Connect",
270+
sessionId: this._sessionId,
271+
maxObservedTimestamp: this.maxObservedTimestamp,
272+
});
273+
274+
// Throw out our remote query, reissue queries
275+
// and outstanding mutations, and reauthenticate.
276+
const oldRemoteQueryResults = new Set(
277+
this.remoteQuerySet.remoteQueryResults().keys(),
278+
);
279+
this.remoteQuerySet = new RemoteQuerySet((queryId) =>
280+
this.state.queryPath(queryId),
281+
);
282+
const [querySetModification, authModification] = this.state.restart(
283+
oldRemoteQueryResults,
284+
);
285+
if (authModification) {
286+
this.webSocketManager.sendMessage(authModification);
309287
}
310-
case "MutationResponse": {
311-
if (serverMessage.success) {
312-
this.observedTimestamp(serverMessage.ts);
313-
}
314-
const completedMutationId =
315-
this.requestManager.onResponse(serverMessage);
316-
if (completedMutationId !== null) {
317-
this.notifyOnQueryResultChanges(new Set([completedMutationId]));
318-
}
319-
break;
288+
this.webSocketManager.sendMessage(querySetModification);
289+
for (const message of this.requestManager.restart()) {
290+
this.webSocketManager.sendMessage(message);
320291
}
321-
case "ActionResponse": {
322-
this.requestManager.onResponse(serverMessage);
323-
break;
292+
},
293+
onResume: () => {
294+
const remoteQueryResults = new Set(
295+
this.remoteQuerySet.remoteQueryResults().keys(),
296+
);
297+
const [querySetModification, authModification] =
298+
this.state.resume(remoteQueryResults);
299+
if (authModification) {
300+
this.webSocketManager.sendMessage(authModification);
324301
}
325-
case "AuthError": {
326-
this.authenticationManager.onAuthError(serverMessage);
327-
break;
302+
if (querySetModification) {
303+
this.webSocketManager.sendMessage(querySetModification);
328304
}
329-
case "FatalError": {
330-
const error = logFatalError(serverMessage.error);
331-
void this.webSocketManager.terminate();
332-
throw error;
305+
for (const message of this.requestManager.resume()) {
306+
this.webSocketManager.sendMessage(message);
333307
}
334-
case "Ping":
335-
break; // do nothing
336-
default: {
337-
const _typeCheck: never = serverMessage;
308+
},
309+
onMessage: (serverMessage: ServerMessage) => {
310+
// Metrics events grow linearly with reconnection attempts so this
311+
// conditional prevents n^2 metrics reporting.
312+
if (!this.firstMessageReceived) {
313+
this.firstMessageReceived = true;
314+
this.mark("convexFirstMessageReceived");
315+
this.reportMarks();
316+
}
317+
switch (serverMessage.type) {
318+
case "Transition": {
319+
this.observedTimestamp(serverMessage.endVersion.ts);
320+
this.authenticationManager.onTransition(serverMessage);
321+
this.remoteQuerySet.transition(serverMessage);
322+
this.state.transition(serverMessage);
323+
const completedRequests = this.requestManager.removeCompleted(
324+
this.remoteQuerySet.timestamp(),
325+
);
326+
this.notifyOnQueryResultChanges(completedRequests);
327+
break;
328+
}
329+
case "MutationResponse": {
330+
if (serverMessage.success) {
331+
this.observedTimestamp(serverMessage.ts);
332+
}
333+
const completedMutationId =
334+
this.requestManager.onResponse(serverMessage);
335+
if (completedMutationId !== null) {
336+
this.notifyOnQueryResultChanges(new Set([completedMutationId]));
337+
}
338+
break;
339+
}
340+
case "ActionResponse": {
341+
this.requestManager.onResponse(serverMessage);
342+
break;
343+
}
344+
case "AuthError": {
345+
this.authenticationManager.onAuthError(serverMessage);
346+
break;
347+
}
348+
case "FatalError": {
349+
const error = logFatalError(serverMessage.error);
350+
void this.webSocketManager.terminate();
351+
throw error;
352+
}
353+
case "Ping":
354+
break; // do nothing
355+
default: {
356+
const _typeCheck: never = serverMessage;
357+
}
338358
}
339-
}
340359

341-
return {
342-
hasSyncedPastLastReconnect: this.hasSyncedPastLastReconnect(),
343-
};
360+
return {
361+
hasSyncedPastLastReconnect: this.hasSyncedPastLastReconnect(),
362+
};
363+
},
344364
},
345365
webSocketConstructor,
346366
this.verbose,

src/browser/sync/local_state.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,56 @@ export class LocalSyncState {
282282
return [querySet, authenticate];
283283
}
284284

285+
resume(
286+
remoteQueryResults: Set<QueryId>,
287+
): [QuerySetModification?, Authenticate?] {
288+
const localQueryIds = new Set();
289+
const modifications = [];
290+
for (const localQuery of this.querySet.values()) {
291+
localQueryIds.add(localQuery.id);
292+
293+
if (!remoteQueryResults.has(localQuery.id)) {
294+
const add: AddQuery = {
295+
type: "Add",
296+
queryId: localQuery.id,
297+
udfPath: localQuery.canonicalizedUdfPath,
298+
args: [convexToJson(localQuery.args)],
299+
journal: localQuery.journal,
300+
};
301+
modifications.push(add);
302+
}
303+
}
304+
305+
for (const remoteQueryId of remoteQueryResults) {
306+
if (!localQueryIds.has(remoteQueryId)) {
307+
const remove: RemoveQuery = {
308+
type: "Remove",
309+
queryId: remoteQueryId,
310+
};
311+
modifications.push(remove);
312+
}
313+
}
314+
315+
const querySet: QuerySetModification | undefined =
316+
modifications.length > 0
317+
? {
318+
type: "ModifyQuerySet",
319+
baseVersion: this.querySetVersion,
320+
newVersion: this.querySetVersion + 1,
321+
modifications,
322+
}
323+
: undefined;
324+
const authenticate: Authenticate | undefined =
325+
this.auth !== undefined
326+
? {
327+
type: "Authenticate",
328+
baseVersion: this.identityVersion,
329+
...this.auth,
330+
}
331+
: undefined;
332+
return [querySet, authenticate];
333+
}
334+
285335
private removeSubscriber(
286336
queryToken: QueryToken,
287337
): QuerySetModification | null {

src/browser/sync/request_manager.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,18 @@ export class RequestManager {
192192
return allMessages;
193193
}
194194

195+
resume(): ClientMessage[] {
196+
const allMessages = [];
197+
for (const [, value] of this.inflightRequests) {
198+
if (value.status.status === "NotSent") {
199+
value.status.status = "Requested";
200+
allMessages.push(value.message);
201+
continue;
202+
}
203+
}
204+
return allMessages;
205+
}
206+
195207
/**
196208
* @returns true if there are any requests that have been requested but have
197209
* not be completed yet.

0 commit comments

Comments
 (0)