Skip to content

Commit

Permalink
[FIX] Email inbox memory leak on connection failure (#26850)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Aleman <11577696+KevLehman@users.noreply.github.com>
  • Loading branch information
2 people authored and weslley543 committed Oct 17, 2022
1 parent b6e3663 commit 5125a86
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 116 deletions.
5 changes: 4 additions & 1 deletion apps/meteor/app/models/server/models/LivechatRooms.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ export class LivechatRooms extends Base {
const query = {
't': 'l',
'v.token': visitorToken,
'$or': [{ 'email.thread': { $elemMatch: { $in: emailThread } } }, { 'email.thread': new RegExp(emailThread.join('|')) }],
'$or': [
{ 'email.thread': { $elemMatch: { $in: emailThread } } },
{ 'email.thread': new RegExp(emailThread.map((t) => `"${t}"`).join('|')) },
],
...(departmentId && { departmentId }),
};

Expand Down
232 changes: 141 additions & 91 deletions apps/meteor/server/email/IMAPInterceptor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { EventEmitter } from 'events';

import type { ImapMessage, ImapMessageBodyInfo } from 'imap';
import IMAP from 'imap';
import type Connection from 'imap';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';
import { EmailInbox } from '@rocket.chat/models';

import { logger } from '../features/EmailInbox/logger';

Expand All @@ -24,12 +25,14 @@ export class IMAPInterceptor extends EventEmitter {

private config: IMAP.Config;

private initialBackoffDurationMS = 30000;
private backoffDurationMS = 3000;

private backoff: NodeJS.Timeout;

private retries = 0;

private inboxId: string;

constructor(
imapConfig: IMAP.Config,
private options: IMAPOptions = {
Expand All @@ -38,6 +41,7 @@ export class IMAPInterceptor extends EventEmitter {
markSeen: true,
maxRetries: 10,
},
id: string,
) {
super();

Expand All @@ -49,132 +53,178 @@ export class IMAPInterceptor extends EventEmitter {
...(imapConfig.tls && { tlsOptions: { servername: imapConfig.host } }),
...imapConfig,
});
this.retries = 0;
this.inboxId = id;
this.start();
}

openInbox(): Promise<IMAP.Box> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mailbox: IMAP.Box) => {
if (err) {
reject(err);
} else {
resolve(mailbox);
}
};
this.imap.openBox('INBOX', false, cb);
});
}

