From ff7d63ac3e0d920f9b68a564a08b848adae78862 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Mon, 8 Jul 2024 16:01:48 +0900 Subject: [PATCH] Handle retry for syncLoop and watchLoop (#863) This commit addresses the need to retry `syncLoop` and `watchLoop` functions based on different error codes from `ConnectError`. Connect guide indicates that for error codes like `ResourceExhausted` and `Unavailable`, retries should be attempted following guidelines. https://connectrpc.com/docs/protocol/#error-codes. Additionally, `Unknown` and `Canceled` are added separately as it typically occurs when the server is stopped. To enhance visibility into the status of `syncLoop` and `watchLoop`, `Conditions` has been added. --- public/index.html | 1 + src/client/client.ts | 81 ++++++++++++++++++++++++++++++--- src/yorkie.ts | 9 +++- test/integration/client_test.ts | 37 +++++++++++++++ 4 files changed, 120 insertions(+), 8 deletions(-) diff --git a/public/index.html b/public/index.html index 92a56179c..55cbcf815 100644 --- a/public/index.html +++ b/public/index.html @@ -316,6 +316,7 @@

const codemirror = CodeMirror.fromTextArea(textarea, { lineNumbers: true, }); + yorkie.setLogLevel(yorkie.LogLevel.Debug); devtool.setCodeMirror(codemirror); // 02-1. create client with RPCAddr. diff --git a/src/client/client.ts b/src/client/client.ts index 9ce085d49..347eae23a 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -89,6 +89,21 @@ export enum ClientStatus { Activated = 'activated', } +/** + * `ClientCondition` represents the condition of the client. + */ +export enum ClientCondition { + /** + * `SyncLoop` is a key of the sync loop condition. + */ + SyncLoop = 'SyncLoop', + + /** + * `WatchLoop` is a key of the watch loop condition. + */ + WatchLoop = 'WatchLoop', +} + /** * `ClientOptions` are user-settable options used when defining clients. * @@ -158,13 +173,14 @@ export class Client { private attachmentMap: Map>; private apiKey: string; + private conditions: Record; private syncLoopDuration: number; private reconnectStreamDelay: number; private retrySyncLoopDelay: number; private rpcClient: PromiseClient; private taskQueue: Array<() => Promise>; - private processing: boolean = false; + private processing = false; /** * @param rpcAddr - the address of the RPC server. @@ -179,6 +195,10 @@ export class Client { // TODO(hackerwins): Consider to group the options as a single object. this.apiKey = opts.apiKey || ''; + this.conditions = { + [ClientCondition.SyncLoop]: false, + [ClientCondition.WatchLoop]: false, + }; this.syncLoopDuration = opts.syncLoopDuration || DefaultClientOptions.syncLoopDuration; this.reconnectStreamDelay = @@ -535,10 +555,22 @@ export class Client { return this.status; } + /** + * `getCondition` returns the condition of this client. + */ + public getCondition(condition: ClientCondition): boolean { + return this.conditions[condition]; + } + + /** + * `runSyncLoop` runs the sync loop. The sync loop pushes local changes to + * the server and pulls remote changes from the server. + */ private runSyncLoop(): void { const doLoop = (): void => { if (!this.isActive()) { logger.debug(`[SL] c:"${this.getKey()}" exit sync loop`); + this.conditions[ClientCondition.SyncLoop] = false; return; } @@ -554,14 +586,24 @@ export class Client { .then(() => setTimeout(doLoop, this.syncLoopDuration)) .catch((err) => { logger.error(`[SL] c:"${this.getKey()}" sync failed:`, err); - setTimeout(doLoop, this.retrySyncLoopDelay); + + if (this.isRetryableConnectError(err)) { + setTimeout(doLoop, this.retrySyncLoopDelay); + } else { + this.conditions[ClientCondition.SyncLoop] = false; + } }); }; logger.debug(`[SL] c:"${this.getKey()}" run sync loop`); + this.conditions[ClientCondition.SyncLoop] = true; doLoop(); } + /** + * `runWatchLoop` runs the watch loop for the given document. The watch loop + * listens to the events of the given document from the server. + */ private async runWatchLoop(docKey: DocumentKey): Promise { const attachment = this.attachmentMap.get(docKey); if (!attachment) { @@ -571,9 +613,11 @@ export class Client { ); } + this.conditions[ClientCondition.WatchLoop] = true; return attachment.runWatchLoop( (onDisconnect: () => void): Promise<[WatchStream, AbortController]> => { if (!this.isActive()) { + this.conditions[ClientCondition.WatchLoop] = false; return Promise.reject( new YorkieError(Code.ClientNotActive, `${this.key} is not active`), ); @@ -628,11 +672,10 @@ export class Client { ]); logger.debug(`[WD] c:"${this.getKey()}" unwatches`); - if ( - err instanceof ConnectError && - err.code != ConnectErrorCode.Canceled - ) { + if (this.isRetryableConnectError(err)) { onDisconnect(); + } else { + this.conditions[ClientCondition.WatchLoop] = false; } reject(err); @@ -740,6 +783,32 @@ export class Client { }); } + /** + * `isRetryableConnectError` returns whether the given error is a retryable error. + */ + private isRetryableConnectError(err: any): boolean { + if (!(err instanceof ConnectError)) { + return false; + } + + // NOTE(hackerwins): These errors are retryable. + // Connect guide indicates that for error codes like `ResourceExhausted` and + // `Unavailable`, retries should be attempted following their guidelines. + // Additionally, `Unknown` and `Canceled` are added separately as it + // typically occurs when the server is stopped. + const retryables = [ + ConnectErrorCode.Canceled, + ConnectErrorCode.Unknown, + ConnectErrorCode.ResourceExhausted, + ConnectErrorCode.Unavailable, + ]; + + // TODO(hackerwins): We need to handle more cases. + // - FailedPrecondition: If the client fixes its state, it is retryable. + // - Unauthenticated: The client is not authenticated. It is retryable after reauthentication. + return retryables.includes(err.code); + } + /** * `enqueueTask` enqueues the given task to the task queue. */ diff --git a/src/yorkie.ts b/src/yorkie.ts index 13f015b7c..8c9fda87f 100644 --- a/src/yorkie.ts +++ b/src/yorkie.ts @@ -25,6 +25,7 @@ import * as Devtools from '@yorkie-js-sdk/src/devtools/types'; export { Client, ClientStatus, + ClientCondition, SyncMode, type ClientOptions, } from '@yorkie-js-sdk/src/client/client'; @@ -112,8 +113,8 @@ export { export { Change } from '@yorkie-js-sdk/src/document/change/change'; export { converter } from '@yorkie-js-sdk/src/api/converter'; -export type { LogLevel } from '@yorkie-js-sdk/src/util/logger'; -export { setLogLevel } from '@yorkie-js-sdk/src/util/logger'; +import { LogLevel, setLogLevel } from '@yorkie-js-sdk/src/util/logger'; +export { LogLevel, setLogLevel } from '@yorkie-js-sdk/src/util/logger'; export { EventSourceDevPanel, @@ -142,6 +143,8 @@ export default { Text, Counter, Tree, + LogLevel, + setLogLevel, IntType: CounterType.IntegerCnt, LongType: CounterType.LongCnt, }; @@ -155,6 +158,8 @@ if (typeof globalThis !== 'undefined') { Text, Counter, Tree, + LogLevel, + setLogLevel, IntType: CounterType.IntegerCnt, LongType: CounterType.LongCnt, }; diff --git a/test/integration/client_test.ts b/test/integration/client_test.ts index 670206c27..cc0f8a004 100644 --- a/test/integration/client_test.ts +++ b/test/integration/client_test.ts @@ -3,6 +3,7 @@ import { describe, it, assert, vi, afterEach, expect } from 'vitest'; import yorkie, { Counter, SyncMode, + ClientCondition, DocEventType, DocumentSyncStatus, Tree, @@ -14,6 +15,7 @@ import { testRPCAddr, withTwoClientsAndDocuments, } from '@yorkie-js-sdk/test/integration/integration_helper'; +import { ConnectError, Code } from '@connectrpc/connect'; describe.sequential('Client', function () { afterEach(() => { @@ -763,4 +765,39 @@ describe.sequential('Client', function () { await cli.deactivate(); } }); + + it('Should retry on network failure and eventually succeed', async ({ + task, + }) => { + const cli = new yorkie.Client(testRPCAddr, { + retrySyncLoopDelay: 10, + }); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + // 01. Simulate Unknown error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', Code.Unknown); + }); + + doc.update((root) => { + root.t = new Text(); + root.t.edit(0, 0, 'a'); + }); + + await new Promise((res) => setTimeout(res, 30)); + assert.isTrue(cli.getCondition(ClientCondition.SyncLoop)); + + // 02. Simulate FailedPrecondition error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', Code.FailedPrecondition); + }); + + await new Promise((res) => setTimeout(res, 30)); + assert.isFalse(cli.getCondition(ClientCondition.SyncLoop)); + + vi.unstubAllGlobals(); + }); });