Skip to content

Commit

Permalink
Add taskQueue to handle each request one by one (#862)
Browse files Browse the repository at this point in the history
When using JS SDK in environments like React, it is possible to assign
Client to React Components. However, when Component Unmount is
executed by moving location, resources like `yorkie.Client` need to be
`deactivated` or `yorkie.Document` need to be `detached`. The
challenge arises when it is not possible to await asynchronous
requests while deactivating one by one due to some reasons.

To address this issue, this commit introduces a task queue in `Client`
to handle all requests one by one, ensuring proper deactivation or
detachment.
  • Loading branch information
hackerwins authored Jul 5, 2024
1 parent fe8ad49 commit 4f7dcce
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 135 deletions.
306 changes: 172 additions & 134 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ export class Client {
private retrySyncLoopDelay: number;

private rpcClient: PromiseClient<typeof YorkieService>;
private taskQueue: Array<() => Promise<any>>;
private processing: boolean = false;

/**
* @param rpcAddr - the address of the RPC server.
Expand Down Expand Up @@ -196,6 +198,7 @@ export class Client {
],
}),
);
this.taskQueue = [];
}

/**
Expand All @@ -208,24 +211,24 @@ export class Client {
return Promise.resolve();
}

return this.rpcClient
.activateClient(
{
clientKey: this.key,
},
{ headers: { 'x-shard-key': this.apiKey } },
)
.then((res) => {
this.id = res.clientId;
this.status = ClientStatus.Activated;
this.runSyncLoop();

logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`);
})
.catch((err) => {
logger.error(`[AC] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.activateClient(
{ clientKey: this.key },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then((res) => {
this.id = res.clientId;
this.status = ClientStatus.Activated;
this.runSyncLoop();

logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`);
})
.catch((err) => {
logger.error(`[AC] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand All @@ -236,27 +239,27 @@ export class Client {
return Promise.resolve();
}

return this.rpcClient
.deactivateClient(
{
clientId: this.id!,
},
{ headers: { 'x-shard-key': this.apiKey } },
)
.then(() => {
for (const [key, attachment] of this.attachmentMap) {
attachment.doc.applyStatus(DocumentStatus.Detached);
this.detachInternal(key);
}

this.status = ClientStatus.Deactivated;

logger.info(`[DC] c"${this.getKey()}" deactivated`);
})
.catch((err) => {
logger.error(`[DC] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.deactivateClient(
{ clientId: this.id! },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then(() => {
for (const [key, attachment] of this.attachmentMap) {
attachment.doc.applyStatus(DocumentStatus.Detached);
this.detachInternal(key);
}

this.status = ClientStatus.Deactivated;

logger.info(`[DC] c"${this.getKey()}" deactivated`);
})
.catch((err) => {
logger.error(`[DC] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand All @@ -283,46 +286,45 @@ export class Client {
doc.update((_, p) => p.set(options.initialPresence || {}));

const syncMode = options.syncMode ?? SyncMode.Realtime;
return this.enqueueTask(async () => {
return this.rpcClient
.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() === DocumentStatus.Removed) {
return doc;
}

doc.applyStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
syncMode,
),
);

return this.rpcClient
.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() === DocumentStatus.Removed) {
return doc;
}

doc.applyStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
syncMode,
),
);

if (syncMode !== SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
}
if (syncMode !== SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
}

logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[AD] c:"${this.getKey()}" err :`, err);
throw err;
});
logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[AD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -351,33 +353,33 @@ export class Client {
}
doc.update((_, p) => p.clear());

return this.rpcClient
.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() !== DocumentStatus.Removed) {
doc.applyStatus(DocumentStatus.Detached);
}
this.detachInternal(doc.getKey());
return this.enqueueTask(async () => {
return this.rpcClient
.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() !== DocumentStatus.Removed) {
doc.applyStatus(DocumentStatus.Detached);
}
this.detachInternal(doc.getKey());

logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[DD] c:"${this.getKey()}" err :`, err);
throw err;
});
logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[DD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -439,7 +441,6 @@ export class Client {
if (!this.isActive()) {
throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`);
}
const promises = [];
if (doc) {
// prettier-ignore
const attachment = this.attachmentMap.get(doc.getKey()) as Attachment<T, P>;
Expand All @@ -449,14 +450,18 @@ export class Client {
`${doc.getKey()} is not attached`,
);
}
promises.push(this.syncInternal(attachment, SyncMode.Realtime));
} else {
this.attachmentMap.forEach((attachment) => {
promises.push(this.syncInternal(attachment, attachment.syncMode));
});
return this.enqueueTask(async () =>
this.syncInternal(attachment, SyncMode.Realtime),
);
}

return Promise.all(promises);
return this.enqueueTask(async () => {
const promises = [];
for (const [, attachment] of this.attachmentMap) {
promises.push(this.syncInternal(attachment, attachment.syncMode));
}
return Promise.all(promises);
});
}

/**
Expand All @@ -477,28 +482,29 @@ export class Client {

const pbChangePack = converter.toChangePack(doc.createChangePack());
pbChangePack.isRemoved = true;
return this.rpcClient
.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
this.detachInternal(doc.getKey());

logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`);
})
.catch((err) => {
logger.error(`[RD] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
this.detachInternal(doc.getKey());

logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`);
})
.catch((err) => {
logger.error(`[RD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -685,9 +691,7 @@ export class Client {
changePack: converter.toChangePack(reqPack),
pushOnly: syncMode === SyncMode.RealtimePushOnly,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const respPack = converter.fromChangePack<P>(res.changePack!);
Expand Down Expand Up @@ -735,4 +739,38 @@ export class Client {
throw err;
});
}

/**
* `enqueueTask` enqueues the given task to the task queue.
*/
private enqueueTask(task: () => Promise<any>): Promise<any> {
return new Promise((resolve, reject) => {
this.taskQueue.push(() => task().then(resolve).catch(reject));

if (!this.processing) {
this.processNext();
}
});
}

/**
* `processNext` processes the next task in the task queue. This method is
* part of enqueueTask.
*/
private async processNext() {
if (this.taskQueue.length === 0) {
this.processing = false;
return;
}

try {
this.processing = true;
const task = this.taskQueue.shift()!;
await task();
} catch (error) {
logger.error(`[TQ] c:"${this.getKey()}" process failed, id:"${this.id}"`);
}

this.processNext();
}
}
Loading

0 comments on commit 4f7dcce

Please sign in to comment.