Skip to content

Commit

Permalink
feat: added sqlite websocket support, cleanup, tests, optional rclone…
Browse files Browse the repository at this point in the history
… support
  • Loading branch information
titanism committed Nov 2, 2023
1 parent fe8f093 commit aee51f2
Show file tree
Hide file tree
Showing 71 changed files with 2,504 additions and 812 deletions.
3 changes: 3 additions & 0 deletions .env.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ HTTP_PORT=
#############################
## sqlite websocket server ##
#############################
SQLITE_WEBSOCKET_HOST=localhost
SQLITE_WEBSOCKET_PORT=3456
SQLITE_RCLONE_ENABLED=false
SQLITE_FTS5_ENABLED=false

################
## web server ##
Expand Down
3 changes: 3 additions & 0 deletions .env.schema
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ HTTP_PORT=
#############################
## sqlite websocket server ##
#############################
SQLITE_WEBSOCKET_HOST=
SQLITE_WEBSOCKET_PORT=
SQLITE_RCLONE_ENABLED=
SQLITE_FTS5_ENABLED=

################
## web server ##
Expand Down
79 changes: 34 additions & 45 deletions app/models/aliases.js
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,20 @@ Aliases.pre('save', async function (next) {
}
});

async function getStorageUsed(alias) {
let storageUsed = 0;

// async function getStorageUsed(alias) {
async function getStorageUsed(wsp, session) {
//
// calculate storage used across entire domain and its admin users domains
// (this is rudimentary storage system and has edge cases)
// (e.g. multi-accounts when users on team plan edge case)
//
const domain = await Domains.findById(alias.domain)
const domain = await Domains.findOne({ id: session.user.domain_id })
.populate('members.user', `id ${config.userFields.isBanned}`)
.lean()
.exec();
if (!domain)
throw Boom.notFound(
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', alias.locale)
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', 'en')
);

// filter out a domain's members without actual users
Expand All @@ -575,15 +574,15 @@ async function getStorageUsed(alias) {

if (adminMembers.length === 0)
throw Boom.notFound(
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', alias.locale)
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', 'en')
);

const ids = domain.members.map((m) => m.user);

// safeguard
if (ids.length === 0)
throw Boom.notFound(
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', alias.locale)
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', 'en')
);

// now get all domains where $elemMatch is the user id and group is admin
Expand All @@ -599,60 +598,50 @@ async function getStorageUsed(alias) {
// safeguard
if (domainIds.length === 0)
throw Boom.notFound(
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', alias.locale)
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', 'en')
);

if (domainIds.length > 0) {
const results = await this.aggregate([
{
$match: {
domain: {
$in: domainIds
}
}
},
{
$group: {
_id: '',
storageUsed: {
$sum: '$storageUsed'
}
}
}
]);
// results [ { _id: '', storageUsed: 91360 } ]
if (
results.length !== 1 ||
typeof results[0] !== 'object' ||
typeof results[0].storageUsed !== 'number'
)
throw Boom.notFound(
i18n.translateError('DOMAIN_DOES_NOT_EXIST_ANYWHERE', alias.locale)
);

storageUsed += results[0].storageUsed;
}
const aliasIds = await this.distinct('id', {
domain: { $in: domainIds }
});

// now get all aliases that belong to any of these domains and sum the storageQuota
return storageUsed;
const size = await wsp.request({
action: 'size',
session: { user: session.user },
alias_ids: aliasIds
});

return size;
}

Aliases.statics.getStorageUsed = getStorageUsed;

Aliases.statics.isOverQuota = async function (alias, size = 0) {
const storageUsed = await getStorageUsed.call(this, alias);
// Aliases.statics.isOverQuota = async function (alias, size = 0) {
Aliases.statics.isOverQuota = async function (
wsp,
session,
size = 0,
returnStorageUsed = false
) {
// const storageUsed = await getStorageUsed.call(this, alias);
const storageUsed = await getStorageUsed.call(this, wsp, session);

const isOverQuota = storageUsed + size > config.maxQuotaPerAlias;

// log fatal error to admins (so they will get notified by email/text)
if (isOverQuota) {
const err = new Error(
`Alias ID ${alias.id} is over quota (${storageUsed + size}/${
config.maxQuotaPerAlias
})`
`Alias ${session.user.username} (ID ${
session.user.alias_id
}) is over quota (${storageUsed + size}/${config.maxQuotaPerAlias})`
);
err.isCodeBug = true; // causes admin alerts
logger.fatal(err, { alias });
logger.fatal(err, { session });
}

if (returnStorageUsed) {
return { storageUsed, isOverQuota };
}

return isOverQuota;
Expand Down
60 changes: 56 additions & 4 deletions app/models/threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

const crypto = require('node:crypto');

const Database = require('better-sqlite3-multiple-ciphers');
const MessageHandler = require('wildduck/lib/message-handler');
const WebSocketAsPromised = require('websocket-as-promised');
const _ = require('lodash');
const mongoose = require('mongoose');
const safeStringify = require('fast-safe-stringify');
Expand Down Expand Up @@ -59,7 +61,17 @@ Threads.plugin(sqliteVirtualDB);
Threads.plugin(validationErrorTransform);

// code is inspired from wildduck (rewrite necessary for async/await and different db structure)
async function getThreadId(db, subject, mimeTree) {
// eslint-disable-next-line max-params, complexity
async function getThreadId(db, wsp, session, subject, mimeTree) {
if (!db || (!(db instanceof Database) && !db.wsp))
throw new TypeError('Database is missing');

if (!wsp || !(wsp instanceof WebSocketAsPromised))
throw new TypeError('WebSocketAsPromised missing');

if (typeof session?.user?.password !== 'string')
throw new TypeError('Session user and password missing');

let referenceIds = new Set(
[
[mimeTree.parsedHeader['message-id'] || []].flat().pop() || '',
Expand Down Expand Up @@ -97,8 +109,21 @@ async function getThreadId(db, subject, mimeTree) {
.map(() => `"value" = ?`)
.join(' or ')})) limit 1;`;

const values = [subject, ...referenceIds];

// reading so no need to lock
thread = db.prepare(sql).get([subject, ...referenceIds]);
if (db.wsp) {
thread = await wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql],
['get', values]
]
});
} else {
thread = db.prepare(sql).get(values);
}
}

if (thread) {
Expand Down Expand Up @@ -126,7 +151,20 @@ async function getThreadId(db, subject, mimeTree) {

// result of this will be like:
// `{ changes: 1, lastInsertRowid: 11 }`
db.prepare(sql.query).run(sql.values);

// use websockets if readonly
if (db.readonly) {
await wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['run', sql.values]
]
});
} else {
db.prepare(sql.query).run(sql.values);
}
}

{
Expand All @@ -138,7 +176,19 @@ async function getThreadId(db, subject, mimeTree) {
}
});

thread = db.prepare(sql.query).get(sql.values);
if (db.wsp) {
thread = await this.wsp.request({
action: 'stmt',
session: { user: session.user },
stmt: [
['prepare', sql.query],
['get', sql.values]
]
});
} else {
thread = db.prepare(sql.query).get(sql.values);
}

if (!thread) throw new TypeError('Thread does not exist');
thread = await convertResult(this, thread);
}
Expand All @@ -153,6 +203,8 @@ async function getThreadId(db, subject, mimeTree) {

thread = await this.create({
db,
wsp,
session,
ids: referenceIds,
subject
});
Expand Down
6 changes: 4 additions & 2 deletions app/views/encrypted-email/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ To accomplish writes with write-ahead-logging ("WAL") enabled (which drastically

We accomplish two-way communication with [WebSockets](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket):

* Primary servers use [ws](https://github.com/websockets/ws) as the `WebSocket` server.
* Secondary servers use [undici WebSocket](https://undici.nodejs.org/#/docs/api/WebSocket) as the `WebSocket` client.
* Primary servers use an instance of [ws](https://github.com/websockets/ws)'s `WebSocketServer` server.
* Secondary servers use an instance of [ws](https://github.com/websockets/ws)'s `WebSocket` client that is wrapped with [websocket-as-promised](https://github.com/vitalets/websocket-as-promised) and [reconnecting-websocket](https://github.com/pladaria/reconnecting-websocket). These two wrappers ensure that the `WebSocket` reconnects and can send and receive data for specific database writes.

If for any reason the Secondary servers have read issues with `rclone` (e.g. the mount failed or the `*.sqlite` file for the given alias cannot be found) – then it will fallback to use the `WebSocket` connection for reads.

For backups, we simply run the SQLite `backup` command periodically, which leverages your encrypted password from an in-memory IMAP connection.

Expand Down
2 changes: 1 addition & 1 deletion ava.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ module.exports = {
files: ['test/*.js', 'test/**/*.js', 'test/**/**/*.js', '!test/utils.js'],
// <https://github.com/lovell/sharp/issues/3164#issuecomment-1168328811>
workerThreads: familySync() !== GLIBC,
timeout: isCI ? '1m' : '15s'
timeout: isCI ? '3m' : '1.5m'
};
3 changes: 1 addition & 2 deletions config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ const config = {
allowlist: env.RATELIMIT_ALLOWLIST
},

// NOTE: during beta the limit is 1 GB per alias/domain/account (allows user to store Drafts/Sent Mail)
maxQuotaPerAlias: bytes('1GB'),
maxQuotaPerAlias: bytes('10GB'),

// <https://github.com/nodemailer/wildduck/issues/512>
maxMailboxes: 10000,
Expand Down
4 changes: 2 additions & 2 deletions config/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const env = require('./env');
module.exports = {
// eslint-disable-next-line no-undef
logger: typeof window === 'object' ? console : signale,
level: env.NODE_ENV === 'test' ? 'debug' : 'info',
level: env.NODE_ENV === 'development' ? 'debug' : 'info',
levels:
env.NODE_ENV === 'test'
env.NODE_ENV === 'development'
? ['trace', 'info', 'debug', 'warn', 'error', 'fatal']
: ['info', 'warn', 'error', 'fatal'],
showStack: env.AXE_SHOW_STACK,
Expand Down
19 changes: 16 additions & 3 deletions helpers/attachment-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class AttachmentStorage {
};
}

async create(db, attachment) {
async create(db, wsp, session, attachment) {
const hex = await this.calculateHashPromise(attachment.body);
attachment.hash = revHash(Buffer.from(hex, 'hex'));
attachment.counter = 1;
Expand All @@ -79,6 +79,8 @@ class AttachmentStorage {

const result = await Attachments.findOneAndUpdate(
db,
wsp,
session,
{
hash: attachment.hash
},
Expand All @@ -105,6 +107,8 @@ class AttachmentStorage {

// virtual helper
attachment.db = db;
attachment.wsp = wsp;
attachment.session = session;

return Attachments.create(attachment);
}
Expand All @@ -122,7 +126,8 @@ class AttachmentStorage {
return intoStream(attachment.body);
}

async deleteMany(db, attachmentIds, magic, lock = false) {
// eslint-disable-next-line max-params
async deleteMany(db, wsp, session, attachmentIds, magic, lock = false) {
if (Number.isNaN(magic) || typeof magic !== 'number') {
const err = new TypeError('Invalid magic');
err.attachmentIds = attachmentIds;
Expand All @@ -132,6 +137,8 @@ class AttachmentStorage {

const attachments = await Attachments.updateMany(
db,
wsp,
session,
{
hash: { $in: attachmentIds }
},
Expand Down Expand Up @@ -159,7 +166,13 @@ class AttachmentStorage {
attachments.map(async (attachment) => {
try {
if (attachment.counter === 0 && attachment.magic === 0)
await Attachments.deleteOne(db, { _id: attachment._id }, { lock });
await Attachments.deleteOne(
db,
wsp,
session,
{ _id: attachment._id },
{ lock }
);
} catch (err) {
logger.fatal(err, { attachment });
}
Expand Down
Loading

0 comments on commit aee51f2

Please sign in to comment.