diff --git a/src/client/client.ts b/src/client/client.ts index 1f2bc75d9..c43d9347d 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -163,6 +163,8 @@ export class Client { private retrySyncLoopDelay: number; private rpcClient: PromiseClient; + private taskQueue: Array<() => Promise>; + private processing: boolean = false; /** * @param rpcAddr - the address of the RPC server. @@ -196,6 +198,7 @@ export class Client { ], }), ); + this.taskQueue = []; } /** @@ -208,24 +211,24 @@ export class Client { return Promise.resolve(); } - return this.rpcClient - .activateClient( - { - clientKey: this.key, - }, - { headers: { 'x-shard-key': this.apiKey } }, - ) - .then((res) => { - this.id = res.clientId; - this.status = ClientStatus.Activated; - this.runSyncLoop(); - - logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`); - }) - .catch((err) => { - logger.error(`[AC] c:"${this.getKey()}" err :`, err); - throw err; - }); + return this.enqueueTask(async () => { + return this.rpcClient + .activateClient( + { clientKey: this.key }, + { headers: { 'x-shard-key': this.apiKey } }, + ) + .then((res) => { + this.id = res.clientId; + this.status = ClientStatus.Activated; + this.runSyncLoop(); + + logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`); + }) + .catch((err) => { + logger.error(`[AC] c:"${this.getKey()}" err :`, err); + throw err; + }); + }); } /** @@ -236,27 +239,27 @@ export class Client { return Promise.resolve(); } - return this.rpcClient - .deactivateClient( - { - clientId: this.id!, - }, - { headers: { 'x-shard-key': this.apiKey } }, - ) - .then(() => { - for (const [key, attachment] of this.attachmentMap) { - attachment.doc.applyStatus(DocumentStatus.Detached); - this.detachInternal(key); - } - - this.status = ClientStatus.Deactivated; - - logger.info(`[DC] c"${this.getKey()}" deactivated`); - }) - .catch((err) => { - logger.error(`[DC] c:"${this.getKey()}" err :`, err); - throw err; - }); + return this.enqueueTask(async () => { + return this.rpcClient + .deactivateClient( + { clientId: this.id! }, + { headers: { 'x-shard-key': this.apiKey } }, + ) + .then(() => { + for (const [key, attachment] of this.attachmentMap) { + attachment.doc.applyStatus(DocumentStatus.Detached); + this.detachInternal(key); + } + + this.status = ClientStatus.Deactivated; + + logger.info(`[DC] c"${this.getKey()}" deactivated`); + }) + .catch((err) => { + logger.error(`[DC] c:"${this.getKey()}" err :`, err); + throw err; + }); + }); } /** @@ -283,46 +286,45 @@ export class Client { doc.update((_, p) => p.set(options.initialPresence || {})); const syncMode = options.syncMode ?? SyncMode.Realtime; + return this.enqueueTask(async () => { + return this.rpcClient + .attachDocument( + { + clientId: this.id!, + changePack: converter.toChangePack(doc.createChangePack()), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then(async (res) => { + const pack = converter.fromChangePack

(res.changePack!); + doc.applyChangePack(pack); + if (doc.getStatus() === DocumentStatus.Removed) { + return doc; + } + + doc.applyStatus(DocumentStatus.Attached); + this.attachmentMap.set( + doc.getKey(), + new Attachment( + this.reconnectStreamDelay, + doc, + res.documentId, + syncMode, + ), + ); - return this.rpcClient - .attachDocument( - { - clientId: this.id!, - changePack: converter.toChangePack(doc.createChangePack()), - }, - { - headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` }, - }, - ) - .then(async (res) => { - const pack = converter.fromChangePack

(res.changePack!); - doc.applyChangePack(pack); - if (doc.getStatus() === DocumentStatus.Removed) { - return doc; - } - - doc.applyStatus(DocumentStatus.Attached); - this.attachmentMap.set( - doc.getKey(), - new Attachment( - this.reconnectStreamDelay, - doc, - res.documentId, - syncMode, - ), - ); - - if (syncMode !== SyncMode.Manual) { - await this.runWatchLoop(doc.getKey()); - } + if (syncMode !== SyncMode.Manual) { + await this.runWatchLoop(doc.getKey()); + } - logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`); - return doc; - }) - .catch((err) => { - logger.error(`[AD] c:"${this.getKey()}" err :`, err); - throw err; - }); + logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`); + return doc; + }) + .catch((err) => { + logger.error(`[AD] c:"${this.getKey()}" err :`, err); + throw err; + }); + }); } /** @@ -351,33 +353,33 @@ export class Client { } doc.update((_, p) => p.clear()); - return this.rpcClient - .detachDocument( - { - clientId: this.id!, - documentId: attachment.docID, - changePack: converter.toChangePack(doc.createChangePack()), - removeIfNotAttached: options.removeIfNotAttached ?? false, - }, - { - headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` }, - }, - ) - .then((res) => { - const pack = converter.fromChangePack

(res.changePack!); - doc.applyChangePack(pack); - if (doc.getStatus() !== DocumentStatus.Removed) { - doc.applyStatus(DocumentStatus.Detached); - } - this.detachInternal(doc.getKey()); + return this.enqueueTask(async () => { + return this.rpcClient + .detachDocument( + { + clientId: this.id!, + documentId: attachment.docID, + changePack: converter.toChangePack(doc.createChangePack()), + removeIfNotAttached: options.removeIfNotAttached ?? false, + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then((res) => { + const pack = converter.fromChangePack

(res.changePack!); + doc.applyChangePack(pack); + if (doc.getStatus() !== DocumentStatus.Removed) { + doc.applyStatus(DocumentStatus.Detached); + } + this.detachInternal(doc.getKey()); - logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`); - return doc; - }) - .catch((err) => { - logger.error(`[DD] c:"${this.getKey()}" err :`, err); - throw err; - }); + logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`); + return doc; + }) + .catch((err) => { + logger.error(`[DD] c:"${this.getKey()}" err :`, err); + throw err; + }); + }); } /** @@ -439,7 +441,6 @@ export class Client { if (!this.isActive()) { throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`); } - const promises = []; if (doc) { // prettier-ignore const attachment = this.attachmentMap.get(doc.getKey()) as Attachment; @@ -449,14 +450,18 @@ export class Client { `${doc.getKey()} is not attached`, ); } - promises.push(this.syncInternal(attachment, SyncMode.Realtime)); - } else { - this.attachmentMap.forEach((attachment) => { - promises.push(this.syncInternal(attachment, attachment.syncMode)); - }); + return this.enqueueTask(async () => + this.syncInternal(attachment, SyncMode.Realtime), + ); } - return Promise.all(promises); + return this.enqueueTask(async () => { + const promises = []; + for (const [, attachment] of this.attachmentMap) { + promises.push(this.syncInternal(attachment, attachment.syncMode)); + } + await Promise.all(promises); + }); } /** @@ -477,28 +482,29 @@ export class Client { const pbChangePack = converter.toChangePack(doc.createChangePack()); pbChangePack.isRemoved = true; - return this.rpcClient - .removeDocument( - { - clientId: this.id!, - documentId: attachment.docID, - changePack: pbChangePack, - }, - { - headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` }, - }, - ) - .then((res) => { - const pack = converter.fromChangePack

(res.changePack!); - doc.applyChangePack(pack); - this.detachInternal(doc.getKey()); - logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`); - }) - .catch((err) => { - logger.error(`[RD] c:"${this.getKey()}" err :`, err); - throw err; - }); + return this.enqueueTask(async () => { + return this.rpcClient + .removeDocument( + { + clientId: this.id!, + documentId: attachment.docID, + changePack: pbChangePack, + }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, + ) + .then((res) => { + const pack = converter.fromChangePack

(res.changePack!); + doc.applyChangePack(pack); + this.detachInternal(doc.getKey()); + + logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`); + }) + .catch((err) => { + logger.error(`[RD] c:"${this.getKey()}" err :`, err); + throw err; + }); + }); } /** @@ -685,9 +691,7 @@ export class Client { changePack: converter.toChangePack(reqPack), pushOnly: syncMode === SyncMode.RealtimePushOnly, }, - { - headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` }, - }, + { headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } }, ) .then((res) => { const respPack = converter.fromChangePack

(res.changePack!); @@ -735,4 +739,38 @@ export class Client { throw err; }); } + + /** + * `enqueueTask` enqueues the given task to the task queue. + */ + private enqueueTask(task: () => Promise): Promise { + return new Promise((resolve, reject) => { + this.taskQueue.push(() => task().then(resolve).catch(reject)); + + if (!this.processing) { + this.processNext(); + } + }); + } + + /** + * `processNext` processes the next task in the task queue. This method is + * part of enqueueTask. + */ + private async processNext() { + if (this.taskQueue.length === 0) { + this.processing = false; + return; + } + + try { + this.processing = true; + const task = this.taskQueue.shift()!; + await task(); + } catch (error) { + logger.error(`[TQ] c:"${this.getKey()}" process failed, id:"${this.id}"`); + } + + this.processNext(); + } } diff --git a/test/integration/client_test.ts b/test/integration/client_test.ts index 76e5a5676..670206c27 100644 --- a/test/integration/client_test.ts +++ b/test/integration/client_test.ts @@ -1,4 +1,4 @@ -import { describe, it, assert, vi, afterEach } from 'vitest'; +import { describe, it, assert, vi, afterEach, expect } from 'vitest'; import yorkie, { Counter, @@ -748,4 +748,19 @@ describe.sequential('Client', function () { await c1.deactivate(); await c2.deactivate(); }); + + it('Should handle each request one by one', async function ({ task }) { + for (let i = 0; i < 10; i++) { + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>( + toDocKey(`${task.name}-${new Date().getTime()}-{i}`), + ); + await cli.attach(doc); + + expect(cli.detach(doc)).resolves.toBeDefined(); + await cli.deactivate(); + } + }); });