Skip to content

Commit

Permalink
fix(core): Fix task runner sending too many offers
Browse files Browse the repository at this point in the history
  • Loading branch information
tomi committed Dec 31, 2024
1 parent 0860fbe commit 5b74cac
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,44 @@
import { WebSocket } from 'ws';

import { TaskRunner } from '@/task-runner';
import { TaskRunner, type TaskRunnerOpts } from '@/task-runner';

class TestRunner extends TaskRunner {}

jest.mock('ws');

describe('TestRunner', () => {
let runner: TestRunner;

const newTestRunner = (opts: Partial<TaskRunnerOpts> = {}) =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
...opts,
});

afterEach(() => {
runner?.clearIdleTimer();
});

describe('constructor', () => {
afterEach(() => {
jest.clearAllMocks();
});

it('should correctly construct WebSocket URI with provided taskBrokerUri', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});

expect(WebSocket).toHaveBeenCalledWith(
Expand All @@ -38,25 +50,11 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);

runner.clearIdleTimer();
});

it('should handle different taskBrokerUri formats correctly', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});

expect(WebSocket).toHaveBeenCalledWith(
Expand All @@ -68,50 +66,141 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);

runner.clearIdleTimer();
});

it('should throw an error if taskBrokerUri is invalid', () => {
expect(
() =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
}),
expect(() =>
newTestRunner({
taskBrokerUri: 'not-a-valid-uri',
}),
).toThrowError(/Invalid URL/);
});
});

describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = new TestRunner({
describe('sendOffers', () => {
beforeEach(() => {
jest.useFakeTimers();
});

afterEach(() => {
jest.clearAllTimers();
});

it('should not send offers if canSendOffers is false', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
maxConcurrency: 2,
});
const sendSpy = jest.spyOn(runner, 'send');
expect(runner.canSendOffers).toBe(false);

runner.sendOffers();

expect(sendSpy).toHaveBeenCalledTimes(0);
});

it('should enable sending of offer on runnerregistered message', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});

expect(runner.canSendOffers).toBe(true);
});

it('should send maxConcurrency offers when there are no offers', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});

const sendSpy = jest.spyOn(runner, 'send');

runner.sendOffers();
runner.sendOffers();

expect(sendSpy).toHaveBeenCalledTimes(2);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
});

it('should send up to maxConcurrency offers when there is a running task', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
runner.runningTasks.set('test-task', {
taskId: 'test-task',
active: true,
cancelled: false,
});
const sendSpy = jest.spyOn(runner, 'send');

runner.sendOffers();

expect(sendSpy).toHaveBeenCalledTimes(1);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
});

it('should delete stale offers and send new ones', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});

const sendSpy = jest.spyOn(runner, 'send');

runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
sendSpy.mockClear();

jest.advanceTimersByTime(6000);
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
});
});

describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = newTestRunner();

const taskId = 'test-task';
runner.runningTasks.set(taskId, {
taskId,
Expand Down
11 changes: 7 additions & 4 deletions packages/@n8n/task-runner/src/task-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ export abstract class TaskRunner extends EventEmitter {
sendOffers() {
this.deleteStaleOffers();

const offersToSend =
this.maxConcurrency -
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
if (!this.canSendOffers) {
return;
}

const offersToSend = this.maxConcurrency - (this.openOffers.size + this.runningTasks.size);

for (let i = 0; i < offersToSend; i++) {
// Add a bit of randomness so that not all offers expire at the same time
Expand Down Expand Up @@ -255,7 +257,7 @@ export abstract class TaskRunner extends EventEmitter {
}

hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency;
return this.runningTasks.size < this.maxConcurrency;
}

offerAccepted(offerId: string, taskId: string) {
Expand All @@ -267,6 +269,7 @@ export abstract class TaskRunner extends EventEmitter {
});
return;
}

const offer = this.openOffers.get(offerId);
if (!offer) {
this.send({
Expand Down

0 comments on commit 5b74cac

Please sign in to comment.