Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Email inbox memory leak on connection failure #26850

Merged
merged 25 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1b0c72d
[Regression] Invalid Email Inbox no longer leaks event listeners, and…
cauefcr Sep 12, 2022
9a4712e
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 12, 2022
17f1d19
OC-184 jira linking
cauefcr Sep 12, 2022
76d94d9
Merge ssh://github.com/RocketChat/Rocket.Chat into regression/email-m…
cauefcr Sep 12, 2022
fa26b4b
forgotten comment
cauefcr Sep 12, 2022
f473e6a
Merge branch 'regression/email-memory-leak-and-attachment-issues' of …
cauefcr Sep 12, 2022
7191b2b
Update apps/meteor/server/email/IMAPInterceptor.ts
cauefcr Sep 13, 2022
60a5e94
Update apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts
cauefcr Sep 13, 2022
c76a2f5
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 13, 2022
d0cacb2
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 13, 2022
25b20f4
promisifying most callbacks
cauefcr Sep 14, 2022
7ec4deb
Merge github.com:RocketChat/Rocket.Chat into regression/email-memory-…
cauefcr Sep 14, 2022
6249880
Merge branch 'regression/email-memory-leak-and-attachment-issues' of …
cauefcr Sep 14, 2022
24448ec
type error
cauefcr Sep 14, 2022
5c46461
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 14, 2022
fe5f059
linter complaint
cauefcr Sep 14, 2022
886be36
code review changes
cauefcr Sep 20, 2022
b21d2ea
forgotten paren
cauefcr Sep 20, 2022
6b0c057
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 20, 2022
00b9388
review fixes
KevLehman Sep 21, 2022
dc7af33
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
KevLehman Sep 27, 2022
84caaae
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
a35086e
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
470b035
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
dae0e56
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/meteor/app/models/server/models/LivechatRooms.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ export class LivechatRooms extends Base {
const query = {
't': 'l',
'v.token': visitorToken,
'$or': [{ 'email.thread': { $elemMatch: { $in: emailThread } } }, { 'email.thread': new RegExp(emailThread.join('|')) }],
'$or': [
{ 'email.thread': { $elemMatch: { $in: emailThread } } },
{ 'email.thread': new RegExp(emailThread.map((t) => `"${t}"`).join('|')) },
],
...(departmentId && { departmentId }),
};

Expand Down
230 changes: 143 additions & 87 deletions apps/meteor/server/email/IMAPInterceptor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { EventEmitter } from 'events';

import type { ImapMessage, ImapMessageBodyInfo } from 'imap';
import IMAP from 'imap';
import type Connection from 'imap';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';
import { EmailInbox } from '@rocket.chat/models';

import { logger } from '../features/EmailInbox/logger';

Expand All @@ -24,7 +26,7 @@ export class IMAPInterceptor extends EventEmitter {

private config: IMAP.Config;

private initialBackoffDurationMS = 30000;
private backoffDurationMS = 3000;

private backoff: NodeJS.Timeout;

Expand All @@ -49,132 +51,186 @@ export class IMAPInterceptor extends EventEmitter {
...(imapConfig.tls && { tlsOptions: { servername: imapConfig.host } }),
...imapConfig,
});
this.retries = 0;
this.start();
}

openInbox(): Promise<Connection.Box> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mailbox: Connection.Box) => {
if (err) {
reject(err);
} else {
resolve(mailbox);
}
};
this.imap.openBox('INBOX', false, cb);
});
}

