Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: set or lookup the collections adaptor version #804

Merged
merged 6 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stale-taxis-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': minor
---

Support for collection versions (as arg or auto-looked-up on startup)
1 change: 1 addition & 0 deletions integration-tests/worker/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export const initWorker = async (
port: workerPort,
lightning: `ws://localhost:${lightningPort}/worker`,
secret: crypto.randomUUID(),
collectionsVersion: '1.0.0',
...workerArgs,
});

Expand Down
1 change: 1 addition & 0 deletions integration-tests/worker/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const spawnServer = (port: string | number = 1, args: string[] = []) => {
'--backoff 0.001/0.01',
'--log debug',
'-s secretsquirrel',
'--collections-token=1.0.0',
...args,
],
options
Expand Down
18 changes: 3 additions & 15 deletions packages/ws-worker/src/channels/run.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type { ExecutionPlan, Lazy, State } from '@openfn/lexicon';
import type { GetPlanReply, LightningPlan } from '@openfn/lexicon/lightning';
import type { Logger } from '@openfn/logger';

import { getWithReply } from '../util';
import convertRun, { WorkerRunOptions } from '../util/convert-lightning-plan';
import { GET_PLAN } from '../events';
import type { Channel, Socket } from '../types';

Expand All @@ -20,9 +18,7 @@ const joinRunChannel = (
) => {
return new Promise<{
channel: Channel;
plan: ExecutionPlan;
options: WorkerRunOptions;
input: Lazy<State>;
run: LightningPlan;
}>((resolve, reject) => {
// TMP - lightning seems to be sending two responses to me
// just for now, I'm gonna gate the handling here
Expand All @@ -38,9 +34,8 @@ const joinRunChannel = (
if (!didReceiveOk) {
didReceiveOk = true;
logger.success(`connected to ${channelName}`, e);
const { plan, options, input } = await loadRun(channel);
logger.debug('converted run as execution plan:', plan);
resolve({ channel, plan, options, input });
const run = await getWithReply<GetPlanReply>(channel, GET_PLAN);
resolve({ channel, run });
}
})
.receive('error', (err: any) => {
Expand All @@ -65,10 +60,3 @@ const joinRunChannel = (
};

export default joinRunChannel;

export async function loadRun(channel: Channel) {
// first we get the run body through the socket
const runBody = await getWithReply<GetPlanReply>(channel, GET_PLAN);
// then we generate the execution plan
return convertRun(runBody as LightningPlan);
}
52 changes: 45 additions & 7 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { EventEmitter } from 'node:events';

import { promisify } from 'node:util';
import { exec as _exec } from 'node:child_process';

import Koa from 'koa';
import bodyParser from 'koa-bodyparser';
import koaLogger from 'koa-logger';
Expand All @@ -18,6 +22,9 @@ import connectToWorkerQueue from './channels/worker-queue';
import type { Server } from 'http';
import type { RuntimeEngine } from '@openfn/engine-multi';
import type { Socket, Channel } from './types';
import { convertRun } from './util';

const exec = promisify(_exec);

export type ServerOptions = {
maxWorkflows?: number;
Expand All @@ -36,6 +43,7 @@ export type ServerOptions = {

socketTimeoutSeconds?: number;
payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log)
collectionsVersion?: string;
};

// this is the server/koa API
Expand All @@ -50,6 +58,9 @@ export interface ServerApp extends Koa {
engine: RuntimeEngine;
options: ServerOptions;
workloop?: Workloop;
// What version of the collections adaptor should we use?
// Can be set through CLI, or else it'll look up latest on startup
collectionsVersion?: string;

execute: ({ id, token }: ClaimRun) => Promise<void>;
destroy: () => void;
Expand Down Expand Up @@ -137,6 +148,24 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
.on('error', onError);
}

async function lookupCollectionsVersion(
options: ServerOptions,
logger: Logger
) {
if (options.collectionsVersion && options.collectionsVersion !== 'latest') {
logger.log(
'Using collections version from CLI/env: ',
options.collectionsVersion
);
return options.collectionsVersion;
}
const { stdout: version } = await exec(
'npm view @openfn/language-collections@next version'
);
logger.log('Using collections version from @latest: ', version);
return version;
}

function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
const logger = options.logger || createMockLogger();
const port = options.port || DEFAULT_PORT;
Expand Down Expand Up @@ -195,12 +224,18 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
const start = Date.now();
app.workflows[id] = true;

const {
channel: runChannel,
plan,
options = {},
input,
} = await joinRunChannel(app.socket, token, id, logger);
const { channel: runChannel, run } = await joinRunChannel(
app.socket,
token,
id,
logger
);

const { plan, options, input } = convertRun(
run,
app.options.collectionsVersion
);
logger.debug('converted run body into execution plan:', plan);

// Setup collections
if (plan.workflow.credentials?.collections_token) {
Expand Down Expand Up @@ -275,7 +310,10 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
app.use(router.routes());

if (options.lightning) {
connect(app, logger, options);
lookupCollectionsVersion(options, logger).then((version) => {
app.collectionsVersion = version;
connect(app, logger, options);
});
} else {
logger.warn('No lightning URL provided');
}
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function engineReady(engine: any) {
},
maxWorkflows: args.capacity,
payloadLimitMb: args.payloadMemory,
collectionsVersion: args.collectionsVersion,
};

if (args.lightningPublicKey) {
Expand Down
11 changes: 11 additions & 0 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Args = {
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
socketTimeoutSeconds?: number;
collectionsVersion?: string;
};

type ArgTypes = string | string[] | number | undefined;
Expand Down Expand Up @@ -51,6 +52,7 @@ export default function parseArgs(argv: string[]): Args {
const {
WORKER_BACKOFF,
WORKER_CAPACITY,
WORKER_COLLECTIONS_VERSION,
WORKER_LIGHTNING_PUBLIC_KEY,
WORKER_LIGHTNING_SERVICE_URL,
WORKER_LOG_LEVEL,
Expand Down Expand Up @@ -135,6 +137,11 @@ export default function parseArgs(argv: string[]): Args {
description:
'Default run timeout for the server, in seconds. Env: WORKER_MAX_RUN_DURATION_SECONDS',
type: 'number',
})
.option('collections-version', {
description:
'The verison of the collections adaptor to use for all runs on this worker instance.Env: WORKER_COLLECTIONS_VERSION',
type: 'string',
});

const args = parser.parse() as Args;
Expand Down Expand Up @@ -173,5 +180,9 @@ export default function parseArgs(argv: string[]): Args {
WORKER_SOCKET_TIMEOUT_SECONDS,
DEFAULT_SOCKET_TIMEOUT_SECONDS
),
collectionsVersion: setArg(
args.collectionsVersion,
WORKER_COLLECTIONS_VERSION
),
} as Args;
}
15 changes: 11 additions & 4 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ const mapTriggerEdgeCondition = (edge: LightningEdge) => {

// This function will look at every step and decide whether the collections adaptor
// should be added to the array
const appendCollectionsAdaptor = (plan: ExecutionPlan) => {
const appendCollectionsAdaptor = (
plan: ExecutionPlan,
collectionsVersion: string = 'latest'
) => {
let hasCollections;
plan.workflow.steps.forEach((step) => {
const job = step as Job;
if (job.expression?.match(/(collections\.)/)) {
hasCollections = true;
job.adaptors ??= [];
job.adaptors.push('@openfn/language-collections'); // what about version? Is this safe?
job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`);
}
});
return hasCollections;
Expand All @@ -62,7 +65,8 @@ export type WorkerRunOptions = ExecuteOptions & {
};

export default (
run: LightningPlan
run: LightningPlan,
collectionsVersion?: string
): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy<State> } => {
// Some options get mapped straight through to the runtime's workflow options
const runtimeOpts: Omit<WorkflowOptions, 'timeout'> = {};
Expand Down Expand Up @@ -185,7 +189,10 @@ export default (
plan.workflow.name = run.name;
}

const hasCollections = appendCollectionsAdaptor(plan as ExecutionPlan);
const hasCollections = appendCollectionsAdaptor(
plan as ExecutionPlan,
collectionsVersion
);
if (hasCollections) {
plan.workflow.credentials = {
collections_token: true,
Expand Down
65 changes: 5 additions & 60 deletions packages/ws-worker/test/channels/run.test.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,24 @@
import test from 'ava';
import { mockSocket, mockChannel } from '../../src/mock/sockets';
import joinRunChannel, { loadRun } from '../../src/channels/run';
import joinRunChannel from '../../src/channels/run';
import { GET_PLAN } from '../../src/events';
import { runs } from '../mock/data';
import { createMockLogger } from '@openfn/logger';

test('loadRun should get the run body', async (t) => {
const run = runs['run-1'];
let didCallGetRun = false;
const channel = mockChannel({
[GET_PLAN]: () => {
// TODO should be no payload (or empty payload)
didCallGetRun = true;
return run;
},
});

await loadRun(channel);
t.true(didCallGetRun);
});

test('loadRun should return an execution plan and options', async (t) => {
const run = {
...runs['run-1'],
options: {
sanitize: 'obfuscate',
run_timeout_ms: 10,
},
};

const channel = mockChannel({
[GET_PLAN]: () => run,
});

const { plan, options } = await loadRun(channel);
t.like(plan, {
id: 'run-1',
workflow: {
steps: [
{
id: 'job-1',
configuration: 'a',
expression: 'fn(a => a)',
adaptors: ['@openfn/language-common@1.0.0'],
},
],
},
});
t.is(options.sanitize, 'obfuscate');
t.is(options.runTimeoutMs, 10);
});

test('should join an run channel with a token', async (t) => {
test('should join a run channel with a token and return a raw lightning run', async (t) => {
const logger = createMockLogger();
const socket = mockSocket('www', {
'run:a': mockChannel({
// Note that the validation logic is all handled here
join: () => ({ status: 'ok' }),
[GET_PLAN]: () => ({
id: 'a',
options: { run_timeout_ms: 10 },
}),
[GET_PLAN]: () => runs['run-1'],
}),
});

const { channel, plan, options } = await joinRunChannel(
socket,
'x.y.z',
'a',
logger
);
const { channel, run } = await joinRunChannel(socket, 'x.y.z', 'a', logger);

t.truthy(channel);
t.deepEqual(plan, { id: 'a', workflow: { steps: [] }, options: {} });
t.deepEqual(options, { runTimeoutMs: 10 });
t.deepEqual(run, runs['run-1']);
});

test('should fail to join an run channel with an invalid token', async (t) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,14 @@ test('append the collections adaptor to jobs that use it', (t) => {
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('a', 'b')],
};
const { plan } = convertPlan(run as LightningPlan);
const { plan } = convertPlan(run as LightningPlan, '1.0.0');

const [_t, a, b] = plan.workflow.steps;

// @ts-ignore
t.deepEqual(a.adaptors, ['common']);
// @ts-ignore
t.deepEqual(b.adaptors, ['common', '@openfn/language-collections']);
t.deepEqual(b.adaptors, ['common', '@openfn/language-collections@1.0.0']);
});

test('append the collections credential to jobs that use it', (t) => {
Expand Down