Skip to content

Commit

Permalink
fix: fixed sqlite/websocket argument invocation issues, fixed entries…
Browse files Browse the repository at this point in the history
… on fetch
  • Loading branch information
titanism committed Nov 2, 2023
1 parent aee51f2 commit d5fb1d4
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 73 deletions.
2 changes: 1 addition & 1 deletion app/models/threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async function getThreadId(db, wsp, session, subject, mimeTree) {
});

if (db.wsp) {
thread = await this.wsp.request({
thread = await wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
Expand Down
12 changes: 7 additions & 5 deletions helpers/create-websocket-as-promised.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ const { encrypt } = require('#helpers/encrypt-decrypt');
// partysocket.WebSocket.prototype._debug = (...args) =>
// logger.debug('partysocket', { args });

ReconnectingWebSocket.prototype._debug = (...args) => {
if (config.env === 'development')
logger.debug('reconnectingwebsocket', { args });
ReconnectingWebSocket.prototype._debug = () => {
// if (config.env === 'development')
// logger.debug('reconnectingwebsocket', { args });
};

function createWebSocketAsPromised(options = {}) {
Expand Down Expand Up @@ -57,8 +57,8 @@ function createWebSocketAsPromised(options = {}) {
// return new partysocket.WebSocket(url, [], {
return new ReconnectingWebSocket(url, [], {
// <https://github.com/pladaria/reconnecting-websocket#available-options>
WebSocket,
debug: config.env === 'development'
WebSocket
// debug: config.env === 'development'
});
},
packMessage: (data) => safeStringify(data),
Expand Down Expand Up @@ -88,6 +88,7 @@ function createWebSocketAsPromised(options = {}) {
//
// bind event listeners
//
/*
if (config.env === 'development') {
for (const event of [
'onOpen',
Expand All @@ -103,6 +104,7 @@ function createWebSocketAsPromised(options = {}) {
);
}
}
*/

// <https://github.com/vitalets/websocket-as-promised/issues/46>
wsp.request = async function (data) {
Expand Down
3 changes: 2 additions & 1 deletion helpers/get-database.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async function setupPragma(db, session) {
// safeguards
if (!db.open) throw new TypeError('Database is not open');
if (db.memory) throw new TypeError('Memory database');
db.pragma(`cipher='aes256cbc'`);
// db.pragma(`cipher='aes256cbc'`);
db.pragma(`cipher='chacha20'`);
db.pragma(`key='${decrypt(session.user.password)}'`);
db.pragma('journal_mode=WAL');
// <https://litestream.io/tips/#busy-timeout>
Expand Down
11 changes: 11 additions & 0 deletions helpers/imap-notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
const { EventEmitter } = require('node:events');

const Axe = require('axe');
const Database = require('better-sqlite3-multiple-ciphers');
const WebSocketAsPromised = require('websocket-as-promised');
const _ = require('lodash');
const safeStringify = require('fast-safe-stringify');

Expand Down Expand Up @@ -116,6 +118,15 @@ class IMAPNotifier extends EventEmitter {

// eslint-disable-next-line complexity, max-params
async addEntries(db, wsp, session, mailboxId, entries, lock = false) {
if (!db || (!(db instanceof Database) && !db.wsp))
throw new TypeError('Database is missing');

if (!wsp || !(wsp instanceof WebSocketAsPromised))
throw new TypeError('WebSocketAsPromised missing');

if (typeof session?.user?.password !== 'string')
throw new TypeError('Session user and password missing');

if (entries && !Array.isArray(entries)) {
entries = [entries];
} else if (!entries || entries.length === 0) {
Expand Down
67 changes: 17 additions & 50 deletions helpers/imap/on-fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const refineAndLogError = require('#helpers/refine-and-log-error');
const { convertResult } = require('#helpers/mongoose-to-sqlite');

// const LIMITED_PROJECTION_KEYS = new Set(['_id', 'flags', 'modseq', 'uid']);
const MAX_PAGE_SIZE = 2500;
const MAX_BULK_WRITE_SIZE = 150;

const builder = new Builder();
Expand Down Expand Up @@ -76,7 +75,6 @@ async function getMessages(db, wsp, session, server, opts = {}) {
throw new Error('Total bytes must be a number >= 0');

let queryAll;
let count = 0;

const pageQuery = { ...query };

Expand Down Expand Up @@ -175,48 +173,6 @@ async function getMessages(db, wsp, session, server, opts = {}) {
if (!socket || socket?.destroyed || socket?.readyState !== 'open')
throw new SocketError();

// break out of cursor if no message retrieved
// TODO: does this actually occur as an edge case (?)
if (!message) {
server.logger.fatal('message not fetched', {
session,
options,
query,
pageQuery
});

// may have more pages, try to fetch more
if (count === MAX_PAGE_SIZE) {
// eslint-disable-next-line no-await-in-loop
const results = await getMessages(db, wsp, session, server, {
options,
projection,
query,
mailbox,
alias,
attachmentStorage,
entries,
bulkWrite,
successful,
lastUid,
rowCount,
totalBytes
});
// re-assign variables (or we could return early here with `results`)
entries = results.entries;
bulkWrite = results.bulkWrite;
successful = results.successful;
lastUid = results.lastUid;
rowCount = results.rowCount;
totalBytes = results.totalBytes;
}

break;
}

// store counter for how many messages we processed
count++;

// store reference to last message uid
lastUid = message.uid;

Expand All @@ -226,8 +182,9 @@ async function getMessages(db, wsp, session, server, opts = {}) {
typeof session?.selected?.uidList === 'object' &&
Array.isArray(session.selected.uidList) &&
!session.selected.uidList.includes(message.uid)
)
) {
continue;
}

const markAsSeen =
options.markAsSeen && message.flags && !message.flags.includes('\\Seen');
Expand Down Expand Up @@ -311,7 +268,7 @@ async function getMessages(db, wsp, session, server, opts = {}) {
rowCount++;

// add operation to bulkWrite
if (!markAsSeen)
if (!markAsSeen) {
bulkWrite.push({
updateOne: {
filter: {
Expand All @@ -329,13 +286,23 @@ async function getMessages(db, wsp, session, server, opts = {}) {
}
}
});
entries.push({
ignore: session.id,
command: 'FETCH',
uid: message.uid,
message: message._id,
mailbox: mailbox._id,
thread: message.thread,
flags: message.flags
});
}

if (bulkWrite.length >= MAX_BULK_WRITE_SIZE) {
try {
// eslint-disable-next-line no-await-in-loop
await Messages.bulkWrite(db, wsp, session, bulkWrite, {
ordered: false,
w: 1
// ordered: false,
// w: 1
});
bulkWrite = [];
if (entries.length >= MAX_BULK_WRITE_SIZE) {
Expand Down Expand Up @@ -430,8 +397,8 @@ async function onFetch(mailboxId, options, session, fn) {
// mark messages as Seen
if (results.bulkWrite.length > 0)
await Messages.bulkWrite(db, this.wsp, session, results.bulkWrite, {
ordered: false,
w: 1
// ordered: false,
// w: 1
});

// close the connection
Expand Down
6 changes: 6 additions & 0 deletions helpers/imap/on-move.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ async function onMove(mailboxId, update, session, fn) {
// eslint-disable-next-line no-await-in-loop
await this.server.notifier.addEntries(
db,
this.wsp,
session,
targetMailbox._id,
existEntries,
lock
Expand Down Expand Up @@ -351,6 +353,8 @@ async function onMove(mailboxId, update, session, fn) {
try {
await this.server.notifier.addEntries(
db,
this.wsp,
session,
mailbox,
expungeEntries,
lock
Expand All @@ -366,6 +370,8 @@ async function onMove(mailboxId, update, session, fn) {
try {
await this.server.notifier.addEntries(
db,
this.wsp,
session,
targetMailbox,
existEntries,
lock
Expand Down
3 changes: 1 addition & 2 deletions helpers/imap/on-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ async function onStore(mailboxId, update, session, fn) {
});

entries.push({
// TODO: do we want to ignore this (?)
// ignore: session.id,
// TODO: ignore: session.id,
command: 'FETCH',
uid: message.uid,
message: message._id,
Expand Down
16 changes: 13 additions & 3 deletions helpers/on-auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,17 @@ async function onAuth(auth, session, fn) {
});

try {
const paths = await Mailboxes.distinct(db, this.wsp, session, 'path', {
alias: alias._id
});
const paths = await Mailboxes.distinct(
db,
this.wsp,
{
...session,
user
},
'path',
{}
);

const required = [];
for (const path of REQUIRED_PATHS) {
if (!paths.includes(path)) required.push(path);
Expand All @@ -274,6 +282,8 @@ async function onAuth(auth, session, fn) {
required.map((path) => ({
// virtual helper
db,
wsp: this.wsp,
session: { ...session, user },

path,
retention:
Expand Down
10 changes: 5 additions & 5 deletions imap-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ class IMAP {
// in test/development listen for locking and releasing
// <https://github.com/nodemailer/ioredfour/blob/0bc1035c34c548b2d3058352c588dc20422cfb96/lib/ioredfour.js#L48-L49>
//
if (config.env === 'development') {
this.lock._redisSubscriber.on('message', (channel, message) => {
logger.debug('lock message received', { channel, message });
});
}
// if (config.env === 'development') {
// this.lock._redisSubscriber.on('message', (channel, message) => {
// logger.debug('lock message received', { channel, message });
// });
// }

//
// NOTE: it is using a lock under `wildduck` prefix
Expand Down
3 changes: 2 additions & 1 deletion sqlite-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class SQLite {
namespace: 'imap_lock'
});

this.wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
// this.wss = new WebSocketServer({ noServer: true, perMessageDeflate: true });
this.wss = new WebSocketServer({ noServer: true });
this.server = server;

// bind listen/close to this
Expand Down
11 changes: 6 additions & 5 deletions sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@

// eslint-disable-next-line import/no-unassigned-import
require('#config/env');
// require('#config/mongoose');
// eslint-disable-next-line import/no-unassigned-import
require('#config/mongoose');

const process = require('node:process');
const { promisify } = require('node:util');

const Graceful = require('@ladjs/graceful');
const Redis = require('@ladjs/redis');
const ip = require('ip');
// const mongoose = require('mongoose');
const mongoose = require('mongoose');
const sharedConfig = require('@ladjs/shared-config');

const SQLite = require('./sqlite-server');

const logger = require('#helpers/logger');
// const setupMongoose = require('#helpers/setup-mongoose');
const setupMongoose = require('#helpers/setup-mongoose');

const breeSharedConfig = sharedConfig('BREE');
const client = new Redis(breeSharedConfig.redis, logger);

const sqlite = new SQLite({ client });

const graceful = new Graceful({
// mongooses: [mongoose],
mongooses: [mongoose],
servers: [sqlite.server],
redisClients: [client],
logger,
Expand All @@ -44,7 +45,7 @@ graceful.listen();
`SQLite WebSocket server listening on ${port} (LAN: ${ip.address()}:${port})`,
{ hide_meta: true }
);
// await setupMongoose(logger);
await setupMongoose(logger);
} catch (err) {
logger.error(err);

Expand Down

0 comments on commit d5fb1d4

Please sign in to comment.