diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 12656f396..8f430bedf 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -155,38 +155,6 @@ export interface ClientOptions { * default value is `1000`(ms). */ reconnectStreamDelay?: number; - - // retry 로직은 retryable 인 status code에 대해서 재시도 함 - // webhook 서버에서 잘못된 토큰을 발행하면 token refresh를 무한 반복하는 문제가 발생 됨 - // 이 무한 반복되는 문제를 해결하기 위해서 최대 제한 수를 제한함 - - // Q) retry 로직은 어디에 적용해야하는가? - // - client.activate 같은 함수에서 토큰 에러가 날 때 retry하고 있음... - // error retryable? = client.activate(); // 에러 발생 - // client.attach(doc); - // - client.sync(doc); - // background routine에 의한 싱크는 이벤트 핸들러로 알려줘야함 - - // ``` - // try { - // const doc = await client.attach(doc); // token refresh - // } catch(e) { - // await client.attach(doc); - // } - // ``` - - /** - * `retryRequestDelay` defines the waiting time between retry attempts. - * The default value is `1000`(ms). - */ - retryRequestDelay?: number; - - /** - * `maxRequestRetries` limits the maximum number of retry attempts for requests - * when a connectRPC error occurs and requires a retry. The default value is 3. - * This value must be greater than 0 to enable retry requests after token refresh. - */ - maxRequestRetries?: number; } /** @@ -196,8 +164,6 @@ const DefaultClientOptions = { syncLoopDuration: 50, retrySyncLoopDelay: 1000, reconnectStreamDelay: 1000, - retryRequestDelay: 1000, - maxRequestRetries: 3, }; /** @@ -209,8 +175,6 @@ const DefaultBroadcastOptions = { maxBackoff: 20000, }; -const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - /** * `Client` is a normal client that can communicate with the server. * It has documents and sends changes of the documents in local @@ -230,11 +194,8 @@ export class Client { private syncLoopDuration: number; private reconnectStreamDelay: number; private retrySyncLoopDelay: number; - private retryRequestDelay: number; - private maxRequestRetries: number; private rpcClient: PromiseClient; - private rpcAddr: string; private setAuthToken: (token: string) => void; private taskQueue: Array<() => Promise>; private processing = false; @@ -263,11 +224,6 @@ export class Client { opts.reconnectStreamDelay ?? DefaultClientOptions.reconnectStreamDelay; this.retrySyncLoopDelay = opts.retrySyncLoopDelay ?? DefaultClientOptions.retrySyncLoopDelay; - this.retryRequestDelay = - opts.retryRequestDelay ?? DefaultClientOptions.retryRequestDelay; - this.maxRequestRetries = - opts.maxRequestRetries ?? DefaultClientOptions.maxRequestRetries; - this.rpcAddr = rpcAddr; const { authInterceptor, setToken } = createAuthInterceptor(this.apiKey); this.setAuthToken = setToken; @@ -277,7 +233,7 @@ export class Client { this.rpcClient = createPromiseClient( YorkieService, createGrpcWebTransport({ - baseUrl: this.rpcAddr, + baseUrl: rpcAddr, interceptors: [authInterceptor, createMetricInterceptor()], }), ); @@ -300,38 +256,23 @@ export class Client { } return this.enqueueTask(async () => { - const requestActivateClient = async (retryCount = 0): Promise => { - return this.rpcClient - .activateClient( - { clientKey: this.key }, - { headers: { 'x-shard-key': this.apiKey } }, - ) - .then((res) => { - this.id = res.clientId; - this.status = ClientStatus.Activated; - this.runSyncLoop(); - logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}`); - }) - .catch(async (err) => { - if (retryCount >= this.maxRequestRetries) { - logger.error( - `[AC] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded`, - ); - throw err; - } - - if (await this.handleConnectError(err)) { - await delay(this.retryRequestDelay); - return requestActivateClient(retryCount + 1); - } + return this.rpcClient + .activateClient( + { clientKey: this.key }, + { headers: { 'x-shard-key': this.apiKey } }, + ) + .then((res) => { + this.id = res.clientId; + this.status = ClientStatus.Activated; + this.runSyncLoop(); - logger.error(`[AC] c:"${this.getKey()}" err :`, err); - throw err; - }); - }; - return requestActivateClient(); + logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`); + }) + .catch(async (err) => { + logger.error(`[AC] c:"${this.getKey()}" err :`, err); + await this.handleConnectError(err); + throw err; + }); }); } @@ -344,36 +285,20 @@ export class Client { } return this.enqueueTask(async () => { - const requestDeactivateClient = async (retryCount = 0): Promise => { - return this.rpcClient - .deactivateClient( - { clientId: this.id! }, - { headers: { 'x-shard-key': this.apiKey } }, - ) - .then(() => { - this.deactivateInternal(); - logger.info(`[DC] c"${this.getKey()}" deactivated`); - }) - .catch(async (err) => { - if (retryCount >= this.maxRequestRetries) { - logger.error( - `[DC] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded, id:"${this.id}`, - ); - throw err; - } - - if (await this.handleConnectError(err)) { - await delay(this.retryRequestDelay); - return requestDeactivateClient(retryCount + 1); - } - - logger.error(`[DC] c:"${this.getKey()}" err :`, err); - throw err; - }); - }; - return requestDeactivateClient(); + return this.rpcClient + .deactivateClient( + { clientId: this.id! }, + { headers: { 'x-shard-key': this.apiKey } }, + ) + .then(() => { + this.deactivateInternal(); + logger.info(`[DC] c"${this.getKey()}" deactivated`); + }) + .catch(async (err) => { + logger.error(`[DC] c:"${this.getKey()}" err :`, err); + await this.handleConnectError(err); + throw err; + }); }); } @@ -422,93 +347,59 @@ export class Client { const syncMode = options.syncMode ?? SyncMode.Realtime; return this.enqueueTask(async () => { - const requestAttachDocument = async ( - retryCount = 0, - ): Promise> => { - return this.rpcClient - .attachDocument( - { - clientId: this.id!, - changePack: converter.toChangePack(doc.createChangePack()), - }, - { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, - ) - .then(async (res) => { - const pack = converter.fromChangePack

(res.changePack!); - doc.applyChangePack(pack); - if (doc.getStatus() === DocumentStatus.Removed) { - return doc; - } + return this.rpcClient + .attachDocument( + { + clientId: this.id!, + changePack: converter.toChangePack(doc.createChangePack()), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then(async (res) => { + const pack = converter.fromChangePack

(res.changePack!); + doc.applyChangePack(pack); + if (doc.getStatus() === DocumentStatus.Removed) { + return doc; + } - doc.applyStatus(DocumentStatus.Attached); - this.attachmentMap.set( - doc.getKey(), - new Attachment( - this.reconnectStreamDelay, - doc, - res.documentId, - syncMode, - unsubscribeBroacastEvent, - ), - ); + doc.applyStatus(DocumentStatus.Attached); + this.attachmentMap.set( + doc.getKey(), + new Attachment( + this.reconnectStreamDelay, + doc, + res.documentId, + syncMode, + unsubscribeBroacastEvent, + ), + ); - if (syncMode !== SyncMode.Manual) { - await this.runWatchLoop(doc.getKey()); - } + if (syncMode !== SyncMode.Manual) { + await this.runWatchLoop(doc.getKey()); + } - logger.info( - `[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`, - ); + logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`); - const crdtObject = doc.getRootObject(); - if (options.initialRoot) { - const initialRoot = options.initialRoot; - doc.update((root) => { - for (const [k, v] of Object.entries(initialRoot)) { - if (!crdtObject.has(k)) { - const key = k as keyof T; - root[key] = v as any; - } + const crdtObject = doc.getRootObject(); + if (options.initialRoot) { + const initialRoot = options.initialRoot; + doc.update((root) => { + for (const [k, v] of Object.entries(initialRoot)) { + if (!crdtObject.has(k)) { + const key = k as keyof T; + root[key] = v as any; } - }); - } - - return doc; - }) - .catch(async (err) => { - if (retryCount >= this.maxRequestRetries) { - logger.error( - `[AD] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded, id:"${this.id}`, - ); - throw err; - } - - if (await this.handleConnectError(err)) { - if ( - err instanceof ConnectError && - errorCodeOf(err) === Code.ErrUnauthenticated - ) { - doc.publish([ - { - type: DocEventType.AuthError, - value: { - reason: errorMetadataOf(err).reason, - method: 'AttachDocument', - }, - }, - ]); } - await delay(this.retryRequestDelay); - return requestAttachDocument(retryCount + 1); - } + }); + } - logger.error(`[AD] c:"${this.getKey()}" err :`, err); - throw err; - }); - }; - return requestAttachDocument(); + return doc; + }) + .catch(async (err) => { + logger.error(`[AD] c:"${this.getKey()}" err :`, err); + await this.handleConnectError(err); + throw err; + }); }); } @@ -542,66 +433,32 @@ export class Client { doc.update((_, p) => p.clear()); return this.enqueueTask(async () => { - const requestDetachDocument = async ( - retryCount = 0, - ): Promise> => { - return this.rpcClient - .detachDocument( - { - clientId: this.id!, - documentId: attachment.docID, - changePack: converter.toChangePack(doc.createChangePack()), - removeIfNotAttached: options.removeIfNotAttached ?? false, - }, - { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, - ) - .then((res) => { - const pack = converter.fromChangePack

(res.changePack!); - doc.applyChangePack(pack); - if (doc.getStatus() !== DocumentStatus.Removed) { - doc.applyStatus(DocumentStatus.Detached); - } - this.detachInternal(doc.getKey()); - - logger.info( - `[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`, - ); - return doc; - }) - .catch(async (err) => { - if (retryCount >= this.maxRequestRetries) { - logger.error( - `[DD] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded, id:"${this.id}`, - ); - throw err; - } - - if (await this.handleConnectError(err)) { - if ( - err instanceof ConnectError && - errorCodeOf(err) === Code.ErrUnauthenticated - ) { - doc.publish([ - { - type: DocEventType.AuthError, - value: { - reason: errorMetadataOf(err).reason, - method: 'DetachDocument', - }, - }, - ]); - } - await delay(this.retryRequestDelay); - return requestDetachDocument(retryCount + 1); - } + return this.rpcClient + .detachDocument( + { + clientId: this.id!, + documentId: attachment.docID, + changePack: converter.toChangePack(doc.createChangePack()), + removeIfNotAttached: options.removeIfNotAttached ?? false, + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then((res) => { + const pack = converter.fromChangePack

(res.changePack!); + doc.applyChangePack(pack); + if (doc.getStatus() !== DocumentStatus.Removed) { + doc.applyStatus(DocumentStatus.Detached); + } + this.detachInternal(doc.getKey()); - logger.error(`[DD] c:"${this.getKey()}" err :`, err); - throw err; - }); - }; - return requestDetachDocument(); + logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`); + return doc; + }) + .catch(async (err) => { + logger.error(`[DD] c:"${this.getKey()}" err :`, err); + await this.handleConnectError(err); + throw err; + }); }); } @@ -680,11 +537,13 @@ export class Client { ); } return this.enqueueTask(async () => { - return this.syncInternal(attachment, SyncMode.Realtime).catch((err) => { - logger.error(`[SY] c:"${this.getKey()}" err :`, err); - this.handleConnectError(err); - throw err; - }); + return this.syncInternal(attachment, SyncMode.Realtime).catch( + async (err) => { + logger.error(`[SY] c:"${this.getKey()}" err :`, err); + await this.handleConnectError(err); + throw err; + }, + ); }); } @@ -693,9 +552,9 @@ export class Client { for (const [, attachment] of this.attachmentMap) { promises.push(this.syncInternal(attachment, attachment.syncMode)); } - return Promise.all(promises).catch((err) => { + return Promise.all(promises).catch(async (err) => { logger.error(`[SY] c:"${this.getKey()}" err :`, err); - this.handleConnectError(err); + await this.handleConnectError(err); throw err; }); }); @@ -740,9 +599,9 @@ export class Client { logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`); }) - .catch((err) => { + .catch(async (err) => { logger.error(`[RD] c:"${this.getKey()}" err :`, err); - this.handleConnectError(err); + await this.handleConnectError(err); throw err; }); }); @@ -888,7 +747,27 @@ export class Client { for (const [, attachment] of this.attachmentMap) { if (attachment.needRealtimeSync()) { attachment.remoteChangeEventReceived = false; - syncJobs.push(this.syncInternal(attachment, attachment.syncMode)); + syncJobs.push( + this.syncInternal(attachment, attachment.syncMode).catch( + async (err) => { + if ( + err instanceof ConnectError && + errorCodeOf(err) === Code.ErrUnauthenticated + ) { + attachment.doc.publish([ + { + type: DocEventType.AuthError, + value: { + reason: errorMetadataOf(err).reason, + method: 'PushPull', + }, + }, + ]); + } + throw err; + }, + ), + ); } } @@ -896,7 +775,6 @@ export class Client { .then(() => setTimeout(doLoop, this.syncLoopDuration)) .catch(async (err) => { logger.error(`[SL] c:"${this.getKey()}" sync failed:`, err); - if (await this.handleConnectError(err)) { setTimeout(doLoop, this.retrySyncLoopDelay); } else { @@ -924,7 +802,6 @@ export class Client { } this.conditions[ClientCondition.WatchLoop] = true; - let retryCount = 0; return attachment.runWatchLoop( (onDisconnect: () => void): Promise<[WatchStream, AbortController]> => { if (!this.isActive()) { @@ -991,15 +868,6 @@ export class Client { err instanceof ConnectError && errorCodeOf(err) === Code.ErrUnauthenticated ) { - if (retryCount >= this.maxRequestRetries) { - logger.error( - `[WD] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded`, - ); - reject(err); - return; - } attachment.doc.publish([ { type: DocEventType.AuthError, @@ -1011,7 +879,6 @@ export class Client { ]); } onDisconnect(); - retryCount++; } else { this.conditions[ClientCondition.WatchLoop] = false; } @@ -1072,100 +939,61 @@ export class Client { const { doc, docID } = attachment; const reqPack = doc.createChangePack(); - const requestPushPullChanges = async ( - retryCount = 0, - ): Promise> => { - return this.rpcClient - .pushPullChanges( - { - clientId: this.id!, - documentId: docID, - changePack: converter.toChangePack(reqPack), - pushOnly: syncMode === SyncMode.RealtimePushOnly, - }, - { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, - ) - .then((res) => { - const respPack = converter.fromChangePack

(res.changePack!); - - // NOTE(chacha912, hackerwins): If syncLoop already executed with - // PushPull, ignore the response when the syncMode is PushOnly. - if ( - respPack.hasChanges() && - (attachment.syncMode === SyncMode.RealtimePushOnly || - attachment.syncMode === SyncMode.RealtimeSyncOff) - ) { - return doc; - } - - doc.applyChangePack(respPack); - attachment.doc.publish([ - { - type: DocEventType.SyncStatusChanged, - value: DocumentSyncStatus.Synced, - }, - ]); - // NOTE(chacha912): If a document has been removed, watchStream should - // be disconnected to not receive an event for that document. - if (doc.getStatus() === DocumentStatus.Removed) { - this.detachInternal(doc.getKey()); - } - - const docKey = doc.getKey(); - const remoteSize = respPack.getChangeSize(); - logger.info( - `[PP] c:"${this.getKey()}" sync d:"${docKey}", push:${reqPack.getChangeSize()} pull:${remoteSize} cp:${respPack - .getCheckpoint() - .toTestString()}`, - ); + return this.rpcClient + .pushPullChanges( + { + clientId: this.id!, + documentId: docID, + changePack: converter.toChangePack(reqPack), + pushOnly: syncMode === SyncMode.RealtimePushOnly, + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then((res) => { + const respPack = converter.fromChangePack

(res.changePack!); + + // NOTE(chacha912, hackerwins): If syncLoop already executed with + // PushPull, ignore the response when the syncMode is PushOnly. + if ( + respPack.hasChanges() && + (attachment.syncMode === SyncMode.RealtimePushOnly || + attachment.syncMode === SyncMode.RealtimeSyncOff) + ) { return doc; - }) - .catch(async (err) => { - if (retryCount >= this.maxRequestRetries) { - doc.publish([ - { - type: DocEventType.SyncStatusChanged, - value: DocumentSyncStatus.SyncFailed, - }, - ]); - logger.error( - `[PP] c:"${this.getKey()}" max retries (${ - this.maxRequestRetries - }) exceeded`, - ); - throw err; - } + } - if (await this.handleConnectError(err)) { - if ( - err instanceof ConnectError && - errorCodeOf(err) === Code.ErrUnauthenticated - ) { - doc.publish([ - { - type: DocEventType.AuthError, - value: { - reason: errorMetadataOf(err).reason, - method: 'PushPull', - }, - }, - ]); - await delay(this.retryRequestDelay); - return requestPushPullChanges(retryCount + 1); - } - } + doc.applyChangePack(respPack); + attachment.doc.publish([ + { + type: DocEventType.SyncStatusChanged, + value: DocumentSyncStatus.Synced, + }, + ]); + // NOTE(chacha912): If a document has been removed, watchStream should + // be disconnected to not receive an event for that document. + if (doc.getStatus() === DocumentStatus.Removed) { + this.detachInternal(doc.getKey()); + } - doc.publish([ - { - type: DocEventType.SyncStatusChanged, - value: DocumentSyncStatus.SyncFailed, - }, - ]); - logger.error(`[PP] c:"${this.getKey()}" err :`, err); - throw err; - }); - }; - return requestPushPullChanges(); + const docKey = doc.getKey(); + const remoteSize = respPack.getChangeSize(); + logger.info( + `[PP] c:"${this.getKey()}" sync d:"${docKey}", push:${reqPack.getChangeSize()} pull:${remoteSize} cp:${respPack + .getCheckpoint() + .toTestString()}`, + ); + return doc; + }) + .catch(async (err) => { + doc.publish([ + { + type: DocEventType.SyncStatusChanged, + value: DocumentSyncStatus.SyncFailed, + }, + ]); + logger.error(`[PP] c:"${this.getKey()}" err :`, err); + throw err; + }); } /** @@ -1195,11 +1023,9 @@ export class Client { // token is invalid or expired. In this case, the client gets a new token // from the `authTokenInjector` and retries the api call. if (errorCodeOf(err) === Code.ErrUnauthenticated) { - const token = - this.authTokenInjector && - (await this.authTokenInjector(errorMetadataOf(err).reason)); - if (token) { - this.setAuthToken?.(token); + if (this.authTokenInjector) { + const token = await this.authTokenInjector(errorMetadataOf(err).reason); + this.setAuthToken(token); } return true; } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 74ee95de7..7de1150e4 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -201,7 +201,7 @@ export enum DocEventType { LocalBroadcast = 'local-broadcast', /** - * `AuthError` means that the authentification error occurs. + * `AuthError` indicates an authorization failure in syncLoop or watchLoop. */ AuthError = 'auth-error', } @@ -425,7 +425,7 @@ export interface AuthErrorEvent extends BaseDocEvent { type: DocEventType.AuthError; value: { reason: string; - method: 'AttachDocument' | 'DetachDocument' | 'PushPull' | 'WatchDocuments'; + method: 'PushPull' | 'WatchDocuments'; }; } diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 5c7f04db5..ce9ac6cf7 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -166,12 +166,8 @@ describe.sequential('Client', function () { it('Can recover from temporary disconnect (realtime sync)', async function ({ task, }) { - const c1 = new yorkie.Client(testRPCAddr, { - retryRequestDelay: 0, - }); - const c2 = new yorkie.Client(testRPCAddr, { - retryRequestDelay: 0, - }); + const c1 = new yorkie.Client(testRPCAddr); + const c2 = new yorkie.Client(testRPCAddr); await c1.activate(); await c2.activate(); diff --git a/packages/sdk/test/integration/integration_helper.ts b/packages/sdk/test/integration/integration_helper.ts index 677c57874..0333ab1aa 100644 --- a/packages/sdk/test/integration/integration_helper.ts +++ b/packages/sdk/test/integration/integration_helper.ts @@ -47,12 +47,8 @@ export async function withTwoClientsAndDocuments< title: string, syncMode: SyncMode = SyncMode.Manual, ): Promise { - const client1 = new yorkie.Client(testRPCAddr, { - retryRequestDelay: 0, - }); - const client2 = new yorkie.Client(testRPCAddr, { - retryRequestDelay: 0, - }); + const client1 = new yorkie.Client(testRPCAddr); + const client2 = new yorkie.Client(testRPCAddr); await client1.activate(); await client2.activate(); diff --git a/packages/sdk/test/integration/webhook_test.ts b/packages/sdk/test/integration/webhook_test.ts index a4d20af5e..e42291df7 100644 --- a/packages/sdk/test/integration/webhook_test.ts +++ b/packages/sdk/test/integration/webhook_test.ts @@ -155,9 +155,7 @@ describe('Auth Webhook', () => { // client without token const cliWithoutToken = new yorkie.Client(testRPCAddr, { apiKey, - retryRequestDelay: 0, }); - // fail after 3 retries await assertThrowsAsync( async () => { await cliWithoutToken.activate(); @@ -175,9 +173,7 @@ describe('Auth Webhook', () => { const cliWithInvalidToken = new yorkie.Client(testRPCAddr, { apiKey, authTokenInjector: invalidTokenInjector, - retryRequestDelay: 0, }); - // fail after 3 retries await assertThrowsAsync( async () => { await cliWithInvalidToken.activate(); @@ -208,12 +204,15 @@ describe('Auth Webhook', () => { expect(notAllowedTokenInjector).nthCalledWith(1); }); - it('should refresh token and retry methods when unauthenticated error occurs (in manual sync)', async ({ + it('should refresh token when unauthenticated error occurs (in manual sync)', async ({ task, }) => { const TokenExpirationMs = 500; const authTokenInjector = vi.fn(async (reason) => { - if (reason === ExpiredTokenErrorMessage) { + if ( + reason === ExpiredTokenErrorMessage || + authTokenInjector.mock.calls.length === 3 + ) { return `token-${Date.now() + TokenExpirationMs}`; } return `token-${Date.now() - TokenExpirationMs}`; // token expired @@ -222,64 +221,82 @@ describe('Auth Webhook', () => { const client = new yorkie.Client(testRPCAddr, { apiKey, authTokenInjector, - retryRequestDelay: 0, }); - // retry activate - await client.activate(); + await assertThrowsAsync( + async () => { + await client.activate(); + }, + ConnectError, + /^\[unauthenticated\]/i, + ); expect(authTokenInjector).toBeCalledTimes(2); expect(authTokenInjector).nthCalledWith(1); expect(authTokenInjector).nthCalledWith(2, ExpiredTokenErrorMessage); + // retry activate + await client.activate(); + expect(authTokenInjector).nthCalledWith(3); + expect(authTokenInjector).nthCalledWith(3); const doc = new yorkie.Document<{ k1: string }>( toDocKey(`${task.name}-${new Date().getTime()}`), ); - const authErrorEventCollector = new EventCollector<{ - reason: string; - method: string; - }>(); - doc.subscribe('auth-error', (event) => { - authErrorEventCollector.add(event.value); - }); - // retry attach await new Promise((res) => setTimeout(res, TokenExpirationMs)); + await assertThrowsAsync( + async () => { + await client.attach(doc, { syncMode: SyncMode.Manual }); + }, + ConnectError, + /^\[unauthenticated\]/i, + ); + expect(authTokenInjector).toBeCalledTimes(4); + expect(authTokenInjector).nthCalledWith(4, ExpiredTokenErrorMessage); + // retry attach await client.attach(doc, { syncMode: SyncMode.Manual }); - expect(authTokenInjector).toBeCalledTimes(3); - expect(authTokenInjector).nthCalledWith(3, ExpiredTokenErrorMessage); - await authErrorEventCollector.waitAndVerifyNthEvent(1, { - reason: ExpiredTokenErrorMessage, - method: 'AttachDocument', - }); - // retry sync in manual mode - await new Promise((res) => setTimeout(res, TokenExpirationMs)); doc.update((root) => { root.k1 = 'v1'; }); - await client.sync(doc); - expect(authTokenInjector).toBeCalledTimes(4); - expect(authTokenInjector).nthCalledWith(4, ExpiredTokenErrorMessage); - await authErrorEventCollector.waitAndVerifyNthEvent(2, { - reason: ExpiredTokenErrorMessage, - method: 'PushPull', - }); - // retry detach await new Promise((res) => setTimeout(res, TokenExpirationMs)); - await client.detach(doc); + await assertThrowsAsync( + async () => { + await client.sync(doc); + }, + ConnectError, + /^\[unauthenticated\]/i, + ); expect(authTokenInjector).toBeCalledTimes(5); expect(authTokenInjector).nthCalledWith(5, ExpiredTokenErrorMessage); - await authErrorEventCollector.waitAndVerifyNthEvent(3, { - reason: ExpiredTokenErrorMessage, - method: 'DetachDocument', - }); + // retry sync in manual mode + await client.sync(doc); - // retry deactivate await new Promise((res) => setTimeout(res, TokenExpirationMs)); - await client.deactivate(); + await assertThrowsAsync( + async () => { + await client.detach(doc); + }, + ConnectError, + /^\[unauthenticated\]/i, + ); expect(authTokenInjector).toBeCalledTimes(6); expect(authTokenInjector).nthCalledWith(6, ExpiredTokenErrorMessage); + // retry detach + await client.detach(doc); + + await new Promise((res) => setTimeout(res, TokenExpirationMs)); + await assertThrowsAsync( + async () => { + await client.deactivate(); + }, + ConnectError, + /^\[unauthenticated\]/i, + ); + expect(authTokenInjector).toBeCalledTimes(7); + expect(authTokenInjector).nthCalledWith(7, ExpiredTokenErrorMessage); + // retry deactivate + await client.deactivate(); }); it('should refresh token and retry realtime sync', async ({ task }) => { @@ -320,7 +337,7 @@ describe('Auth Webhook', () => { const client = new yorkie.Client(testRPCAddr, { apiKey, authTokenInjector, - retryRequestDelay: 0, + retrySyncLoopDelay: 100, }); await client.activate(); @@ -387,7 +404,7 @@ describe('Auth Webhook', () => { }, ); - const TokenExpirationMs = 2000; + const TokenExpirationMs = 500; const authTokenInjector = vi.fn(async (reason) => { if (reason === ExpiredTokenErrorMessage) { return `token-${Date.now() + TokenExpirationMs}`; @@ -398,7 +415,7 @@ describe('Auth Webhook', () => { const client = new yorkie.Client(testRPCAddr, { apiKey, authTokenInjector, - retryRequestDelay: 0, + reconnectStreamDelay: 100, }); await client.activate(); @@ -419,7 +436,6 @@ describe('Auth Webhook', () => { authTokenInjector: async () => { return `token-${Date.now() + 1000 * 60 * 60}`; // expire in 1 hour }, - retryRequestDelay: 0, }); await client2.activate(); const doc2 = new yorkie.Document<{ k1: string }>(docKey);