Skip to content

Commit

Permalink
feat: Add an option to not keep failed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedBassem committed Nov 9, 2024
1 parent 7681d18 commit 6bfc81e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface SqliteQueueOptions {
defaultJobArgs: {
numRetries: number;
};
keepFailedJobs: boolean;
}

export interface EnqueueOptions {
Expand Down
98 changes: 98 additions & 0 deletions src/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* eslint-disable @typescript-eslint/require-await */
import { describe, expect, test } from "vitest";

import {
buildDBClient,
SqliteQueue
} from "./";

interface Work {
increment: number;
succeedAfter?: number;
blockForSec?: number;
}

describe("SqliteQueue", () => {
test("idempotency keys", async () => {
const queue = new SqliteQueue<Work>(
"queue1",
buildDBClient(":memory:", true),
{
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: false,
},
);

await queue.enqueue({ increment: 1 });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 3 }, { idempotencyKey: "3" });

expect(await queue.stats()).toEqual({
pending: 3,
running: 0,
pending_retry: 0,
failed: 0,
});

});

test("keep failed jobs", async () => {
const queueKeep = new SqliteQueue<Work>(
"queue1",
buildDBClient(":memory:", true),
{
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: true,
},
);

const queueDontKeep = new SqliteQueue<Work>(
"queue2",
buildDBClient(":memory:", true),
{
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: false,
},
);

const job1 = await queueKeep.enqueue({ increment: 1 });
const job2 = await queueDontKeep.enqueue({ increment: 1 });

expect(await queueKeep.stats()).toEqual({
pending: 1,
running: 0,
pending_retry: 0,
failed: 0,
});
expect(await queueDontKeep.stats()).toEqual({
pending: 1,
running: 0,
pending_retry: 0,
failed: 0,
});

queueKeep.finalize(job1!.id, job1!.allocationId, "failed");
queueDontKeep.finalize(job2!.id, job2!.allocationId, "failed");

expect(await queueKeep.stats()).toEqual({
pending: 0,
running: 0,
pending_retry: 0,
failed: 1,
});
expect(await queueDontKeep.stats()).toEqual({
pending: 0,
running: 0,
pending_retry: 0,
failed: 0,
});
});
});
2 changes: 1 addition & 1 deletion src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class SqliteQueue<T> {
alloctionId: string,
status: "completed" | "pending_retry" | "failed",
) {
if (status == "completed") {
if (status == "completed" || (status == "failed" && !this.options.keepFailedJobs)) {
await this.db
.delete(tasksTable)
.where(
Expand Down
33 changes: 7 additions & 26 deletions src/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: true,
},
);

Expand Down Expand Up @@ -210,6 +211,7 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 2,
},
keepFailedJobs: true,
},
);

Expand Down Expand Up @@ -247,6 +249,7 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: true,
},
);

Expand Down Expand Up @@ -282,6 +285,7 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: true,
},
);

Expand Down Expand Up @@ -326,6 +330,7 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: true,
},
);

Expand Down Expand Up @@ -387,11 +392,13 @@ describe("SqiteQueueRunner", () => {
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: true,
});
const queue2 = new SqliteQueue<Work>("queue2", db, {
defaultJobArgs: {
numRetries: 0,
},
keepFailedJobs: true,
});

const barrier = new Barrier(0);
Expand Down Expand Up @@ -439,30 +446,4 @@ describe("SqiteQueueRunner", () => {
expect(results.numCompleted).toEqual(1000);
expect(results.numFailed).toEqual(0);
});

test("idempotency keys", async () => {
const queue = new SqliteQueue<Work>(
"queue1",
buildDBClient(":memory:", true),
{
defaultJobArgs: {
numRetries: 0,
},
},
);

await queue.enqueue({ increment: 1 });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
await queue.enqueue({ increment: 3 }, { idempotencyKey: "3" });

expect(await queue.stats()).toEqual({
pending: 3,
running: 0,
pending_retry: 0,
failed: 0,
});

});
});

0 comments on commit 6bfc81e

Please sign in to comment.