Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inbox Limit #4703

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "misskey",
"author": "mei23 <m@m544.net>",
"version": "10.102.666-m544",
"version": "10.102.666-m544-1",
"codename": "m544",
"repository": {
"type": "git",
Expand Down
57 changes: 57 additions & 0 deletions src/client/app/admin/views/queue.vue
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,38 @@
<ui-button @click="removeJobs('inbox')">{{ $t('clearJobs') }}</ui-button>
</ui-horizon-group >
</section>

<!-- InboxLazy -->
<section class="wptihjuy">
<header><fa :icon="faInbox"/> Inbox (Lazy)</header>
<ui-horizon-group inputs v-if="latestStats" class="fit-bottom">
<ui-input :value="latestStats.inboxLazy.activeSincePrevTick" type="text" readonly>
<span>Process</span>
<template #prefix><fa :icon="fasPlayCircle"/></template>
<template #suffix>jobs/tick</template>
</ui-input>
<ui-input :value="latestStats.inboxLazy.active" type="text" readonly>
<span>Active</span>
<template #prefix><fa :icon="farPlayCircle"/></template>
<template #suffix>{{ `/ ${latestStats.inboxLazy.limit} jobs` }}</template>
</ui-input>
<ui-input :value="latestStats.inboxLazy.waiting" type="text" readonly>
<span>Waiting</span>
<template #prefix><fa :icon="faStopCircle"/></template>
<template #suffix>jobs</template>
</ui-input>
<ui-input :value="latestStats.inboxLazy.delayed" type="text" readonly>
<span>Delayed</span>
<template #prefix><fa :icon="faStopwatch"/></template>
<template #suffix>jobs</template>
</ui-input>
</ui-horizon-group>
<div ref="inboxLazyChart" class="chart"></div>
<ui-horizon-group v-if="$store.getters.isAdminOrModerator" inputs class="fit-bottom">
<ui-button @click="promoteJobs('inboxLazy')">{{ $t('promoteJobs') }}</ui-button>
<ui-button @click="removeJobs('inboxLazy')">{{ $t('clearJobs') }}</ui-button>
</ui-horizon-group >
</section>
</ui-card>

<!-- job queue list -->
Expand All @@ -78,6 +110,7 @@
<template #label>{{ $t('queue') }}</template>
<option value="deliver">{{ $t('domains.deliver') }}</option>
<option value="inbox">{{ $t('domains.inbox') }}</option>
<option value="inboxLazy">{{ $t('domains.inboxLazy') }}</option>
<option value="db">{{ $t('domains.db') }}</option>
</ui-select>
<ui-select v-model="state">
Expand Down Expand Up @@ -106,6 +139,9 @@
<template v-if="domain === 'inbox'">
<span>{{ job.data.activity.id }}</span>
</template>
<template v-if="domain === 'inboxLazy'">
<span>{{ job.data.activity.id }}</span>
</template>
<template v-if="domain === 'db'">
<span>{{ job.name }}</span>
</template>
Expand Down Expand Up @@ -140,6 +176,7 @@ export default Vue.extend({
stats: [],
deliverChart: null,
inboxChart: null,
inboxLazyChart: null,
jobs: [],
jobsLimit: 1000,
jobsFetched: Date.now(),
Expand Down Expand Up @@ -193,6 +230,23 @@ export default Vue.extend({
type: 'line',
data: stats.map((x, i) => ({ x: i, y: x.inbox.delayed }))
}]);
this.inboxLazyChart.updateSeries([{
name: 'Process',
type: 'area',
data: stats.map((x, i) => ({ x: i, y: x.inboxLazy.activeSincePrevTick }))
}, {
name: 'Active',
type: 'area',
data: stats.map((x, i) => ({ x: i, y: x.inboxLazy.active }))
}, {
name: 'Waiting',
type: 'line',
data: stats.map((x, i) => ({ x: i, y: x.inboxLazy.waiting }))
}, {
name: 'Delayed',
type: 'line',
data: stats.map((x, i) => ({ x: i, y: x.inboxLazy.delayed }))
}]);
this.deliverChart.updateSeries([{
name: 'Process',
type: 'area',
Expand Down Expand Up @@ -291,9 +345,11 @@ export default Vue.extend({
});

this.inboxChart = new ApexCharts(this.$refs.inboxChart, chartOpts('a'));
this.inboxLazyChart = new ApexCharts(this.$refs.inboxLazyChart, chartOpts('a'));
this.deliverChart = new ApexCharts(this.$refs.deliverChart, chartOpts('b'));

this.inboxChart.render();
this.inboxLazyChart.render();
this.deliverChart.render();

const connection = this.$root.stream.useSharedConnection('queueStats');
Expand All @@ -307,6 +363,7 @@ export default Vue.extend({
this.$once('hook:beforeDestroy', () => {
connection.dispose();
this.inboxChart.destroy();
this.inboxLazyChart.destroy();
this.deliverChart.destroy();
});
},
Expand Down
18 changes: 16 additions & 2 deletions src/daemons/queue-stats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Deque from 'double-ended-queue';
import Xev from 'xev';
import { deliverQueue, inboxQueue } from '../queue/queues';
import { deliverQueue, inboxQueue, inboxLazyQueue } from '../queue/queues';
import config from '../config';
import { getWorkerStrategies } from '..';
import { deliverJobConcurrency, inboxJobConcurrency } from '../queue';
Expand All @@ -24,6 +24,7 @@ export default function() {

let activeDeliverJobs = 0;
let activeInboxJobs = 0;
let activeInboxLazyJobs = 0;

deliverQueue.on('global:active', () => {
activeDeliverJobs++;
Expand All @@ -33,9 +34,14 @@ export default function() {
activeInboxJobs++;
});

inboxLazyQueue.on('global:active', () => {
activeInboxLazyJobs++;
});

async function tick() {
const deliverJobCounts = await deliverQueue.getJobCounts();
const inboxJobCounts = await inboxQueue.getJobCounts();
const inboxLazyJobCounts = await inboxLazyQueue.getJobCounts();

const stats = {
deliver: {
Expand All @@ -51,7 +57,14 @@ export default function() {
active: inboxJobCounts.active,
waiting: inboxJobCounts.waiting,
delayed: inboxJobCounts.delayed
}
},
inboxLazy: {
limit: 1 * workers,
activeSincePrevTick: activeInboxLazyJobs,
active: inboxLazyJobCounts.active,
waiting: inboxLazyJobCounts.waiting,
delayed: inboxLazyJobCounts.delayed
},
};

ev.emit('queueStats', stats);
Expand All @@ -61,6 +74,7 @@ export default function() {

activeDeliverJobs = 0;
activeInboxJobs = 0;
activeInboxLazyJobs = 0;
}

tick();
Expand Down
44 changes: 43 additions & 1 deletion src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as httpSignature from 'http-signature';
import config from '../config';
import { InboxInfo, InboxRequestData, WebpushDeliverJobData } from './types';
import { deliverQueue, webpushDeliverQueue, inboxQueue, dbQueue } from './queues';
import { deliverQueue, webpushDeliverQueue, inboxQueue, inboxLazyQueue, dbQueue } from './queues';
import { getJobInfo } from './get-job-info';
import processDeliver from './processors/deliver';
import processWebpushDeliver from './processors/webpushDeliver';
Expand All @@ -19,6 +19,7 @@ import { cpus } from 'os';
const deliverLogger = queueLogger.createSubLogger('deliver');
const webpushDeliverLogger = queueLogger.createSubLogger('webpushDeliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const inboxLazyLogger = queueLogger.createSubLogger('inboxLazy');
const dbLogger = queueLogger.createSubLogger('db');

let deliverDeltaCounts = 0;
Expand Down Expand Up @@ -71,6 +72,20 @@ inboxQueue
.on('error', (error) => inboxLogger.error(`error ${error}`))
.on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));

inboxLazyQueue
.on('waiting', (jobId) => {
inboxLazyLogger.debug(`waiting id=${jobId}`);
})
.on('active', (job) => inboxLazyLogger.info(`active ${getJobInfo(job, true)} activity=${job.data.activity ? job.data.activity.id : 'none'}`))
.on('completed', (job, result) => inboxLazyLogger.info(`completed(${result}) ${getJobInfo(job, true)} activity=${job.data.activity ? job.data.activity.id : 'none'}`))
.on('failed', (job, err) => {
const msg = `failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`;
if (job.opts.attempts && (job.opts.attempts > job.attemptsMade)) job.log(msg);
inboxLazyLogger.warn(msg);
})
.on('error', (error) => inboxLazyLogger.error(`error ${error}`))
.on('stalled', (job) => inboxLazyLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));

dbQueue
.on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => dbLogger.info(`${job.name} active ${getJobInfo(job, true)}`))
Expand Down Expand Up @@ -159,6 +174,25 @@ export function inbox(activity: IActivity, signature: httpSignature.IParsedSigna
});
}

export function inboxLazy(activity: IActivity, signature: httpSignature.IParsedSignature, request: InboxRequestData) {
const data = {
activity,
signature,
request,
};

return inboxLazyQueue.add(data, {
attempts: 1,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'apBackoff'
},
removeOnComplete: true,
removeOnFail: true
});
}


export function createDeleteNotesJob(user: IUser) {
return dbQueue.add('deleteNotes', {
user: { _id: `${user._id}` }
Expand Down Expand Up @@ -334,6 +368,7 @@ export default function() {
deliverQueue.process(deliverJobConcurrency, processDeliver);
webpushDeliverQueue.process(8, processWebpushDeliver);
inboxQueue.process(inboxJobConcurrency, processInbox);
inboxLazyQueue.process(1, processInbox);
processDb(dbQueue);
}

Expand All @@ -352,6 +387,13 @@ export function destroy(domain?: string) {
inboxQueue.clean(0, 'delayed');
}

if (domain == null || domain === 'inboxLazy') {
inboxLazyQueue.once('cleaned', (jobs, status) => {
inboxLazyLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
inboxLazyQueue.clean(0, 'delayed');
}

if (domain === 'db') {
dbQueue.once('cleaned', (jobs, status) => {
dbLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
Expand Down
1 change: 1 addition & 0 deletions src/queue/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import { DeliverJobData, WebpushDeliverJobData, InboxJobData, DbJobData } from '
export const deliverQueue = initialize<DeliverJobData>('deliver', config.deliverJobPerSec || -1);
export const webpushDeliverQueue = initialize<WebpushDeliverJobData>('webpushDeliver', -1);
export const inboxQueue = initialize<InboxJobData>('inbox', config.inboxJobPerSec || -1);
export const inboxLazyQueue = initialize<InboxJobData>('inboxLazy', -1);
export const dbQueue = initialize<DbJobData>('db');
20 changes: 12 additions & 8 deletions src/server/activitypub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Outbox, { packActivity } from './activitypub/outbox';
import Followers from './activitypub/followers';
import Following from './activitypub/following';
import Featured from './activitypub/featured';
import { inbox as processInbox } from '../queue';
import { inbox as processInbox, inboxLazy as processInboxLazy } from '../queue';
import { isSelfHost } from '../misc/convert-host';
import NoteReaction from '../models/note-reaction';
import { renderLike } from '../remote/activitypub/renderer/like';
Expand All @@ -27,6 +27,8 @@ import { toUnicode } from 'punycode/';
import Logger from '../services/logger';
import limiter from './api/limiter';
import { IEndpoint } from './api/endpoints';
import { IActivity } from '../remote/activitypub/type';
import { toSingle } from '../prelude/array';

const logger = new Logger('activitypub');

Expand Down Expand Up @@ -65,15 +67,18 @@ async function inbox(ctx: Router.RouterContext) {
}

const actor = signature.keyId.replace(/[^0-9A-Za-z]/g, '_');
const activity = ctx.request.body as IActivity;

if (actor) {
let lazy = false;

if (actor && ['Delete', 'Undo'].includes(toSingle(activity.type)!)) {
const ep = {
name: `inboxx120-${actor}`,
name: `inboxDeletex60-${actor}`,
exec: null,
meta: {
limit: {
duration: 120 * 1000,
max: 120,
duration: 60 * 1000,
max: 10, //TODO
}
}
} as IEndpoint;
Expand All @@ -82,12 +87,11 @@ async function inbox(ctx: Router.RouterContext) {
await limiter(ep, undefined, undefined);
} catch (e) {
console.log(`InboxLimit: ${actor}`);
ctx.status = 429;
return;
lazy = true;
}
}

const queue = await processInbox(ctx.request.body, signature, {
const queue = await (lazy ? processInboxLazy : processInbox)(activity, signature, {
ip: ctx.request.ip
});

Expand Down
3 changes: 2 additions & 1 deletion src/server/api/endpoints/admin/queue/job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import $ from 'cafy';
import define from '../../../define';
import { deliverQueue, inboxQueue, dbQueue } from '../../../../../queue/queues';
import { deliverQueue, inboxQueue, inboxLazyQueue, dbQueue } from '../../../../../queue/queues';
import * as Bull from 'bull';

export const meta = {
Expand All @@ -24,6 +24,7 @@ export default define(meta, async (ps) => {
const queue =
ps.domain === 'deliver' ? deliverQueue :
ps.domain === 'inbox' ? inboxQueue :
ps.domain === 'inboxLazy' ? inboxLazyQueue :
ps.domain === 'db' ? dbQueue :
null;

Expand Down
3 changes: 2 additions & 1 deletion src/server/api/endpoints/admin/queue/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import $ from 'cafy';
import define from '../../../define';
import { deliverQueue, inboxQueue, dbQueue } from '../../../../../queue/queues';
import { deliverQueue, inboxQueue, inboxLazyQueue, dbQueue } from '../../../../../queue/queues';
import * as Bull from 'bull';

export const meta = {
Expand Down Expand Up @@ -29,6 +29,7 @@ export default define(meta, async (ps) => {
const queue =
ps.domain === 'deliver' ? deliverQueue :
ps.domain === 'inbox' ? inboxQueue :
ps.domain === 'inboxLazy' ? inboxLazyQueue :
ps.domain === 'db' ? dbQueue :
null;

Expand Down
3 changes: 2 additions & 1 deletion src/server/api/endpoints/admin/queue/promote.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import $ from 'cafy';
import define from '../../../define';
import { deliverQueue, inboxQueue } from '../../../../../queue/queues';
import { deliverQueue, inboxQueue, inboxLazyQueue } from '../../../../../queue/queues';

export const meta = {
tags: ['admin'],
Expand All @@ -24,6 +24,7 @@ export default define(meta, async (ps) => {
const queue =
ps.domain === 'deliver' ? deliverQueue :
ps.domain === 'inbox' ? inboxQueue :
ps.domain === 'inboxLazy' ? inboxLazyQueue :
null;

if (queue == null) throw(`invalid domain`);
Expand Down
6 changes: 4 additions & 2 deletions src/server/api/endpoints/admin/queue/stats.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import define from '../../../define';
import { deliverQueue, inboxQueue } from '../../../../../queue/queues';
import { deliverQueue, inboxQueue, inboxLazyQueue } from '../../../../../queue/queues';

export const meta = {
tags: ['admin'],
Expand All @@ -13,9 +13,11 @@ export const meta = {
export default define(meta, async (ps) => {
const deliverJobCounts = await deliverQueue.getJobCounts();
const inboxJobCounts = await inboxQueue.getJobCounts();
const inboxLazyJobCounts = await inboxLazyQueue.getJobCounts();

return {
deliver: deliverJobCounts,
inbox: inboxJobCounts
inbox: inboxJobCounts,
inboxLazy: inboxLazyJobCounts,
};
});