Skip to content

Commit

Permalink
Add taskQueue to handle each request one by one
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Jul 5, 2024
1 parent fe8ad49 commit ed9779a
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 135 deletions.
306 changes: 172 additions & 134 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ export class Client {
private retrySyncLoopDelay: number;

private rpcClient: PromiseClient<typeof YorkieService>;
private taskQueue: Array<() => Promise<any>>;
private processing: boolean = false;

/**
* @param rpcAddr - the address of the RPC server.
Expand Down Expand Up @@ -196,6 +198,7 @@ export class Client {
],
}),
);
this.taskQueue = [];
}

/**
Expand All @@ -208,24 +211,24 @@ export class Client {
return Promise.resolve();
}

return this.rpcClient
.activateClient(
{
clientKey: this.key,
},
{ headers: { 'x-shard-key': this.apiKey } },
)
.then((res) => {
this.id = res.clientId;
this.status = ClientStatus.Activated;
this.runSyncLoop();

logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`);
})
.catch((err) => {
logger.error(`[AC] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.activateClient(
{ clientKey: this.key },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then((res) => {
this.id = res.clientId;
this.status = ClientStatus.Activated;
this.runSyncLoop();

logger.info(`[AC] c:"${this.getKey()}" activated, id:"${this.id}"`);
})
.catch((err) => {
logger.error(`[AC] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand All @@ -236,27 +239,27 @@ export class Client {
return Promise.resolve();
}

return this.rpcClient
.deactivateClient(
{
clientId: this.id!,
},
{ headers: { 'x-shard-key': this.apiKey } },
)
.then(() => {
for (const [key, attachment] of this.attachmentMap) {
attachment.doc.applyStatus(DocumentStatus.Detached);
this.detachInternal(key);
}

this.status = ClientStatus.Deactivated;

logger.info(`[DC] c"${this.getKey()}" deactivated`);
})
.catch((err) => {
logger.error(`[DC] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.deactivateClient(
{ clientId: this.id! },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then(() => {
for (const [key, attachment] of this.attachmentMap) {
attachment.doc.applyStatus(DocumentStatus.Detached);
this.detachInternal(key);
}

this.status = ClientStatus.Deactivated;

logger.info(`[DC] c"${this.getKey()}" deactivated`);
})
.catch((err) => {
logger.error(`[DC] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand All @@ -283,46 +286,45 @@ export class Client {
doc.update((_, p) => p.set(options.initialPresence || {}));

const syncMode = options.syncMode ?? SyncMode.Realtime;
return this.enqueueTask(async () => {
return this.rpcClient
.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() === DocumentStatus.Removed) {
return doc;
}

doc.applyStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
syncMode,
),
);

return this.rpcClient
.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() === DocumentStatus.Removed) {
return doc;
}

doc.applyStatus(DocumentStatus.Attached);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
syncMode,
),
);

if (syncMode !== SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
}
if (syncMode !== SyncMode.Manual) {
await this.runWatchLoop(doc.getKey());
}

logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[AD] c:"${this.getKey()}" err :`, err);
throw err;
});
logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[AD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -351,33 +353,33 @@ export class Client {
}
doc.update((_, p) => p.clear());

return this.rpcClient
.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() !== DocumentStatus.Removed) {
doc.applyStatus(DocumentStatus.Detached);
}
this.detachInternal(doc.getKey());
return this.enqueueTask(async () => {
return this.rpcClient
.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
if (doc.getStatus() !== DocumentStatus.Removed) {
doc.applyStatus(DocumentStatus.Detached);
}
this.detachInternal(doc.getKey());

logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[DD] c:"${this.getKey()}" err :`, err);
throw err;
});
logger.info(`[DD] c:"${this.getKey()}" detaches d:"${doc.getKey()}"`);
return doc;
})
.catch((err) => {
logger.error(`[DD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -439,7 +441,6 @@ export class Client {
if (!this.isActive()) {
throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`);
}
const promises = [];
if (doc) {
// prettier-ignore
const attachment = this.attachmentMap.get(doc.getKey()) as Attachment<T, P>;
Expand All @@ -449,14 +450,18 @@ export class Client {
`${doc.getKey()} is not attached`,
);
}
promises.push(this.syncInternal(attachment, SyncMode.Realtime));
} else {
this.attachmentMap.forEach((attachment) => {
promises.push(this.syncInternal(attachment, attachment.syncMode));
});
return this.enqueueTask(async () =>
this.syncInternal(attachment, SyncMode.Realtime),
);
}

return Promise.all(promises);
return this.enqueueTask(async () => {
const promises = [];
for (const [, attachment] of this.attachmentMap) {
promises.push(this.syncInternal(attachment, attachment.syncMode));
}
await Promise.all(promises);
});
}

/**
Expand All @@ -477,28 +482,29 @@ export class Client {

const pbChangePack = converter.toChangePack(doc.createChangePack());
pbChangePack.isRemoved = true;
return this.rpcClient
.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
this.detachInternal(doc.getKey());

logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`);
})
.catch((err) => {
logger.error(`[RD] c:"${this.getKey()}" err :`, err);
throw err;
});
return this.enqueueTask(async () => {
return this.rpcClient
.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
this.detachInternal(doc.getKey());

logger.info(`[RD] c:"${this.getKey()}" removes d:"${doc.getKey()}"`);
})
.catch((err) => {
logger.error(`[RD] c:"${this.getKey()}" err :`, err);
throw err;
});
});
}

/**
Expand Down Expand Up @@ -685,9 +691,7 @@ export class Client {
changePack: converter.toChangePack(reqPack),
pushOnly: syncMode === SyncMode.RealtimePushOnly,
},
{
headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` },
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const respPack = converter.fromChangePack<P>(res.changePack!);
Expand Down Expand Up @@ -735,4 +739,38 @@ export class Client {
throw err;
});
}

/**
* `enqueueTask` enqueues the given task to the task queue.
*/
private enqueueTask(task: () => Promise<any>): Promise<any> {
return new Promise((resolve, reject) => {
this.taskQueue.push(() => task().then(resolve).catch(reject));

if (!this.processing) {
this.processNext();
}
});
}

/**
* `processNext` processes the next task in the task queue. This method is
* part of enqueueTask.
*/
private async processNext() {
if (this.taskQueue.length === 0) {
this.processing = false;
return;
}

try {
this.processing = true;
const task = this.taskQueue.shift()!;
await task();
} catch (error) {
logger.error(`[TQ] c:"${this.getKey()}" process failed, id:"${this.id}"`);
}

this.processNext();
}
}
Loading

0 comments on commit ed9779a

Please sign in to comment.