async start(): Promise<void> {
// On successfully connected.
this.imap.on('ready', () => {
if (this.imap.state !== 'disconnected') {
this.imap.on('ready', async () => {
if (this.isActive()) {
logger.info(`IMAP connected to ${this.config.user}`);
clearTimeout(this.backoff);
this.retries = 0;
this.openInbox((err) => {
if (err) {
logger.error(`Error occurred during imap on inbox ${this.config.user}: `, err);
throw err;
}
// fetch new emails & wait [IDLE]
this.getEmails();

// If new message arrived, fetch them
this.imap.on('mail', () => {
this.getEmails();
});
});
this.backoffDurationMS = 3000;
await this.openInbox();
this.imap.on('mail', () => this.getEmails().catch((err: Error) => logger.debug('Error on getEmails: ', err.message)));
} else {
logger.error(`IMAP did not connect on inbox ${this.config.user}`);
this.imap.end();
this.reconnect();
logger.error("Can't connect to IMAP server");
}
});

this.imap.on('error', (err: Error) => {
logger.error(`Error occurred on inbox ${this.config.user}: `, err);
this.stop(() => this.reconnect());
this.imap.on('error', async (err: Error) => {
logger.error({ err });
logger.error(`IMAP error: ${err.message}`);
this.retries++;
await this.reconnect();
});

this.imap.on('close', () => {
this.reconnect();
this.imap.on('close', async () => {
await this.reconnect();
});
}

openInbox(cb: (error: Error, mailbox: Connection.Box) => void): void {
this.imap.openBox('INBOX', false, cb);
}

start(): void {
this.imap.connect();
this.retries += 1;
return this.imap.connect();
}

isActive(): boolean {
if (this.imap?.state && this.imap.state === 'disconnected') {
return false;
}

return true;
return !!(this.imap?.state && this.imap.state !== 'disconnected');
}

stop(callback = new Function()): void {
logger.info('IMAP stop called');
this.imap.end();
logger.debug('IMAP stop called');
this.imap.removeAllListeners();
this.imap.once('end', () => {
logger.info('IMAP stopped');
logger.debug('IMAP stopped');
callback?.();
});
this.imap.end();
}

reconnect(): void {
async reconnect(): Promise<void> {
if (!this.isActive() && !this.canRetry()) {
logger.info(`Max retries reached for ${this.config.user}`);
this.stop();
return this.selfDisable();
}
if (this.backoff) {
clearTimeout(this.backoff);
this.backoffDurationMS = 3000;
}
const loop = (): void => {
this.start();
if (this.retries < this.options.maxRetries) {
this.retries += 1;
this.initialBackoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
logger.debug(`Reconnecting to ${this.config.user}: ${this.retries}`);
if (this.canRetry()) {
this.backoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.backoffDurationMS);
} else {
logger.error(`IMAP reconnection failed on inbox ${this.config.user}`);
logger.info(`IMAP reconnection failed on inbox ${this.config.user}`);
clearTimeout(this.backoff);
this.stop();
this.selfDisable();
return;
}
this.stop();
this.start();
};
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
this.backoff = setTimeout(loop, this.backoffDurationMS);
}

// Fetch all UNSEEN messages and pass them for further processing
getEmails(): void {
this.imap.search(this.options.filter, (err, newEmails) => {
logger.debug(`IMAP search on inbox ${this.config.user} returned ${newEmails.length} new emails: `, newEmails);
if (err) {
logger.error(err);
throw err;
}
// newEmails => array containing serials of unseen messages
if (newEmails.length > 0) {
const fetch = this.imap.fetch(newEmails, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});

fetch.on('message', (msg, seqno) => {
logger.debug('E-mail received', seqno, msg);

msg.on('body', (stream, type) => {
if (type.which !== '') {
return;
}
imapSearch(): Promise<number[]> {
return new Promise((resolve, reject) => {
const cb = (err: Error, results: number[]) => {
if (err) {
reject(err);
} else {
resolve(results);
}
};
this.imap.search(this.options.filter, cb);
});
}

simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}
this.emit('email', email);
});
});
parseEmails(stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): Promise<ParsedMail> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mail: ParsedMail) => {
if (err) {
reject(err);
} else {
resolve(mail);
}
};
simpleParser(stream, cb);
});
}

// On fetched each message, pass it further
msg.once('end', () => {
// delete message from inbox
imapFetch(emailIds: number[]): Promise<number[]> {
return new Promise((resolve, reject) => {
const out: number[] = [];
const messagecb = (msg: ImapMessage, seqno: number) => {
out.push(seqno);
const bodycb = (stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): void => {
simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}
this.emit('email', email);
if (this.options.deleteAfterRead) {
this.imap.seq.addFlags(seqno, 'Deleted', (err) => {
this.imap.seq.addFlags(email, 'Deleted', (err) => {
if (err) {
logger.error(`Mark deleted error: ${err}`);
logger.warn(`Mark deleted error: ${err}`);
}
});
}
});
});

fetch.once('error', (err) => {
logger.error(`Fetch error: ${err}`);
});
}
};
msg.once('body', bodycb);
};
const errorcb = (err: Error): void => {
logger.warn(`Fetch error: ${err}`);
reject(err);
};
const endcb = (): void => {
resolve(out);
};
const fetch = this.imap.fetch(emailIds, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});

fetch.on('message', messagecb);
fetch.on('error', errorcb);
fetch.on('end', endcb);
});
}

// Fetch all UNSEEN messages and pass them for further processing
async getEmails(): Promise<void> {
const emailIds = await this.imapSearch();
await this.imapFetch(emailIds);
}

canRetry(): boolean {
return this.retries < this.options.maxRetries || this.options.maxRetries === -1;
}

async selfDisable(): Promise<void> {
logger.info(`Disabling inbox ${this.inboxId}`);
// Again, if there's 2 inboxes with the same email, this will prevent looping over the already disabled one
// Active filter is just in case :)
await EmailInbox.findOneAndUpdate({ _id: this.inboxId, active: true }, { $set: { active: false } });
logger.info(`IMAP inbox ${this.inboxId} automatically disabled`);
}
}
5 changes: 3 additions & 2 deletions apps/meteor/server/features/EmailInbox/EmailInbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ export async function configureEmailInboxes(): Promise<void> {
},
{
deleteAfterRead: false,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._updatedAt]],
rejectBeforeTS: emailInboxRecord._updatedAt,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._createdAt]],
rejectBeforeTS: emailInboxRecord._createdAt,
markSeen: true,
maxRetries: emailInboxRecord.imap.maxRetries,
},
emailInboxRecord._id,
);

imap.on(
Expand Down
20 changes: 13 additions & 7 deletions apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async function uploadAttachment(attachment: Attachment, rid: string, visitorToke
}

export async function onEmailReceived(email: ParsedMail, inbox: string, department = ''): Promise<void> {
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`, email);
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`);
if (!email.from?.value?.[0]?.address) {
return;
}
Expand Down Expand Up @@ -161,12 +161,18 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme
room = await QueueManager.unarchiveRoom(room);
}

let msg = email.text;

if (email.html) {
// Try to remove the signature and history
msg = stripHtml(email.html.replace(/<div name="messageSignatureSection.+/s, '')).result;
}
// TODO: html => md with turndown
const msg = email.html
? stripHtml(email.html, {
dumpLinkHrefsNearby: {
enabled: true,
putOnNewLine: false,
wrapHeads: '(',
wrapTails: ')',
},
skipHtmlDecoding: false,
}).result
: email.text || '';

const rid = room?._id ?? Random.id();
const msgId = Random.id();
Expand Down
Loading

0 comments on commit 5125a86

Please sign in to comment.