Skip to content

Commit

Permalink
feat(job): provide skipAttempt option when manually moving a job (#2203)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 9, 2023
1 parent a16b8fd commit 0e88e4f
Show file tree
Hide file tree
Showing 24 changed files with 343 additions and 75 deletions.
1 change: 0 additions & 1 deletion docs/gitbook/guide/redis-tm-hosting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ description: >-
---

# Redis™ hosting

1 change: 0 additions & 1 deletion docs/gitbook/guide/redis-tm-hosting/aws-elasticache.md
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# AWS Elasticache

22 changes: 13 additions & 9 deletions docs/gitbook/guide/redis-tm-hosting/aws-memorydb.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,34 @@ AWS provides a Redis™ 7 compatible managed database that is easy to use and is

There are some considerations to keep in mind when using MemoryDB, though.

* MemoryDB only works in Cluster mode. So you need to use "hash tags" so that the queues get attached to a given cluster node ([read more here](../../bull/patterns/redis-cluster.md)).
* MemoryDB can only be accessed within an AWS VPC, so you cannot access the Redis™ cluster outside of AWS.
- MemoryDB only works in Cluster mode. So you need to use "hash tags" so that the queues get attached to a given cluster node ([read more here](../../bull/patterns/redis-cluster.md)).
- MemoryDB can only be accessed within an AWS VPC, so you cannot access the Redis™ cluster outside of AWS.

The easiest way to use MemoryDB with BullMQ is to first instantiate an IORedis Cluster instance, and then pass that connection as an option to your workers or queue instances, for example:

```typescript
import { Cluster } from "ioredis"
import { Worker } from "bullmq"
import { Cluster } from 'ioredis';
import { Worker } from 'bullmq';

const connection = new Cluster(
[
{
host: "clustercfg.xxx.amazonaws.com",
host: 'clustercfg.xxx.amazonaws.com',
port: 6379,
},
],
{
tls: {},
}
},
);

const worker = new Worker("myqueue", async (job: Job) => {
// Do some useful stuff
}, { connection });
const worker = new Worker(
'myqueue',
async (job: Job) => {
// Do some useful stuff
},
{ connection },
);

// ...

Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/patterns/redis-cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ You can use two approaches in order to make the Queues compatible with Cluster.

```typescript
const queue = new Queue('cluster', {
prefix: '{myprefix}'
prefix: '{myprefix}',
});
```

Expand Down
12 changes: 9 additions & 3 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}, jo
self.delay = opts.get("delay", 0)
self.attempts = opts.get("attempts", 1)
self.attemptsMade = 0
self.attemptsStarted = 0
self.data = data
self.removeOnComplete = opts.get("removeOnComplete", True)
self.removeOnFail = opts.get("removeOnFail", False)
Expand Down Expand Up @@ -96,9 +97,9 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):

async with self.queue.redisConnection.conn.pipeline(transaction=True) as pipe:
await self.saveStacktrace(pipe, error_message)
if self.attemptsMade < self.opts['attempts'] and not self.discarded:
if (self.attemptsMade + 1) < self.opts.get('attempts') and not self.discarded:
delay = await Backoffs.calculate(
self.opts.get('backoff'), self.attemptsMade,
self.opts.get('backoff'), self.attemptsMade + 1,
err, self, self.queue.opts.get("settings") and self.queue.opts['settings'].get("backoffStrategy")
)
if delay == -1:
Expand Down Expand Up @@ -141,6 +142,8 @@ async def moveToFailed(self, err, token:str, fetchNext:bool = False):
if delay and type(delay) == int:
self.delay = delay

self.attemptsMade = self.attemptsMade + 1

async def saveStacktrace(self, pipe, err:str):
stacktrace = traceback.format_exc()
stackTraceLimit = self.opts.get("stackTraceLimit")
Expand Down Expand Up @@ -186,8 +189,11 @@ def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
if rawData.get("rjk"):
job.repeatJobKey = rawData.get("rjk")

if rawData.get("ats"):
job.attemptsStarted = int(rawData.get("ats"))

job.failedReason = rawData.get("failedReason")
job.attemptsMade = int(rawData.get("attemptsMade", "0"))
job.attemptsMade = int(rawData.get("attemptsMade") or rawData.get("atm") or "0")

returnvalue = rawData.get("returnvalue")
if type(returnvalue) == str:
Expand Down
15 changes: 8 additions & 7 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ def cleanJobsInSet(self, set: str, grace: int = 0, limit:int = 0):
keys, args = self.cleanJobsInSetArgs(set, grace, limit)
return self.commands["cleanJobsInSet"](keys=keys, args=args)

def moveToWaitingChildrenArgs(self, job_id, token, opts):
def moveToWaitingChildrenArgs(self, job_id, token, opts: dict = {}):
keys = [self.toKey(job_id) + ":lock",
self.keys['active'],
self.keys['waiting-children'],
self.toKey(job_id)]
child_key = opts.get("child") if opts else None
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id]
args = [token, get_parent_key(child_key) or "", round(time.time() * 1000), job_id,
"1" if opts.get("skipAttempt") else "0"]

return (keys, args)

Expand Down Expand Up @@ -239,7 +240,7 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):

return (keys, args)

def retryJobArgs(self, job_id: str, lifo: bool, token: str):
def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
keys = self.getKeys(['active', 'wait', 'paused'])
keys.append(self.toKey(job_id))
keys.append(self.keys['meta'])
Expand All @@ -249,14 +250,14 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str):
keys.append(self.keys['pc'])
keys.append(self.keys['marker'])

push_cmd = "R" if lifo else "L"
push_cmd = "RPUSH" if lifo else "LPUSH"

args = [self.keys[''], round(time.time() * 1000), push_cmd,
job_id, token]
job_id, token, "1" if opts.get("skipAttempt") else "0"]

return (keys, args)

def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int = 0):
def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int = 0, opts: dict = {}):
max_timestamp = max(0, timestamp or 0)

if timestamp > 0:
Expand All @@ -268,7 +269,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int
keys.append(self.keys['meta'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
job_id, token, delay]
job_id, token, delay, "1" if opts.get("skipAttempt") else "0" ]

return (keys, args)

Expand Down
1 change: 1 addition & 0 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async def processJob(self, job: Job, token: str):
# nextJob = await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=not self.closing)
await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", False), token, self.opts, fetchNext=False)
job.returnvalue = result
job.attemptsMade = job.attemptsMade + 1
self.emit("completed", job, result)
except WaitingChildrenError:
return
Expand Down
8 changes: 4 additions & 4 deletions python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def process2(job: Job, token: str):
completedJob = await Job.fromId(queue, job.id)

self.assertEqual(completedJob.id, job.id)
self.assertEqual(completedJob.attemptsMade, 2)
self.assertEqual(completedJob.attemptsMade, 1)
self.assertEqual(completedJob.data, data)
self.assertEqual(completedJob.returnvalue, "done2")
self.assertNotEqual(completedJob.finishedOn, None)
Expand All @@ -213,7 +213,7 @@ async def test_retry_job_after_delay_with_fixed_backoff(self):
queue = Queue(queueName)

async def process1(job: Job, token: str):
if job.attemptsMade < 3:
if job.attemptsMade < 2:
raise Exception("Not yet!")
return None

Expand All @@ -236,12 +236,12 @@ def completing(job: Job, result):

await queue.close()
await worker.close()

async def test_retry_job_after_delay_with_custom_backoff(self):
queue = Queue(queueName)

async def process1(job: Job, token: str):
if job.attemptsMade < 3:
if job.attemptsMade < 2:
raise Exception("Not yet!")
return None

Expand Down
47 changes: 40 additions & 7 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
JobJson,
JobJsonRaw,
MinimalJob,
MoveToDelayedOpts,
MoveToWaitingChildrenOpts,
ParentKeys,
ParentOpts,
Expand Down Expand Up @@ -98,6 +99,12 @@ export class Job<
*/
timestamp: number;

/**
* Number of times the job has been moved to active (attempts started).
* @defaultValue 0
*/
attemptsStarted = 0;

/**
* Number of attempts after the job has failed.
* @defaultValue 0
Expand Down Expand Up @@ -311,7 +318,10 @@ export class Job<
}

job.failedReason = json.failedReason;
job.attemptsMade = parseInt(json.attemptsMade || '0');

job.attemptsStarted = parseInt(json.ats || '0');

job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');

job.stacktrace = getTraces(json.stacktrace);

Expand Down Expand Up @@ -588,6 +598,7 @@ export class Job<
this.finishedOn = args[
this.scripts.moveToFinishedKeys.length + 1
] as number;
this.attemptsMade += 1;

return result;
}
Expand Down Expand Up @@ -622,7 +633,7 @@ export class Job<
let moveToFailed = false;
let finishedOn, delay;
if (
this.attemptsMade < this.opts.attempts &&
this.attemptsMade + 1 < this.opts.attempts &&
!this.discarded &&
!(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')
) {
Expand All @@ -631,7 +642,7 @@ export class Job<
// Check if backoff is needed
delay = await Backoffs.calculate(
<BackoffOptions>this.opts.backoff,
this.attemptsMade,
this.attemptsMade + 1,
err,
this,
opts.settings && opts.settings.backoffStrategy,
Expand Down Expand Up @@ -693,6 +704,8 @@ export class Job<
if (delay && typeof delay === 'number') {
this.delay = delay;
}

this.attemptsMade += 1;
}

/**
Expand Down Expand Up @@ -1027,14 +1040,25 @@ export class Job<
* @param token - token to check job is locked by current worker
* @returns
*/
moveToDelayed(timestamp: number, token?: string): Promise<void> {
async moveToDelayed(
timestamp: number,
token?: string,
opts: MoveToDelayedOpts = {},
): Promise<void> {
const delay = timestamp - Date.now();
return this.scripts.moveToDelayed(
const movedToDelayed = await this.scripts.moveToDelayed(
this.id,
timestamp,
delay > 0 ? delay : 0,
token,
opts,
);

if (!opts.skipAttempt) {
this.attemptsMade += 1;
}

return movedToDelayed;
}

/**
Expand All @@ -1044,11 +1068,20 @@ export class Job<
* @param opts - The options bag for moving a job to waiting-children.
* @returns true if the job was moved
*/
moveToWaitingChildren(
async moveToWaitingChildren(
token: string,
opts: MoveToWaitingChildrenOpts = {},
): Promise<boolean> {
return this.scripts.moveToWaitingChildren(this.id, token, opts);
const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(
this.id,
token,
opts,
);
if (!opts.skipAttempt) {
this.attemptsMade += 1;
}

return movedToWaitingChildren;
}

/**
Expand Down
11 changes: 9 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
RedisClient,
WorkerOptions,
KeepJobs,
MoveToDelayedOpts,
RetryOpts,
} from '../interfaces';
import {
JobState,
Expand Down Expand Up @@ -358,7 +360,6 @@ export class Scripts {
limiter: opts.limiter,
lockDuration: opts.lockDuration,
attempts: job.opts.attempts,
attemptsMade: job.attemptsMade,
maxMetricsSize: opts.metrics?.maxDataPoints
? opts.metrics?.maxDataPoints
: '',
Expand Down Expand Up @@ -652,6 +653,7 @@ export class Scripts {
timestamp: number,
token: string,
delay: number,
opts: MoveToDelayedOpts = {},
): (string | number)[] {
//
// Bake in the job id first 12 bits into the timestamp
Expand Down Expand Up @@ -684,6 +686,7 @@ export class Scripts {
jobId,
token,
delay,
opts.skipAttempt ? '1' : '0',
]);
}

Expand Down Expand Up @@ -717,6 +720,7 @@ export class Scripts {
childKey ?? '',
JSON.stringify(timestamp),
jobId,
opts.skipAttempt ? '1' : '0',
]);
}

Expand All @@ -725,10 +729,11 @@ export class Scripts {
timestamp: number,
delay: number,
token = '0',
opts: MoveToDelayedOpts = {},
): Promise<void> {
const client = await this.queue.client;

const args = this.moveToDelayedArgs(jobId, timestamp, token, delay);
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await (<any>client).moveToDelayed(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'moveToDelayed', 'active');
Expand Down Expand Up @@ -797,6 +802,7 @@ export class Scripts {
jobId: string,
lifo: boolean,
token: string,
opts: RetryOpts = {},
): (string | number)[] {
const keys: (string | number)[] = [
this.queue.keys.active,
Expand All @@ -819,6 +825,7 @@ export class Scripts {
pushCmd,
jobId,
token,
opts.skipAttempt ? '1' : '0',
]);
}

Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ local function prepareJobForProcessing(keys, keyPrefix, targetKey, jobId, proces

rcall("XADD", keys[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "attemptsMade", 1)
rcall("HINCRBY", jobKey, "ats", 1)

return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
end
Loading

0 comments on commit 0e88e4f

Please sign in to comment.