
Commit e7bca6f

fix: release concurrency system only consumes tokens when releases are executed successfully (#1883)

* fix: release concurrency system only consumes tokens when releases are executed successfully
* Add release concurrency metrics sending to otel
1 parent 195c5d7 commit e7bca6f
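
The heart of the change is the executor contract on the token bucket queue: the executor now reports whether concurrency was actually released, and the queue only treats the token as consumed when it returns true; otherwise the token is put back in the bucket without requeueing the item. A minimal sketch of that contract with illustrative helper names (only the boolean-returning executor signature and the refund-on-false behavior come from this commit):

// Illustrative sketch only: `ReleaseExecutor` and `callExecutor` are stand-ins for the
// real queue internals; the boolean return and token refund semantics mirror this commit.
type ReleaseExecutor<T> = (releaseQueue: T, releaserId: string) => Promise<boolean>;

async function callExecutor<T>(
  executor: ReleaseExecutor<T>,
  descriptor: T,
  releaserId: string,
  returnTokenOnly: () => Promise<void> // refunds the token without requeueing the item
): Promise<void> {
  const released = await executor(descriptor, releaserId);

  if (!released) {
    // The release was skipped (for example, the snapshot is no longer valid),
    // so the token goes back to the bucket instead of being consumed.
    await returnTokenOnly();
  }
}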

File tree

6 files changed: +308 -26 lines


.vscode/launch.json (+1 -1)

@@ -138,7 +138,7 @@
       "type": "node-terminal",
       "request": "launch",
       "name": "Debug RunEngine tests",
-      "command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
+      "command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should retrieve metrics for all queues via getQueueMetrics'",
       "cwd": "${workspaceFolder}/internal-packages/run-engine",
       "sourceMaps": true
     },

internal-packages/redis/src/index.ts (+1 -1)

@@ -8,7 +8,7 @@ const defaultOptions: Partial<RedisOptions> = {
     const delay = Math.min(times * 50, 1000);
     return delay;
   },
-  maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 1 : 20,
+  maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20,
 };
 
 const logger = new Logger("Redis", "debug");

internal-packages/run-engine/src/engine/index.ts (+1 -1)

@@ -219,7 +219,7 @@ export class RunEngine {
         pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
         batchSize: options.releaseConcurrency?.batchSize ?? 10,
         executor: async (descriptor, snapshotId) => {
-          await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
+          return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
             snapshotId
           );
         },

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts (+192 -10)

@@ -1,8 +1,9 @@
 import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis";
-import { Tracer } from "@internal/tracing";
+import { startSpan, Tracer } from "@internal/tracing";
 import { Logger } from "@trigger.dev/core/logger";
-import { setInterval } from "node:timers/promises";
 import { z } from "zod";
+import { setInterval } from "node:timers/promises";
+import { flattenAttributes } from "@trigger.dev/core/v3";
 
 export type ReleaseConcurrencyQueueRetryOptions = {
   maxRetries?: number;
@@ -15,7 +16,10 @@ export type ReleaseConcurrencyQueueRetryOptions = {
 
 export type ReleaseConcurrencyQueueOptions<T> = {
   redis: RedisOptions;
-  executor: (releaseQueue: T, releaserId: string) => Promise<void>;
+  /**
+   * @returns true if the run was successful, false if the token should be returned to the bucket
+   */
+  executor: (releaseQueue: T, releaserId: string) => Promise<boolean>;
   keys: {
     fromDescriptor: (releaseQueue: T) => string;
     toDescriptor: (releaseQueue: string) => T;
@@ -78,6 +82,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
 
     if (!options.disableConsumers) {
       this.#startConsumers();
+      this.#startMetricsProducer();
     }
   }
 
@@ -119,7 +124,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
       String(Date.now())
     );
 
-    this.logger.debug("Consumed token in attemptToRelease", {
+    this.logger.info("Consumed token in attemptToRelease", {
       releaseQueueDescriptor,
       releaserId,
       maxTokens,
@@ -270,7 +275,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
       return false;
     }
 
-    await Promise.all(
+    await Promise.allSettled(
       result.map(([queue, releaserId, metadata]) => {
         const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata));
         const releaseQueueDescriptor = this.keys.toDescriptor(queue);
@@ -283,9 +288,29 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
 
   async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) {
     try {
-      this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId });
+      this.logger.info("Calling executor for release", { releaseQueueDescriptor, releaserId });
+
+      const released = await this.options.executor(releaseQueueDescriptor, releaserId);
 
-      await this.options.executor(releaseQueueDescriptor, releaserId);
+      if (released) {
+        this.logger.info("Executor released concurrency", { releaseQueueDescriptor, releaserId });
+      } else {
+        this.logger.info("Executor did not release concurrency", {
+          releaseQueueDescriptor,
+          releaserId,
+        });
+
+        // Return the token but don't requeue
+        const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
+        await this.redis.returnTokenOnly(
+          this.masterQueuesKey,
+          this.#bucketKey(releaseQueue),
+          this.#queueKey(releaseQueue),
+          this.#metadataKey(releaseQueue),
+          releaseQueue,
+          releaserId
+        );
+      }
     } catch (error) {
       this.logger.error("Error executing run:", { error });
 
@@ -374,6 +399,30 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
     }
   }
 
+  async #startMetricsProducer() {
+    try {
+      // Produce metrics every 60 seconds, using a tracer span
+      for await (const _ of setInterval(60_000)) {
+        const metrics = await this.getQueueMetrics();
+        this.logger.info("Queue metrics:", { metrics });
+
+        await startSpan(
+          this.options.tracer,
+          "ReleaseConcurrencyTokenBucketQueue.metrics",
+          async (span) => {},
+          {
+            attributes: {
+              ...flattenAttributes(metrics, "queues"),
+              forceRecording: true,
+            },
+          }
+        );
+      }
+    } catch (error) {
+      this.logger.error("Error starting metrics producer:", { error });
+    }
+  }
+
   #calculateBackoffScore(item: QueueItemMetadata): string {
     const delay = Math.min(
       this.backoff.maxDelay,
@@ -382,6 +431,137 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
     return String(Date.now() + delay);
   }
 
+  async getQueueMetrics(): Promise<
+    Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
+  > {
+    const streamRedis = this.redis.duplicate();
+    const queuePattern = `${this.keyPrefix}*:queue`;
+    const stream = streamRedis.scanStream({
+      match: queuePattern,
+      type: "zset",
+      count: 100,
+    });
+
+    let resolvePromise: (
+      value: Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
+    ) => void;
+    let rejectPromise: (reason?: any) => void;
+
+    const promise = new Promise<
+      Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
+    >((resolve, reject) => {
+      resolvePromise = resolve;
+      rejectPromise = reject;
+    });
+
+    const metrics: Map<
+      string,
+      { releaseQueue: string; currentTokens: number; queueLength: number }
+    > = new Map();
+
+    async function getMetricsForKeys(queueKeys: string[]) {
+      if (queueKeys.length === 0) {
+        return [];
+      }
+
+      const pipeline = streamRedis.pipeline();
+
+      queueKeys.forEach((queueKey) => {
+        const releaseQueue = queueKey
+          .replace(":queue", "")
+          .replace(streamRedis.options.keyPrefix ?? "", "");
+        const bucketKey = `${releaseQueue}:bucket`;
+
+        pipeline.get(bucketKey);
+        pipeline.zcard(`${releaseQueue}:queue`);
+      });
+
+      const result = await pipeline.exec();
+
+      if (!result) {
+        return [];
+      }
+
+      const results = result.map(([resultError, queueLengthOrCurrentTokens]) => {
+        if (resultError) {
+          return null;
+        }
+
+        return queueLengthOrCurrentTokens ? Number(queueLengthOrCurrentTokens) : 0;
+      });
+
+      // Now zip the results with the queue keys
+      const zippedResults = queueKeys.map((queueKey, index) => {
+        const releaseQueue = queueKey
+          .replace(":queue", "")
+          .replace(streamRedis.options.keyPrefix ?? "", "");
+
+        // Current tokens are at indexes 0, 2, 4, 6, etc.
+        // Queue length are at indexes 1, 3, 5, 7, etc.
+
+        const currentTokens = results[index * 2];
+        const queueLength = results[index * 2 + 1];
+
+        if (typeof currentTokens !== "number" || typeof queueLength !== "number") {
+          return null;
+        }
+
+        return {
+          releaseQueue,
+          currentTokens: currentTokens,
+          queueLength: queueLength,
+        };
+      });
+
+      return zippedResults.filter((result) => result !== null);
+    }
+
+    stream.on("end", () => {
+      streamRedis.quit();
+      resolvePromise(Array.from(metrics.values()));
+    });
+
+    stream.on("error", (error) => {
+      this.logger.error("Error getting queue metrics:", { error });
+
+      stream.pause();
+      streamRedis.quit();
+      rejectPromise(error);
+    });
+
+    stream.on("data", async (keys) => {
+      stream.pause();
+
+      const uniqueKeys = Array.from(new Set<string>(keys));
+
+      if (uniqueKeys.length === 0) {
+        stream.resume();
+        return;
+      }
+
+      const unresolvedKeys = uniqueKeys.filter((key) => !metrics.has(key));
+
+      if (unresolvedKeys.length === 0) {
+        stream.resume();
+        return;
+      }
+
+      this.logger.debug("Fetching queue metrics for keys", { keys: uniqueKeys });
+
+      await getMetricsForKeys(unresolvedKeys).then((results) => {
+        results.forEach((result) => {
+          if (result) {
+            metrics.set(result.releaseQueue, result);
+          }
+        });
+
+        stream.resume();
+      });
+    });
+
+    return promise;
+  }
+
   #registerCommands() {
     this.redis.defineCommand("consumeToken", {
       numberOfKeys: 4,
@@ -401,7 +581,9 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
 
 -- If we have enough tokens, then consume them
 if currentTokens >= 1 then
-  redis.call("SET", bucketKey, currentTokens - 1)
+  local newCurrentTokens = currentTokens - 1
+
+  redis.call("SET", bucketKey, newCurrentTokens)
   redis.call("ZREM", queueKey, releaserId)
 
   -- Clean up metadata when successfully consuming
@@ -411,8 +593,8 @@ if currentTokens >= 1 then
   local queueLength = redis.call("ZCARD", queueKey)
 
   -- If we still have tokens and items in queue, update available queues
-  if currentTokens > 0 and queueLength > 0 then
-    redis.call("ZADD", masterQueuesKey, currentTokens, releaseQueue)
+  if newCurrentTokens > 0 and queueLength > 0 then
+    redis.call("ZADD", masterQueuesKey, newCurrentTokens, releaseQueue)
   else
     redis.call("ZREM", masterQueuesKey, releaseQueue)
   end
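
The new getQueueMetrics() resolves to one entry per release queue, and the metrics producer flattens that array onto an OTel span every 60 seconds. A hedged usage sketch of the returned shape (the structural type below is taken from the diff; how the queue instance is constructed is omitted):

// Sketch: consuming the metrics added in this commit. Only the
// { releaseQueue, currentTokens, queueLength } shape comes from the diff.
type ReleaseQueueMetric = { releaseQueue: string; currentTokens: number; queueLength: number };

async function logReleaseQueueMetrics(queue: {
  getQueueMetrics(): Promise<Array<ReleaseQueueMetric>>;
}): Promise<void> {
  const metrics = await queue.getQueueMetrics();

  for (const { releaseQueue, currentTokens, queueLength } of metrics) {
    console.log(`${releaseQueue}: ${currentTokens} tokens available, ${queueLength} waiting`);
  }
}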

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (+18 -13)

@@ -104,9 +104,9 @@ export class ReleaseConcurrencySystem {
     );
   }
 
-  public async executeReleaseConcurrencyForSnapshot(snapshotId: string) {
+  public async executeReleaseConcurrencyForSnapshot(snapshotId: string): Promise<boolean> {
     if (!this.releaseConcurrencyQueue) {
-      return;
+      return false;
     }
 
     this.$.logger.debug("Executing released concurrency", {
@@ -136,14 +136,14 @@ export class ReleaseConcurrencySystem {
         snapshotId,
       });
 
-      return;
+      return false;
     }
 
     // - Runlock the run
     // - Get latest snapshot
     // - If the run is non suspended or going to be, then bail
     // - If the run is suspended or going to be, then release the concurrency
-    await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
+    return await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
       const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId);
 
       const isValidSnapshot =
@@ -159,7 +159,7 @@ export class ReleaseConcurrencySystem {
           snapshot,
         });
 
-        return;
+        return false;
       }
 
       if (!canReleaseConcurrency(latestSnapshot.executionStatus)) {
@@ -168,20 +168,21 @@ export class ReleaseConcurrencySystem {
           snapshot: latestSnapshot,
         });
 
-        return;
+        return false;
       }
 
       const metadata = this.#parseMetadata(snapshot.metadata);
 
       if (typeof metadata.releaseConcurrency === "boolean") {
         if (metadata.releaseConcurrency) {
-          return await this.$.runQueue.releaseAllConcurrency(
-            snapshot.organizationId,
-            snapshot.runId
-          );
+          await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
+
+          return true;
         }
 
-        return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
+        await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
+
+        return true;
       }
 
       // Get the locked queue
@@ -198,10 +199,14 @@ export class ReleaseConcurrencySystem {
       (typeof taskQueue.concurrencyLimit === "undefined" ||
         taskQueue.releaseConcurrencyOnWaitpoint)
     ) {
-      return await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
+      await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
+
+      return true;
     }
 
-    return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
+    await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
+
+    return true;
   });
 }
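
The system-level change mirrors the queue contract: every early bail-out in executeReleaseConcurrencyForSnapshot now returns false (so the token is refunded), and only the paths that actually call releaseAllConcurrency or releaseEnvConcurrency return true. A condensed, illustrative sketch of that decision flow (simplified guards; the real method also takes the run lock and re-reads the latest snapshot):

// Illustrative only: the guard flags and release callback stand in for the real
// snapshot checks and run queue calls; the boolean semantics match this commit.
async function executeReleaseForSnapshot(
  snapshot: { isValid: boolean; canRelease: boolean } | undefined,
  releaseConcurrency: () => Promise<void>
): Promise<boolean> {
  if (!snapshot) return false;            // no snapshot found: refund the token
  if (!snapshot.isValid) return false;    // a newer snapshot superseded this one
  if (!snapshot.canRelease) return false; // run is not in a releasable state

  await releaseConcurrency();             // release queue/env concurrency for real

  return true;                            // only now is the token consumed
}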

0 commit comments
