Skip to content

Commit

Permalink
Handle retry for syncLoop and watchLoop (#863)
Browse files Browse the repository at this point in the history
This commit addresses the need to retry `syncLoop` and `watchLoop`
functions based on different error codes from `ConnectError`.

Connect guide indicates that for error codes like `ResourceExhausted`
and `Unavailable`, retries should be attempted following guidelines.
https://connectrpc.com/docs/protocol/#error-codes.

Additionally, `Unknown` and `Canceled` are added separately as it
typically occurs when the server is stopped.

To enhance visibility into the status of `syncLoop` and `watchLoop`,
`Conditions` has been added.
  • Loading branch information
hackerwins authored Jul 8, 2024
1 parent 4f7dcce commit ff7d63a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 8 deletions.
1 change: 1 addition & 0 deletions public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ <h4 class="title">
const codemirror = CodeMirror.fromTextArea(textarea, {
lineNumbers: true,
});
yorkie.setLogLevel(yorkie.LogLevel.Debug);
devtool.setCodeMirror(codemirror);

// 02-1. create client with RPCAddr.
Expand Down
81 changes: 75 additions & 6 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ export enum ClientStatus {
Activated = 'activated',
}

/**
* `ClientCondition` represents the condition of the client.
*/
export enum ClientCondition {
/**
* `SyncLoop` is a key of the sync loop condition.
*/
SyncLoop = 'SyncLoop',

/**
* `WatchLoop` is a key of the watch loop condition.
*/
WatchLoop = 'WatchLoop',
}

/**
* `ClientOptions` are user-settable options used when defining clients.
*
Expand Down Expand Up @@ -158,13 +173,14 @@ export class Client {
private attachmentMap: Map<DocumentKey, Attachment<unknown, any>>;

private apiKey: string;
private conditions: Record<ClientCondition, boolean>;
private syncLoopDuration: number;
private reconnectStreamDelay: number;
private retrySyncLoopDelay: number;

private rpcClient: PromiseClient<typeof YorkieService>;
private taskQueue: Array<() => Promise<any>>;
private processing: boolean = false;
private processing = false;

/**
* @param rpcAddr - the address of the RPC server.
Expand All @@ -179,6 +195,10 @@ export class Client {

// TODO(hackerwins): Consider to group the options as a single object.
this.apiKey = opts.apiKey || '';
this.conditions = {
[ClientCondition.SyncLoop]: false,
[ClientCondition.WatchLoop]: false,
};
this.syncLoopDuration =
opts.syncLoopDuration || DefaultClientOptions.syncLoopDuration;
this.reconnectStreamDelay =
Expand Down Expand Up @@ -535,10 +555,22 @@ export class Client {
return this.status;
}

/**
* `getCondition` returns the condition of this client.
*/
public getCondition(condition: ClientCondition): boolean {
return this.conditions[condition];
}

/**
* `runSyncLoop` runs the sync loop. The sync loop pushes local changes to
* the server and pulls remote changes from the server.
*/
private runSyncLoop(): void {
const doLoop = (): void => {
if (!this.isActive()) {
logger.debug(`[SL] c:"${this.getKey()}" exit sync loop`);
this.conditions[ClientCondition.SyncLoop] = false;
return;
}

Expand All @@ -554,14 +586,24 @@ export class Client {
.then(() => setTimeout(doLoop, this.syncLoopDuration))
.catch((err) => {
logger.error(`[SL] c:"${this.getKey()}" sync failed:`, err);
setTimeout(doLoop, this.retrySyncLoopDelay);

if (this.isRetryableConnectError(err)) {
setTimeout(doLoop, this.retrySyncLoopDelay);
} else {
this.conditions[ClientCondition.SyncLoop] = false;
}
});
};

logger.debug(`[SL] c:"${this.getKey()}" run sync loop`);
this.conditions[ClientCondition.SyncLoop] = true;
doLoop();
}

/**
* `runWatchLoop` runs the watch loop for the given document. The watch loop
* listens to the events of the given document from the server.
*/
private async runWatchLoop(docKey: DocumentKey): Promise<void> {
const attachment = this.attachmentMap.get(docKey);
if (!attachment) {
Expand All @@ -571,9 +613,11 @@ export class Client {
);
}

this.conditions[ClientCondition.WatchLoop] = true;
return attachment.runWatchLoop(
(onDisconnect: () => void): Promise<[WatchStream, AbortController]> => {
if (!this.isActive()) {
this.conditions[ClientCondition.WatchLoop] = false;
return Promise.reject(
new YorkieError(Code.ClientNotActive, `${this.key} is not active`),
);
Expand Down Expand Up @@ -628,11 +672,10 @@ export class Client {
]);
logger.debug(`[WD] c:"${this.getKey()}" unwatches`);

if (
err instanceof ConnectError &&
err.code != ConnectErrorCode.Canceled
) {
if (this.isRetryableConnectError(err)) {
onDisconnect();
} else {
this.conditions[ClientCondition.WatchLoop] = false;
}

reject(err);
Expand Down Expand Up @@ -740,6 +783,32 @@ export class Client {
});
}

/**
* `isRetryableConnectError` returns whether the given error is a retryable error.
*/
private isRetryableConnectError(err: any): boolean {
if (!(err instanceof ConnectError)) {
return false;
}

// NOTE(hackerwins): These errors are retryable.
// Connect guide indicates that for error codes like `ResourceExhausted` and
// `Unavailable`, retries should be attempted following their guidelines.
// Additionally, `Unknown` and `Canceled` are added separately as it
// typically occurs when the server is stopped.
const retryables = [
ConnectErrorCode.Canceled,
ConnectErrorCode.Unknown,
ConnectErrorCode.ResourceExhausted,
ConnectErrorCode.Unavailable,
];

// TODO(hackerwins): We need to handle more cases.
// - FailedPrecondition: If the client fixes its state, it is retryable.
// - Unauthenticated: The client is not authenticated. It is retryable after reauthentication.
return retryables.includes(err.code);
}

/**
* `enqueueTask` enqueues the given task to the task queue.
*/
Expand Down
9 changes: 7 additions & 2 deletions src/yorkie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import * as Devtools from '@yorkie-js-sdk/src/devtools/types';
export {
Client,
ClientStatus,
ClientCondition,
SyncMode,
type ClientOptions,
} from '@yorkie-js-sdk/src/client/client';
Expand Down Expand Up @@ -112,8 +113,8 @@ export {
export { Change } from '@yorkie-js-sdk/src/document/change/change';
export { converter } from '@yorkie-js-sdk/src/api/converter';

export type { LogLevel } from '@yorkie-js-sdk/src/util/logger';
export { setLogLevel } from '@yorkie-js-sdk/src/util/logger';
import { LogLevel, setLogLevel } from '@yorkie-js-sdk/src/util/logger';
export { LogLevel, setLogLevel } from '@yorkie-js-sdk/src/util/logger';

export {
EventSourceDevPanel,
Expand Down Expand Up @@ -142,6 +143,8 @@ export default {
Text,
Counter,
Tree,
LogLevel,
setLogLevel,
IntType: CounterType.IntegerCnt,
LongType: CounterType.LongCnt,
};
Expand All @@ -155,6 +158,8 @@ if (typeof globalThis !== 'undefined') {
Text,
Counter,
Tree,
LogLevel,
setLogLevel,
IntType: CounterType.IntegerCnt,
LongType: CounterType.LongCnt,
};
Expand Down
37 changes: 37 additions & 0 deletions test/integration/client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { describe, it, assert, vi, afterEach, expect } from 'vitest';
import yorkie, {
Counter,
SyncMode,
ClientCondition,
DocEventType,
DocumentSyncStatus,
Tree,
Expand All @@ -14,6 +15,7 @@ import {
testRPCAddr,
withTwoClientsAndDocuments,
} from '@yorkie-js-sdk/test/integration/integration_helper';
import { ConnectError, Code } from '@connectrpc/connect';

describe.sequential('Client', function () {
afterEach(() => {
Expand Down Expand Up @@ -763,4 +765,39 @@ describe.sequential('Client', function () {
await cli.deactivate();
}
});

it('Should retry on network failure and eventually succeed', async ({
task,
}) => {
const cli = new yorkie.Client(testRPCAddr, {
retrySyncLoopDelay: 10,
});
await cli.activate();

const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`));
await cli.attach(doc);

// 01. Simulate Unknown error.
vi.stubGlobal('fetch', async () => {
throw new ConnectError('Failed to fetch', Code.Unknown);
});

doc.update((root) => {
root.t = new Text();
root.t.edit(0, 0, 'a');
});

await new Promise((res) => setTimeout(res, 30));
assert.isTrue(cli.getCondition(ClientCondition.SyncLoop));

// 02. Simulate FailedPrecondition error.
vi.stubGlobal('fetch', async () => {
throw new ConnectError('Failed to fetch', Code.FailedPrecondition);
});

await new Promise((res) => setTimeout(res, 30));
assert.isFalse(cli.getCondition(ClientCondition.SyncLoop));

vi.unstubAllGlobals();
});
});

0 comments on commit ff7d63a

Please sign in to comment.