From 8ccd84d668e99c24fa2ec1354c12ed3b48ae4f40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tadeusz=20=E2=80=9Etadzik=E2=80=9D=20So=C5=9Bnierz?= Date: Thu, 4 Jul 2024 15:54:38 +0200 Subject: [PATCH] Push Matrix->Slack messages through the same queue as Slack->Matrix There is a possible race condition when a message gets sent from Matrix to Slack, and its echo arrives from Slack to Matrix before we got the response from Slack. We'd then check for its presence in recentSlackMessages before it actually gets added there, resulting in an undesirable echo and duplicate messages. This adds Matrix->Slack sends to the same FIFO queue as we do for Slack->Matrix, which ensures that we would have added a message to recentSlackMessages before we start processing its echo. Hopefully fixes GH-788. --- src/BridgedRoom.ts | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/BridgedRoom.ts b/src/BridgedRoom.ts index 126d6815..59d8ec5b 100644 --- a/src/BridgedRoom.ts +++ b/src/BridgedRoom.ts @@ -159,11 +159,17 @@ export class BridgedRoom { public MatrixRoomActive: boolean; private recentSlackMessages: string[] = []; - private slackSendLock: Promise = Promise.resolve(); + private bridgingQueue: Promise = Promise.resolve(); private waitingForJoin?: Promise; private waitingForJoinResolve?: () => void; + private async pushToBridgingQueue(fn: () => Promise): Promise { + return new Promise((resolve) => { + this.bridgingQueue = this.bridgingQueue.finally(() => fn().then(resolve)); + }); + } + /** * True if this instance has changed from the version last read/written to the RoomStore. */ @@ -528,7 +534,6 @@ export class BridgedRoom { body.as_user = true; delete body.username; } - let res: ChatPostMessageResponse; const chatPostMessageArgs = { ...body, // Ensure that text is defined, even for attachments. @@ -537,21 +542,25 @@ export class BridgedRoom { unfurl_links: true, }; - try { - res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; - } catch (ex) { - const platformError = ex as WebAPIPlatformError; - if (platformError.data?.error === "not_in_channel") { - await slackClient.conversations.join({ - channel: chatPostMessageArgs.channel, - }); + const res = await this.pushToBridgingQueue(async () => { + let res: ChatPostMessageResponse; + try { res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; - } else { - throw ex; + } catch (ex) { + const platformError = ex as WebAPIPlatformError; + if (platformError.data?.error === "not_in_channel") { + await slackClient.conversations.join({ + channel: chatPostMessageArgs.channel, + }); + res = await slackClient.chat.postMessage(chatPostMessageArgs) as ChatPostMessageResponse; + } else { + throw ex; + } } - } - this.addRecentSlackMessage(res.ts); + this.addRecentSlackMessage(res.ts); + return res; + }); this.main.incCounter(METRIC_SENT_MESSAGES, {side: "remote"}); // Log activity, but don't await the answer or throw errors @@ -710,7 +719,7 @@ export class BridgedRoom { if (ghostChanged) { await this.main.fixDMMetadata(this, ghost); } - this.slackSendLock = this.slackSendLock.then(() => { + await this.pushToBridgingQueue(async () => { // Check again if (this.recentSlackMessages.includes(message.ts)) { // We sent this, ignore @@ -720,7 +729,6 @@ export class BridgedRoom { log.warn(`Failed to handle slack message ${message.ts} for ${this.MatrixRoomId} ${this.slackChannelId}`, ex); }); }); - await this.slackSendLock; } catch (err) { log.error("Failed to process event"); log.error(err);