From e42fdfb5688683731932d9be5aa3a21931cd7dda Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 8 Jul 2024 21:05:55 +0200 Subject: [PATCH 1/2] api: projects: long running job cleanup --- packages/api/src/controllers/project.ts | 121 +++++++++++++++- .../api/src/jobs/projects-cleanup.test.ts | 129 ++++++++++++++++++ packages/api/src/jobs/projects-cleanup.ts | 35 +++++ packages/api/src/parse-cli.ts | 5 + 4 files changed, 287 insertions(+), 3 deletions(-) create mode 100644 packages/api/src/jobs/projects-cleanup.test.ts create mode 100644 packages/api/src/jobs/projects-cleanup.ts diff --git a/packages/api/src/controllers/project.ts b/packages/api/src/controllers/project.ts index 720f85c03..34e5e0af0 100644 --- a/packages/api/src/controllers/project.ts +++ b/packages/api/src/controllers/project.ts @@ -1,6 +1,6 @@ -import { Router } from "express"; +import { Request, Router } from "express"; import { authorizer } from "../middleware"; -import { db } from "../store"; +import { db, jobsDb } from "../store"; import { v4 as uuid } from "uuid"; import { makeNextHREF, @@ -11,7 +11,11 @@ import { import { NotFoundError, ForbiddenError } from "../store/errors"; import sql from "sql-template-strings"; import { WithID } from "../store/types"; -import { Project } from "../schema/types"; +import { Asset, Project } from "../schema/types"; +import { CliArgs } from "../parse-cli"; +import Queue from "../store/queue"; +import logger from "../logger"; +import { DB } from "../store/db"; const app = Router(); @@ -177,4 +181,115 @@ app.delete("/:id", authorizer({}), async (req, res) => { res.end(); }); +app.post( + "/job/projects-cleanup", + authorizer({ anyAdmin: true }), + async (req, res) => { + // import the job dynamically to avoid circular dependencies + const { default: projectsCleanup } = await import( + "../jobs/projects-cleanup" + ); + + const limit = parseInt(req.query.limit?.toString()) || 1000; + + const { cleanedUp } = await projectsCleanup( + { + ...req.config, + projectsCleanupLimit: limit, + }, + req, + { jobsDb }, + ); + + res.status(200); + res.json({ cleanedUp }); + }, +); + +export function triggerCleanUpProjectsJob( + projects: Project[], + req: Request, +): [Project[], Promise] { + if (!projects.length) { + return [projects, Promise.resolve()]; + } + + const jobPromise = Promise.resolve().then(async () => { + try { + await Promise.all(projects.map((s) => cleanUpProject(jobsDb, s, req))); + } catch (err) { + const ids = projects.map((s) => s.id); + logger.error(`Error cleaning up projectId=${ids} err=`, err); + } + }); + + return [projects, jobPromise]; +} + +async function cleanUpProject(db: DB, project: Project, req: Request) { + let [assets] = await db.asset.find({ + filters: { + projectId: project.id, + deleted: false, + }, + }); + + let [streams] = await db.stream.find({ + filters: { + projectId: project.id, + deleted: false, + }, + }); + + let [signingKeys] = await db.signingKey.find({ + filters: { + projectId: project.id, + deleted: false, + }, + }); + + let [webhooks] = await db.webhook.find({ + filters: { + projectId: project.id, + deleted: false, + }, + }); + + let [sessions] = await db.session.find({ + filters: { + projectId: project.id, + deleted: false, + }, + }); + + for (const asset of assets) { + await req.taskScheduler.deleteAsset(asset.id); + } + + for (const stream of streams) { + await db.stream.update(stream.id, { + deleted: true, + }); + } + + for (const signingKey of signingKeys) { + await db.signingKey.update(signingKey.id, { + deleted: true, + disabled: true, + }); + } + + for (const webhook of webhooks) { + await db.webhook.update(webhook.id, { + deleted: true, + }); + } + + for (const session of sessions) { + await db.session.update(session.id, { + deleted: true, + }); + } +} + export default app; diff --git a/packages/api/src/jobs/projects-cleanup.test.ts b/packages/api/src/jobs/projects-cleanup.test.ts new file mode 100644 index 000000000..93fa7af3c --- /dev/null +++ b/packages/api/src/jobs/projects-cleanup.test.ts @@ -0,0 +1,129 @@ +import { v4 as uuid } from "uuid"; + +import * as appRouter from "../app-router"; +import * as projectsController from "../controllers/project"; +import { cache } from "../store/cache"; +import { DB } from "../store/db"; +import Queue, { RabbitQueue } from "../store/queue"; +import { rabbitMgmt } from "../test-helpers"; +import params, { testId } from "../test-params"; +import projectsCleanup from "./projects-cleanup"; +import { Request } from "express"; + +describe("projects-cleanup", () => { + // There are further functional tests under controllers/stream.test.ts "active clean-up" + + let db: DB; + let initClientsSpy: jest.SpyInstance; + + beforeAll(async () => { + db = new DB(); + await db.start({ postgresUrl: params.postgresUrl }); + await rabbitMgmt.createVhost(testId); + }); + + afterAll(async () => { + await rabbitMgmt.deleteVhost(testId); + }); + + let queue: Queue; + + beforeEach(() => { + const originalInitClient = appRouter.initClients; + initClientsSpy = jest + .spyOn(appRouter, "initClients") + .mockImplementation(async (params, name) => { + const result = await originalInitClient(params, name); + queue = result.queue; + jest.spyOn(queue, "consume"); + jest.spyOn(queue, "delayedPublishWebhook"); + return result; + }); + }); + + afterEach(() => { + jest.restoreAllMocks(); + cache.storage = null; + queue?.close(); + queue = null; + }); + + const mockProject = (deleted: boolean, deletedAt: number) => { + return { + id: uuid(), + name: "project1", + deleted, + deletedAt, + }; + }; + + const mockStream = (projectId: string) => { + return { + id: uuid(), + name: "stream1", + playbackId: uuid(), + streamKey: uuid(), + projectId, + }; + }; + + const mockAsset = (projectId: string) => { + return { + id: uuid(), + name: "asset1", + projectId, + source: { + type: "url" as const, + url: "someSource", + }, + }; + }; + + it("it should deleted related assets and streams", async () => { + const triggerSpy = jest + .spyOn(projectsController, "triggerCleanUpProjectsJob") + .mockImplementation(() => [[], Promise.resolve()]); + + const now = Date.now(); + const project = await db.project.create(mockProject(false, now - 10000)); + + let streamsToDelete = []; + let assetsToDelete = []; + // 3 streams + for (let i = 0; i < 3; i++) { + const stream = await db.stream.create(mockStream(project.id)); + streamsToDelete.push(stream.id); + } + // 3 assets + for (let i = 0; i < 3; i++) { + const asset = await db.asset.create(mockAsset(project.id)); + assetsToDelete.push(asset.id); + } + + let mockReq: Request = { + user: { + id: "test", + admin: false, + defaultProjectId: uuid(), + }, + project: { + id: "test", + }, + } as Request; + + await projectsCleanup(params, mockReq); + + expect(triggerSpy).toHaveBeenCalledTimes(1); + + for (const streamId of streamsToDelete) { + let stream = await db.stream.get(streamId); + expect(stream.id).toBe(streamId); + expect(stream.deleted).toBe(true); + } + for (const assetId of assetsToDelete) { + let asset = await db.asset.get(assetId); + expect(asset.id).toBe(assetId); + expect(asset.deleted).toBe(true); + } + }); +}); diff --git a/packages/api/src/jobs/projects-cleanup.ts b/packages/api/src/jobs/projects-cleanup.ts new file mode 100644 index 000000000..b702ac949 --- /dev/null +++ b/packages/api/src/jobs/projects-cleanup.ts @@ -0,0 +1,35 @@ +import sql from "sql-template-strings"; +import { initClients } from "../app-router"; +import { CliArgs } from "../parse-cli"; +import { DB } from "../store/db"; +import Queue from "../store/queue"; +import { triggerCleanUpProjectsJob } from "../controllers/project"; +import { Request } from "express"; + +// queries for all the deleted projects +// clean up logic for all related assets and streams +export default async function projectsCleanup( + config: CliArgs, + req: Request, + clients?: { jobsDb: DB }, +) { + if (!config.ingest?.length) { + throw new Error("ingest not configured"); + } + const { jobsDb } = + clients ?? (await initClients(config, "projects-cleanup-job")); + const { projectsCleanupLimit: limit, ingest } = config; + + let [projects] = await jobsDb.stream.find([sql`data->>'deleted' = 'true'`], { + limit, + order: "data->>'lastSeen' DESC", + }); + + const [cleanedUp, jobPromise] = triggerCleanUpProjectsJob(projects, req); + await jobPromise; + + return { + cleanedUp, + logContext: `limit=${limit} numCleanedUp=${cleanedUp.length}`, + }; +} diff --git a/packages/api/src/parse-cli.ts b/packages/api/src/parse-cli.ts index f60cac75a..30644556e 100755 --- a/packages/api/src/parse-cli.ts +++ b/packages/api/src/parse-cli.ts @@ -535,6 +535,11 @@ export default function parseCli(argv?: string | readonly string[]) { type: "number", default: 1000, }, + "projects-cleanup-limit": { + describe: "job/projects-cleanup: max number of projects to clean up", + type: "number", + default: 100, + }, "update-usage-from": { describe: "job/update-usage: unix millis timestamp for start time of update usage job", From e911d985d81d6e2f3bb6a43570618af5667b0dcd Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Tue, 9 Jul 2024 18:14:16 +0200 Subject: [PATCH 2/2] update --- packages/api/src/controllers/helpers.ts | 37 +++++++++++++ packages/api/src/controllers/project.ts | 69 ++----------------------- packages/api/src/controllers/user.ts | 7 ++- 3 files changed, 44 insertions(+), 69 deletions(-) diff --git a/packages/api/src/controllers/helpers.ts b/packages/api/src/controllers/helpers.ts index c0815ef82..5cf794710 100644 --- a/packages/api/src/controllers/helpers.ts +++ b/packages/api/src/controllers/helpers.ts @@ -751,6 +751,43 @@ export function getProjectId(req: Request): string { return projectId; } +export async function deleteAllOwnedObjects( + req: Request, + params: { + projectId?: string; + userId?: string; + deleted?: boolean; + }, +) { + const filters: any = {}; + if (params.projectId) { + filters.projectId = params.projectId; + } + if (params.userId) { + filters.userId = params.userId; + } + if (params.deleted !== undefined) { + filters.deleted = params.deleted; + } else { + filters.deleted = false; + } + + let [assets] = await db.asset.find({ filters }); + let [streams] = await db.stream.find({ filters }); + let [signingKeys] = await db.signingKey.find({ filters }); + let [webhooks] = await db.webhook.find({ filters }); + let [sessions] = await db.session.find({ filters }); + + for (const asset of assets) { + await req.taskScheduler.deleteAsset(asset.id); + } + + await db.stream.markDeletedMany(streams.map((s) => s.id)); + await db.signingKey.markDeletedMany(signingKeys.map((sk) => sk.id)); + await db.webhook.markDeletedMany(webhooks.map((w) => w.id)); + await db.session.markDeletedMany(sessions.map((s) => s.id)); +} + export async function addDefaultProjectId( body: any, req: Request, diff --git a/packages/api/src/controllers/project.ts b/packages/api/src/controllers/project.ts index 34e5e0af0..8976246c4 100644 --- a/packages/api/src/controllers/project.ts +++ b/packages/api/src/controllers/project.ts @@ -3,6 +3,7 @@ import { authorizer } from "../middleware"; import { db, jobsDb } from "../store"; import { v4 as uuid } from "uuid"; import { + deleteAllOwnedObjects, makeNextHREF, parseFilters, parseOrder, @@ -216,7 +217,7 @@ export function triggerCleanUpProjectsJob( const jobPromise = Promise.resolve().then(async () => { try { - await Promise.all(projects.map((s) => cleanUpProject(jobsDb, s, req))); + await Promise.all(projects.map((s) => cleanUpProject(s, req))); } catch (err) { const ids = projects.map((s) => s.id); logger.error(`Error cleaning up projectId=${ids} err=`, err); @@ -226,70 +227,8 @@ export function triggerCleanUpProjectsJob( return [projects, jobPromise]; } -async function cleanUpProject(db: DB, project: Project, req: Request) { - let [assets] = await db.asset.find({ - filters: { - projectId: project.id, - deleted: false, - }, - }); - - let [streams] = await db.stream.find({ - filters: { - projectId: project.id, - deleted: false, - }, - }); - - let [signingKeys] = await db.signingKey.find({ - filters: { - projectId: project.id, - deleted: false, - }, - }); - - let [webhooks] = await db.webhook.find({ - filters: { - projectId: project.id, - deleted: false, - }, - }); - - let [sessions] = await db.session.find({ - filters: { - projectId: project.id, - deleted: false, - }, - }); - - for (const asset of assets) { - await req.taskScheduler.deleteAsset(asset.id); - } - - for (const stream of streams) { - await db.stream.update(stream.id, { - deleted: true, - }); - } - - for (const signingKey of signingKeys) { - await db.signingKey.update(signingKey.id, { - deleted: true, - disabled: true, - }); - } - - for (const webhook of webhooks) { - await db.webhook.update(webhook.id, { - deleted: true, - }); - } - - for (const session of sessions) { - await db.session.update(session.id, { - deleted: true, - }); - } +async function cleanUpProject(project: Project, req: Request) { + await deleteAllOwnedObjects(req, { projectId: project.id, deleted: false }); } export default app; diff --git a/packages/api/src/controllers/user.ts b/packages/api/src/controllers/user.ts index 510bdfe59..98a258e8d 100644 --- a/packages/api/src/controllers/user.ts +++ b/packages/api/src/controllers/user.ts @@ -30,6 +30,7 @@ import { InternalServerError, NotFoundError } from "../store/errors"; import { WithID } from "../store/types"; import { FieldsMap, + deleteAllOwnedObjects, makeNextHREF, parseFilters, parseOrder, @@ -296,11 +297,9 @@ app.delete("/:id", authorizer({ anyAdmin: true }), async (req, res) => { res.status(404); return res.json({ errors: ["user not found"] }); } - await db.user.delete(id); - // TODO: remove all streams owned by user - // TODO: remove all assets owned by user - // TODO: remove the stripe account + await db.user.markDeleted(id); + await deleteAllOwnedObjects(req, { userId: id, deleted: false }); res.status(204); res.end();