async start(): Promise<void> {
const errorBuild =
(location: string, severity: 'error' | 'info' | 'warn' | 'debug' = 'error') =>
(err: Error) => {
logger[severity](`IMAP connect: ${location}: ${err.message}`);
};
// On successfully connected.
this.imap.on('ready', () => {
if (this.imap.state !== 'disconnected') {
this.imap.on('ready', async () => {
if (this.isActive()) {
logger.info(`IMAP connected to ${this.config.user}`);
clearTimeout(this.backoff);
this.retries = 0;
this.openInbox((err) => {
if (err) {
logger.error(`Error occurred during imap on inbox ${this.config.user}: `, err);
throw err;
}
// fetch new emails & wait [IDLE]
this.getEmails();

// If new message arrived, fetch them
this.imap.on('mail', () => {
this.getEmails();
});
});
this.backoffDurationMS = 3000;
await this.openInbox();
this.imap.on('mail', () => this.getEmails().catch(errorBuild('getEmails', 'debug')));
} else {
logger.error(`IMAP did not connect on inbox ${this.config.user}`);
this.imap.end();
this.reconnect();
errorBuild('ready')(Error("Can't connect to IMAP server"));
}
});

this.imap.on('error', (err: Error) => {
logger.error(`Error occurred on inbox ${this.config.user}: `, err);
this.stop(() => this.reconnect());
logger.error(`IMAP error: ${err.message}`);
this.retries++;
this.reconnect();
});

this.imap.on('close', () => {
this.reconnect();
});
}

openInbox(cb: (error: Error, mailbox: Connection.Box) => void): void {
this.imap.openBox('INBOX', false, cb);
}

start(): void {
this.imap.connect();
this.retries += 1;
return this.imap.connect();
}

isActive(): boolean {
if (this.imap?.state && this.imap.state === 'disconnected') {
return false;
}

return true;
return !!(this.imap?.state && this.imap.state !== 'disconnected');
}

stop(callback = new Function()): void {
logger.info('IMAP stop called');
this.imap.end();
logger.debug('IMAP stop called');
this.imap.removeAllListeners();
this.imap.once('end', () => {
logger.info('IMAP stopped');
logger.debug('IMAP stopped');
callback?.();
});
this.imap.end();
}

reconnect(): void {
if (!this.isActive() && !this.canRetry()) {
logger.info(`Max retries reached for ${this.config.user}`);
this.stop();
this.selfDisable();
return;
}
if (this.backoff) {
clearTimeout(this.backoff);
this.backoffDurationMS = 3000;
}
const loop = (): void => {
this.start();
if (this.retries < this.options.maxRetries) {
this.retries += 1;
this.initialBackoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
logger.debug(`Reconnecting to ${this.config.user}: ${this.retries}`);
if (this.canRetry()) {
this.backoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.backoffDurationMS);
} else {
logger.error(`IMAP reconnection failed on inbox ${this.config.user}`);
logger.info(`IMAP reconnection failed on inbox ${this.config.user}`);
clearTimeout(this.backoff);
this.stop();
this.selfDisable();
return;
}
this.stop();
this.start();
};
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
this.backoff = setTimeout(loop, this.backoffDurationMS);
}

// Fetch all UNSEEN messages and pass them for further processing
getEmails(): void {
this.imap.search(this.options.filter, (err, newEmails) => {
logger.debug(`IMAP search on inbox ${this.config.user} returned ${newEmails.length} new emails: `, newEmails);
if (err) {
logger.error(err);
throw err;
}
// newEmails => array containing serials of unseen messages
if (newEmails.length > 0) {
const fetch = this.imap.fetch(newEmails, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});
imapSearch(): Promise<number[]> {
return new Promise((resolve, reject) => {
const cb = (err: Error, results: number[]) => {
if (err) {
reject(err);
} else {
resolve(results);
}
};
this.imap.search(this.options.filter, cb);
});
}

fetch.on('message', (msg, seqno) => {
logger.debug('E-mail received', seqno, msg);
parseEmails(stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): Promise<ParsedMail> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mail: ParsedMail) => {
if (err) {
reject(err);
} else {
resolve(mail);
}
};
simpleParser(stream, cb);
});
}

msg.on('body', (stream, type) => {
if (type.which !== '') {
imapFetch(emailIds: number[]): Promise<number[]> {
return new Promise((resolve, reject) => {
const out: number[] = [];
const messagecb = (msg: ImapMessage, seqno: number) => {
out.push(seqno);
const bodycb = (stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): void => {
simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}

simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}
this.emit('email', email);
});
this.emit('email', email);
});
};
msg.once('body', bodycb);
};
const errorcb = (err: Error): void => {
logger.warn(`Fetch error: ${err}`);
reject(err);
};
const endcb = (): void => {
resolve(out);
};
const fetch = this.imap.fetch(emailIds, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});

fetch.on('message', messagecb);
fetch.on('error', errorcb);
fetch.on('end', endcb);
});
}

// On fetched each message, pass it further
msg.once('end', () => {
// delete message from inbox
if (this.options.deleteAfterRead) {
this.imap.seq.addFlags(seqno, 'Deleted', (err) => {
if (err) {
logger.error(`Mark deleted error: ${err}`);
}
});
}
});
});

fetch.once('error', (err) => {
logger.error(`Fetch error: ${err}`);
// Fetch all UNSEEN messages and pass them for further processing
async getEmails(): Promise<void> {
const emailIds = await this.imapSearch();
const emailsFetched = await this.imapFetch(emailIds);

// this.imapSearch()
// .then(this.imapFetch)
// .then((emails) => {
// emails.forEach((email) => {
for (const email of emailsFetched) {
// this.emit('email', email.mail);
if (this.options.deleteAfterRead) {
this.imap.seq.addFlags(email, 'Deleted', (err) => {
if (err) {
logger.warn(`Mark deleted error: ${err}`);
}
});
}
});
}
}

canRetry(): boolean {
return this.retries < this.options.maxRetries || this.options.maxRetries === -1;
}

selfDisable(): void {
EmailInbox.findOneAndUpdate({ email: this.config.user }, { $set: { active: false } });
}
}
4 changes: 2 additions & 2 deletions apps/meteor/server/features/EmailInbox/EmailInbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export async function configureEmailInboxes(): Promise<void> {
},
{
deleteAfterRead: false,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._updatedAt]],
rejectBeforeTS: emailInboxRecord._updatedAt,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._createdAt]],
rejectBeforeTS: emailInboxRecord._createdAt,
markSeen: true,
maxRetries: emailInboxRecord.imap.maxRetries,
},
Expand Down
20 changes: 13 additions & 7 deletions apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async function uploadAttachment(attachment: Attachment, rid: string, visitorToke
}

export async function onEmailReceived(email: ParsedMail, inbox: string, department = ''): Promise<void> {
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`, email);
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`);
if (!email.from?.value?.[0]?.address) {
return;
}
Expand Down Expand Up @@ -161,12 +161,18 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme
room = await QueueManager.unarchiveRoom(room);
}

let msg = email.text;

if (email.html) {
// Try to remove the signature and history
msg = stripHtml(email.html.replace(/<div name="messageSignatureSection.+/s, '')).result;
}
// TODO: html => md with turndown
const msg = email.html
? stripHtml(email.html, {
dumpLinkHrefsNearby: {
enabled: true,
putOnNewLine: false,
wrapHeads: '(',
wrapTails: ')',
},
skipHtmlDecoding: false,
}).result
: email.text || '';

const rid = room?._id ?? Random.id();
const msgId = Random.id();
Expand Down
Loading