api: projects: long running job cleanup (draft) #2242

Draft: wants to merge 2 commits into base: master

37 changes: 37 additions & 0 deletions packages/api/src/controllers/helpers.ts
@@ -751,6 +751,43 @@ export function getProjectId(req: Request): string {
   return projectId;
 }

+export async function deleteAllOwnedObjects(
+  req: Request,
+  params: {
+    projectId?: string;
+    userId?: string;
+    deleted?: boolean;
+  },
+) {
+  const filters: any = {};
+  if (params.projectId) {
+    filters.projectId = params.projectId;
+  }
+  if (params.userId) {
+    filters.userId = params.userId;
+  }
+  if (params.deleted !== undefined) {
+    filters.deleted = params.deleted;
+  } else {
+    filters.deleted = false;
+  }
+
+  let [assets] = await db.asset.find({ filters });
+  let [streams] = await db.stream.find({ filters });
+  let [signingKeys] = await db.signingKey.find({ filters });
+  let [webhooks] = await db.webhook.find({ filters });
+  let [sessions] = await db.session.find({ filters });
+
+  for (const asset of assets) {
+    await req.taskScheduler.deleteAsset(asset.id);
+  }
+
+  await db.stream.markDeletedMany(streams.map((s) => s.id));
+  await db.signingKey.markDeletedMany(signingKeys.map((sk) => sk.id));
+  await db.webhook.markDeletedMany(webhooks.map((w) => w.id));
+  await db.session.markDeletedMany(sessions.map((s) => s.id));
+}
+
 export async function addDefaultProjectId(
   body: any,
   req: Request,
60 changes: 57 additions & 3 deletions packages/api/src/controllers/project.ts
@@ -1,8 +1,9 @@
-import { Router } from "express";
+import { Request, Router } from "express";
 import { authorizer } from "../middleware";
-import { db } from "../store";
+import { db, jobsDb } from "../store";
 import { v4 as uuid } from "uuid";
 import {
+  deleteAllOwnedObjects,
   makeNextHREF,
   parseFilters,
   parseOrder,
@@ -11,7 +12,11 @@
 import { NotFoundError, ForbiddenError } from "../store/errors";
 import sql from "sql-template-strings";
 import { WithID } from "../store/types";
-import { Project } from "../schema/types";
+import { Asset, Project } from "../schema/types";
+import { CliArgs } from "../parse-cli";
+import Queue from "../store/queue";
+import logger from "../logger";
+import { DB } from "../store/db";

 const app = Router();

@@ -177,4 +182,53 @@
   res.end();
 });

+app.post(
+  "/job/projects-cleanup",
+  authorizer({ anyAdmin: true }),

Check failure (Code scanning / CodeQL): Missing rate limiting (High)

This route handler performs authorization, but is not rate-limited.
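
For context on the finding above, rate limiting can be as simple as a fixed-window counter keyed by caller. The sketch below is only an illustration: FixedWindowLimiter, allow, and the key are hypothetical names, not part of this PR or the codebase, and a real fix would more likely reuse an existing, maintained middleware.

```typescript
// Hypothetical fixed-window limiter, for illustration only.
class FixedWindowLimiter {
  private windows = new Map<string, { start: number; count: number }>();
  private maxRequests: number;
  private windowMs: number;

  constructor(maxRequests: number, windowMs: number) {
    this.maxRequests = maxRequests;
    this.windowMs = windowMs;
  }

  // Returns true if the caller identified by `key` may proceed at time `now`.
  allow(key: string, now: number = Date.now()): boolean {
    const current = this.windows.get(key);
    // Start a fresh window if none exists or the previous one has expired.
    if (!current || now - current.start >= this.windowMs) {
      this.windows.set(key, { start: now, count: 1 });
      return true;
    }
    // Reject once the per-window budget is used up.
    if (current.count >= this.maxRequests) {
      return false;
    }
    current.count += 1;
    return true;
  }
}
```

A route handler could call allow with the authenticated user's ID and respond with 429 when it returns false. Counters in this sketch live in process memory, so they are not shared across API instances.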
Comment on lines +185 to +187
Member

Btw, if you're creating this API only because the other jobs have something like it, it's unnecessary! The only reason they're like that is tech debt: we already had such APIs, which were called from GitHub Actions.

The cleanest way for a new job, IMO, is to implement everything in the job itself. Then, if we later want an API to trigger it manually, we can add one that calls the job function (not the other way around).

Your call though, especially if you're already so far into having this API that getting rid of it now would be a big refactor.
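
The structure suggested above (the job owns the logic, the API is an optional thin caller) could look roughly like the sketch below. All names (CleanupStore, projectsCleanupJob, handleCleanupRequest) are hypothetical, and the store is an injected stand-in, not the PR's db or jobsDb.

```typescript
// Hypothetical types, assumed for illustration only.
interface ProjectRecord {
  id: string;
  deleted: boolean;
}

interface CleanupStore {
  findDeletedProjects(limit: number): Promise<ProjectRecord[]>;
  deleteOwnedObjects(projectId: string): Promise<void>;
}

// The job owns the core logic and does not depend on Express.
async function projectsCleanupJob(
  store: CleanupStore,
  limit: number,
): Promise<{ cleanedUp: string[] }> {
  const projects = await store.findDeletedProjects(limit);
  const cleanedUp: string[] = [];
  for (const project of projects) {
    await store.deleteOwnedObjects(project.id);
    cleanedUp.push(project.id);
  }
  return { cleanedUp };
}

// Optional thin API layer: parse the input, then delegate to the job.
async function handleCleanupRequest(
  store: CleanupStore,
  rawLimit: string | undefined,
) {
  const parsed = parseInt(rawLimit ?? "", 10);
  const limit = Number.isFinite(parsed) && parsed > 0 ? parsed : 1000;
  const { cleanedUp } = await projectsCleanupJob(store, limit);
  return { status: 200, body: { cleanedUp } };
}
```

With this direction of dependency, the job module never imports the controller, so the dynamic import used to dodge the circular dependency would no longer be needed.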

+  async (req, res) => {
+    // import the job dynamically to avoid circular dependencies
+    const { default: projectsCleanup } = await import(
+      "../jobs/projects-cleanup"
+    );
+
+    const limit = parseInt(req.query.limit?.toString()) || 1000;
+
+    const { cleanedUp } = await projectsCleanup(
+      {
+        ...req.config,
+        projectsCleanupLimit: limit,
+      },
+      req,
+      { jobsDb },
+    );
+
+    res.status(200);
+    res.json({ cleanedUp });
+  },
+);
+
+export function triggerCleanUpProjectsJob(
+  projects: Project[],
+  req: Request,
+): [Project[], Promise<void>] {
+  if (!projects.length) {
+    return [projects, Promise.resolve()];
+  }
+
+  const jobPromise = Promise.resolve().then(async () => {
+    try {
+      await Promise.all(projects.map((s) => cleanUpProject(s, req)));
+    } catch (err) {
+      const ids = projects.map((s) => s.id);
+      logger.error(`Error cleaning up projectId=${ids} err=`, err);
+    }
+  });
+
+  return [projects, jobPromise];
+}
+
+async function cleanUpProject(project: Project, req: Request) {
+  await deleteAllOwnedObjects(req, { projectId: project.id, deleted: false });
+}
+
 export default app;
7 changes: 3 additions & 4 deletions packages/api/src/controllers/user.ts
@@ -30,6 +30,7 @@ import { InternalServerError, NotFoundError } from "../store/errors";
 import { WithID } from "../store/types";
 import {
   FieldsMap,
+  deleteAllOwnedObjects,
   makeNextHREF,
   parseFilters,
   parseOrder,
@@ -296,11 +297,9 @@ app.delete("/:id", authorizer({ anyAdmin: true }), async (req, res) => {
     res.status(404);
     return res.json({ errors: ["user not found"] });
   }
-  await db.user.delete(id);

-  // TODO: remove all streams owned by user
-  // TODO: remove all assets owned by user
-  // TODO: remove the stripe account
+  await db.user.markDeleted(id);
+  await deleteAllOwnedObjects(req, { userId: id, deleted: false });

   res.status(204);
   res.end();
129 changes: 129 additions & 0 deletions packages/api/src/jobs/projects-cleanup.test.ts
@@ -0,0 +1,129 @@
+import { v4 as uuid } from "uuid";
+
+import * as appRouter from "../app-router";
+import * as projectsController from "../controllers/project";
+import { cache } from "../store/cache";
+import { DB } from "../store/db";
+import Queue, { RabbitQueue } from "../store/queue";
+import { rabbitMgmt } from "../test-helpers";
+import params, { testId } from "../test-params";
+import projectsCleanup from "./projects-cleanup";
+import { Request } from "express";
+
+describe("projects-cleanup", () => {
+  // There are further functional tests under controllers/stream.test.ts "active clean-up"
+
+  let db: DB;
+  let initClientsSpy: jest.SpyInstance;
+
+  beforeAll(async () => {
+    db = new DB();
+    await db.start({ postgresUrl: params.postgresUrl });
+    await rabbitMgmt.createVhost(testId);
+  });
+
+  afterAll(async () => {
+    await rabbitMgmt.deleteVhost(testId);
+  });
+
+  let queue: Queue;
+
+  beforeEach(() => {
+    const originalInitClient = appRouter.initClients;
+    initClientsSpy = jest
+      .spyOn(appRouter, "initClients")
+      .mockImplementation(async (params, name) => {
+        const result = await originalInitClient(params, name);
+        queue = result.queue;
+        jest.spyOn(queue, "consume");
+        jest.spyOn(queue, "delayedPublishWebhook");
+        return result;
+      });
+  });
+
+  afterEach(() => {
+    jest.restoreAllMocks();
+    cache.storage = null;
+    queue?.close();
+    queue = null;
+  });
+
+  const mockProject = (deleted: boolean, deletedAt: number) => {
+    return {
+      id: uuid(),
+      name: "project1",
+      deleted,
+      deletedAt,
+    };
+  };
+
+  const mockStream = (projectId: string) => {
+    return {
+      id: uuid(),
+      name: "stream1",
+      playbackId: uuid(),
+      streamKey: uuid(),
+      projectId,
+    };
+  };
+
+  const mockAsset = (projectId: string) => {
+    return {
+      id: uuid(),
+      name: "asset1",
+      projectId,
+      source: {
+        type: "url" as const,
+        url: "someSource",
+      },
+    };
+  };
+
+  it("it should deleted related assets and streams", async () => {
+    const triggerSpy = jest
+      .spyOn(projectsController, "triggerCleanUpProjectsJob")
+      .mockImplementation(() => [[], Promise.resolve()]);
Comment on lines +83 to +85
Member

Same point regarding the "inverted" nature of the current jobs implementation. Let's make sure the "real" tests, the ones that check the effects of the deletion job, are implemented here. The API should be the "sidekick" that calls the job, while the core logic should live in the job itself.
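
A minimal sketch of an effect-based check in the spirit of the comment above: run the cleanup itself against a store, then assert on the stored state instead of on a spy. FakeTable, cleanUpProject as written here, and runEffectCheck are illustrative stand-ins over in-memory data, not the project's DB class or test helpers.

```typescript
// Hypothetical in-memory stand-in for a stream or asset table.
interface OwnedObject {
  id: string;
  projectId: string;
  deleted: boolean;
}

class FakeTable {
  rows: OwnedObject[] = [];

  async findByProject(projectId: string): Promise<OwnedObject[]> {
    return this.rows.filter((r) => r.projectId === projectId && !r.deleted);
  }

  async markDeletedMany(ids: string[]): Promise<void> {
    const wanted = new Set(ids);
    for (const row of this.rows) {
      if (wanted.has(row.id)) {
        row.deleted = true;
      }
    }
  }
}

// Simplified stand-in for the job's per-project cleanup.
async function cleanUpProject(
  projectId: string,
  tables: FakeTable[],
): Promise<void> {
  for (const table of tables) {
    const rows = await table.findByProject(projectId);
    await table.markDeletedMany(rows.map((r) => r.id));
  }
}

// Runs the cleanup for project "p1" and reports what actually changed.
async function runEffectCheck(): Promise<{
  ownedDeleted: boolean;
  othersUntouched: boolean;
}> {
  const streams = new FakeTable();
  const assets = new FakeTable();
  streams.rows.push(
    { id: "s1", projectId: "p1", deleted: false },
    { id: "s2", projectId: "p2", deleted: false },
  );
  assets.rows.push({ id: "a1", projectId: "p1", deleted: false });

  await cleanUpProject("p1", [streams, assets]);

  const all = [...streams.rows, ...assets.rows];
  return {
    ownedDeleted: all
      .filter((r) => r.projectId === "p1")
      .every((r) => r.deleted),
    othersUntouched: all
      .filter((r) => r.projectId !== "p1")
      .every((r) => !r.deleted),
  };
}
```

The second check covers a case the spy-based test cannot: objects belonging to other projects must stay untouched.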

+
+    const now = Date.now();
+    const project = await db.project.create(mockProject(false, now - 10000));
+
+    let streamsToDelete = [];
+    let assetsToDelete = [];
+    // 3 streams
+    for (let i = 0; i < 3; i++) {
+      const stream = await db.stream.create(mockStream(project.id));
+      streamsToDelete.push(stream.id);
+    }
+    // 3 assets
+    for (let i = 0; i < 3; i++) {
+      const asset = await db.asset.create(mockAsset(project.id));
+      assetsToDelete.push(asset.id);
+    }
+
+    let mockReq: Request = {
+      user: {
+        id: "test",
+        admin: false,
+        defaultProjectId: uuid(),
+      },
+      project: {
+        id: "test",
+      },
+    } as Request;
+
+    await projectsCleanup(params, mockReq);
+
+    expect(triggerSpy).toHaveBeenCalledTimes(1);
+
+    for (const streamId of streamsToDelete) {
+      let stream = await db.stream.get(streamId);
+      expect(stream.id).toBe(streamId);
+      expect(stream.deleted).toBe(true);
+    }
+    for (const assetId of assetsToDelete) {
+      let asset = await db.asset.get(assetId);
+      expect(asset.id).toBe(assetId);
+      expect(asset.deleted).toBe(true);
+    }
+  });
+});
35 changes: 35 additions & 0 deletions packages/api/src/jobs/projects-cleanup.ts
@@ -0,0 +1,35 @@
+import sql from "sql-template-strings";
+import { initClients } from "../app-router";
+import { CliArgs } from "../parse-cli";
+import { DB } from "../store/db";
+import Queue from "../store/queue";
+import { triggerCleanUpProjectsJob } from "../controllers/project";
+import { Request } from "express";
+
+// queries for all the deleted projects
+// clean up logic for all related assets and streams
+export default async function projectsCleanup(
+  config: CliArgs,
+  req: Request,
+  clients?: { jobsDb: DB },
+) {
+  if (!config.ingest?.length) {
+    throw new Error("ingest not configured");
+  }
+  const { jobsDb } =
+    clients ?? (await initClients(config, "projects-cleanup-job"));
+  const { projectsCleanupLimit: limit, ingest } = config;
+
+  let [projects] = await jobsDb.stream.find([sql`data->>'deleted' = 'true'`], {
+    limit,
+    order: "data->>'lastSeen' DESC",
Member

Does lastSeen exist on projects? I thought it was a user/api-key/stream thing only.

+  });
Comment on lines +23 to +26
Member

We probably need something like a "deleting phase" on the projects too; otherwise this will keep listing deleted projects that have already been cleaned up, ad infinitum. Maybe a simple cleanedUp boolean on the project that gets set by this job?
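
One way to read the cleanedUp suggestion above, as a sketch: select only projects that are deleted and not yet cleaned up, and set the flag once cleanup succeeds so that later runs skip them. ProjectRow, selectPendingCleanup, and runCleanupPass are hypothetical names; a real version would persist the flag through the store instead of mutating an in-memory array.

```typescript
// Hypothetical project shape; only the cleanedUp flag comes from the comment.
interface ProjectRow {
  id: string;
  deleted: boolean;
  cleanedUp?: boolean;
}

// Pick deleted projects that still need cleanup, up to `limit`.
function selectPendingCleanup(
  projects: ProjectRow[],
  limit: number,
): ProjectRow[] {
  return projects.filter((p) => p.deleted && !p.cleanedUp).slice(0, limit);
}

// One job pass: clean each pending project, then mark it as done.
async function runCleanupPass(
  projects: ProjectRow[],
  limit: number,
  cleanUp: (projectId: string) => Promise<void>,
): Promise<string[]> {
  const done: string[] = [];
  for (const project of selectPendingCleanup(projects, limit)) {
    await cleanUp(project.id);
    // Set the flag only after cleanup succeeded, so failures are retried.
    project.cleanedUp = true;
    done.push(project.id);
  }
  return done;
}
```

Because the flag is set only after cleanUp resolves, a project whose cleanup throws stays pending and is picked up again on the next run.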

+
+  const [cleanedUp, jobPromise] = triggerCleanUpProjectsJob(projects, req);
+  await jobPromise;
+
+  return {
+    cleanedUp,
+    logContext: `limit=${limit} numCleanedUp=${cleanedUp.length}`,
+  };
+}
5 changes: 5 additions & 0 deletions packages/api/src/parse-cli.ts
@@ -535,6 +535,11 @@ export default function parseCli(argv?: string | readonly string[]) {
       type: "number",
       default: 1000,
     },
+    "projects-cleanup-limit": {
+      describe: "job/projects-cleanup: max number of projects to clean up",
+      type: "number",
+      default: 100,
+    },
     "update-usage-from": {
       describe:
         "job/update-usage: unix millis timestamp for start time of update usage job",