diff --git a/apps/meteor/app/models/server/models/LivechatRooms.js b/apps/meteor/app/models/server/models/LivechatRooms.js index a2c5239daf36b..cef4529b21b0b 100644 --- a/apps/meteor/app/models/server/models/LivechatRooms.js +++ b/apps/meteor/app/models/server/models/LivechatRooms.js @@ -209,7 +209,10 @@ export class LivechatRooms extends Base { const query = { 't': 'l', 'v.token': visitorToken, - '$or': [{ 'email.thread': { $elemMatch: { $in: emailThread } } }, { 'email.thread': new RegExp(emailThread.join('|')) }], + '$or': [ + { 'email.thread': { $elemMatch: { $in: emailThread } } }, + { 'email.thread': new RegExp(emailThread.map((t) => `"${t}"`).join('|')) }, + ], ...(departmentId && { departmentId }), }; diff --git a/apps/meteor/server/email/IMAPInterceptor.ts b/apps/meteor/server/email/IMAPInterceptor.ts index 00646f3ba844e..b62534bf7edd7 100644 --- a/apps/meteor/server/email/IMAPInterceptor.ts +++ b/apps/meteor/server/email/IMAPInterceptor.ts @@ -1,9 +1,10 @@ import { EventEmitter } from 'events'; +import type { ImapMessage, ImapMessageBodyInfo } from 'imap'; import IMAP from 'imap'; -import type Connection from 'imap'; import type { ParsedMail } from 'mailparser'; import { simpleParser } from 'mailparser'; +import { EmailInbox } from '@rocket.chat/models'; import { logger } from '../features/EmailInbox/logger'; @@ -24,12 +25,14 @@ export class IMAPInterceptor extends EventEmitter { private config: IMAP.Config; - private initialBackoffDurationMS = 30000; + private backoffDurationMS = 3000; private backoff: NodeJS.Timeout; private retries = 0; + private inboxId: string; + constructor( imapConfig: IMAP.Config, private options: IMAPOptions = { @@ -38,6 +41,7 @@ export class IMAPInterceptor extends EventEmitter { markSeen: true, maxRetries: 10, }, + id: string, ) { super(); @@ -49,132 +53,178 @@ export class IMAPInterceptor extends EventEmitter { ...(imapConfig.tls && { tlsOptions: { servername: imapConfig.host } }), ...imapConfig, }); + this.retries = 0; + this.inboxId = id; + this.start(); + } + + openInbox(): Promise { + return new Promise((resolve, reject) => { + const cb = (err: Error, mailbox: IMAP.Box) => { + if (err) { + reject(err); + } else { + resolve(mailbox); + } + }; + this.imap.openBox('INBOX', false, cb); + }); + } + async start(): Promise { // On successfully connected. - this.imap.on('ready', () => { - if (this.imap.state !== 'disconnected') { + this.imap.on('ready', async () => { + if (this.isActive()) { + logger.info(`IMAP connected to ${this.config.user}`); clearTimeout(this.backoff); this.retries = 0; - this.openInbox((err) => { - if (err) { - logger.error(`Error occurred during imap on inbox ${this.config.user}: `, err); - throw err; - } - // fetch new emails & wait [IDLE] - this.getEmails(); - - // If new message arrived, fetch them - this.imap.on('mail', () => { - this.getEmails(); - }); - }); + this.backoffDurationMS = 3000; + await this.openInbox(); + this.imap.on('mail', () => this.getEmails().catch((err: Error) => logger.debug('Error on getEmails: ', err.message))); } else { - logger.error(`IMAP did not connect on inbox ${this.config.user}`); - this.imap.end(); - this.reconnect(); + logger.error("Can't connect to IMAP server"); } }); - this.imap.on('error', (err: Error) => { - logger.error(`Error occurred on inbox ${this.config.user}: `, err); - this.stop(() => this.reconnect()); + this.imap.on('error', async (err: Error) => { + logger.error({ err }); + logger.error(`IMAP error: ${err.message}`); + this.retries++; + await this.reconnect(); }); - this.imap.on('close', () => { - this.reconnect(); + this.imap.on('close', async () => { + await this.reconnect(); }); - } - - openInbox(cb: (error: Error, mailbox: Connection.Box) => void): void { - this.imap.openBox('INBOX', false, cb); - } - - start(): void { - this.imap.connect(); + this.retries += 1; + return this.imap.connect(); } isActive(): boolean { - if (this.imap?.state && this.imap.state === 'disconnected') { - return false; - } - - return true; + return !!(this.imap?.state && this.imap.state !== 'disconnected'); } stop(callback = new Function()): void { - logger.info('IMAP stop called'); - this.imap.end(); + logger.debug('IMAP stop called'); + this.imap.removeAllListeners(); this.imap.once('end', () => { - logger.info('IMAP stopped'); + logger.debug('IMAP stopped'); callback?.(); }); + this.imap.end(); } - reconnect(): void { + async reconnect(): Promise { + if (!this.isActive() && !this.canRetry()) { + logger.info(`Max retries reached for ${this.config.user}`); + this.stop(); + return this.selfDisable(); + } + if (this.backoff) { + clearTimeout(this.backoff); + this.backoffDurationMS = 3000; + } const loop = (): void => { - this.start(); - if (this.retries < this.options.maxRetries) { - this.retries += 1; - this.initialBackoffDurationMS *= 2; - this.backoff = setTimeout(loop, this.initialBackoffDurationMS); + logger.debug(`Reconnecting to ${this.config.user}: ${this.retries}`); + if (this.canRetry()) { + this.backoffDurationMS *= 2; + this.backoff = setTimeout(loop, this.backoffDurationMS); } else { - logger.error(`IMAP reconnection failed on inbox ${this.config.user}`); + logger.info(`IMAP reconnection failed on inbox ${this.config.user}`); clearTimeout(this.backoff); + this.stop(); + this.selfDisable(); + return; } + this.stop(); + this.start(); }; - this.backoff = setTimeout(loop, this.initialBackoffDurationMS); + this.backoff = setTimeout(loop, this.backoffDurationMS); } - // Fetch all UNSEEN messages and pass them for further processing - getEmails(): void { - this.imap.search(this.options.filter, (err, newEmails) => { - logger.debug(`IMAP search on inbox ${this.config.user} returned ${newEmails.length} new emails: `, newEmails); - if (err) { - logger.error(err); - throw err; - } - // newEmails => array containing serials of unseen messages - if (newEmails.length > 0) { - const fetch = this.imap.fetch(newEmails, { - bodies: ['HEADER', 'TEXT', ''], - struct: true, - markSeen: this.options.markSeen, - }); - - fetch.on('message', (msg, seqno) => { - logger.debug('E-mail received', seqno, msg); - - msg.on('body', (stream, type) => { - if (type.which !== '') { - return; - } + imapSearch(): Promise { + return new Promise((resolve, reject) => { + const cb = (err: Error, results: number[]) => { + if (err) { + reject(err); + } else { + resolve(results); + } + }; + this.imap.search(this.options.filter, cb); + }); + } - simpleParser(stream, (_err, email) => { - if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) { - logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject); - return; - } - this.emit('email', email); - }); - }); + parseEmails(stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): Promise { + return new Promise((resolve, reject) => { + const cb = (err: Error, mail: ParsedMail) => { + if (err) { + reject(err); + } else { + resolve(mail); + } + }; + simpleParser(stream, cb); + }); + } - // On fetched each message, pass it further - msg.once('end', () => { - // delete message from inbox + imapFetch(emailIds: number[]): Promise { + return new Promise((resolve, reject) => { + const out: number[] = []; + const messagecb = (msg: ImapMessage, seqno: number) => { + out.push(seqno); + const bodycb = (stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): void => { + simpleParser(stream, (_err, email) => { + if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) { + logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject); + return; + } + this.emit('email', email); if (this.options.deleteAfterRead) { - this.imap.seq.addFlags(seqno, 'Deleted', (err) => { + this.imap.seq.addFlags(email, 'Deleted', (err) => { if (err) { - logger.error(`Mark deleted error: ${err}`); + logger.warn(`Mark deleted error: ${err}`); } }); } }); - }); - - fetch.once('error', (err) => { - logger.error(`Fetch error: ${err}`); - }); - } + }; + msg.once('body', bodycb); + }; + const errorcb = (err: Error): void => { + logger.warn(`Fetch error: ${err}`); + reject(err); + }; + const endcb = (): void => { + resolve(out); + }; + const fetch = this.imap.fetch(emailIds, { + bodies: ['HEADER', 'TEXT', ''], + struct: true, + markSeen: this.options.markSeen, + }); + + fetch.on('message', messagecb); + fetch.on('error', errorcb); + fetch.on('end', endcb); }); } + + // Fetch all UNSEEN messages and pass them for further processing + async getEmails(): Promise { + const emailIds = await this.imapSearch(); + await this.imapFetch(emailIds); + } + + canRetry(): boolean { + return this.retries < this.options.maxRetries || this.options.maxRetries === -1; + } + + async selfDisable(): Promise { + logger.info(`Disabling inbox ${this.inboxId}`); + // Again, if there's 2 inboxes with the same email, this will prevent looping over the already disabled one + // Active filter is just in case :) + await EmailInbox.findOneAndUpdate({ _id: this.inboxId, active: true }, { $set: { active: false } }); + logger.info(`IMAP inbox ${this.inboxId} automatically disabled`); + } } diff --git a/apps/meteor/server/features/EmailInbox/EmailInbox.ts b/apps/meteor/server/features/EmailInbox/EmailInbox.ts index 739888aa29dc1..320dd71a84c3d 100644 --- a/apps/meteor/server/features/EmailInbox/EmailInbox.ts +++ b/apps/meteor/server/features/EmailInbox/EmailInbox.ts @@ -49,11 +49,12 @@ export async function configureEmailInboxes(): Promise { }, { deleteAfterRead: false, - filter: [['UNSEEN'], ['SINCE', emailInboxRecord._updatedAt]], - rejectBeforeTS: emailInboxRecord._updatedAt, + filter: [['UNSEEN'], ['SINCE', emailInboxRecord._createdAt]], + rejectBeforeTS: emailInboxRecord._createdAt, markSeen: true, maxRetries: emailInboxRecord.imap.maxRetries, }, + emailInboxRecord._id, ); imap.on( diff --git a/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts b/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts index 4105b301b45db..2d7b701ebe048 100644 --- a/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts +++ b/apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts @@ -127,7 +127,7 @@ async function uploadAttachment(attachment: Attachment, rid: string, visitorToke } export async function onEmailReceived(email: ParsedMail, inbox: string, department = ''): Promise { - logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`, email); + logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`); if (!email.from?.value?.[0]?.address) { return; } @@ -161,12 +161,18 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme room = await QueueManager.unarchiveRoom(room); } - let msg = email.text; - - if (email.html) { - // Try to remove the signature and history - msg = stripHtml(email.html.replace(/