Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: support @local adaptors #808

Merged
merged 16 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integration-tests/worker/monorepo/packages/common/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
function fortyTwo() {
return (state) => {
state.data = 42;
return state;
};
}

module.exports = { fortyTwo };
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "@openfn/language-common",
"private": true,
"version": "1.0.0",
"dependencies": {},
"devDependencies": {}
}
26 changes: 25 additions & 1 deletion integration-tests/worker/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ test.before(async () => {
maxWorkers: 1,
repoDir: path.resolve('tmp/repo/integration'),
};
const workerArgs = { runPublicKey: keys.public };
const workerArgs = {
runPublicKey: keys.public,
monorepoDir: path.resolve('monorepo'),
};

({ worker, engine, engineLogger } = await initWorker(
lightningPort,
Expand Down Expand Up @@ -599,6 +602,27 @@ test.serial('Include timestamps on basically everything', (t) => {
});
});

test.serial('use local adaptor versions from monorepo', (t) => {
return new Promise(async (done) => {
lightning.once('run:complete', (evt) => {
const result = lightning.getResult('a1');
t.deepEqual(result, { data: 42 });
done();
});

lightning.enqueueRun({
id: 'a1',
jobs: [
{
id: 'j1',
body: 'fortyTwo()',
adaptor: '@openfn/language-common@local',
},
],
});
});
});

test.serial("Don't send adaptor logs to stdout", (t) => {
return new Promise(async (done) => {
// We have to create a new worker with a different repo for this one
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Patch Changes

- Engine: don't try to autoinstall adaptor versions with @local
- Updated dependencies [3463ff9]
- Updated dependencies [7a85894]
- Updated dependencies [b6de2c4]
Expand Down
5 changes: 5 additions & 0 deletions packages/engine-multi/src/api/autoinstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ const autoinstall = async (context: ExecutionContext): Promise<ModulePaths> => {
(context.versions[name] as string[]).push(v);
}

if (v === 'local') {
logger.info('Using local version of ', a);
continue;
}

// important: write back to paths with the RAW specifier
paths[a] = {
path: `${repoDir}/node_modules/${alias}`,
Expand Down
1 change: 1 addition & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- fd0e499: Support collections
- bcd82e9: Accept collection version at startup (as arg or auto-looked-up from npm)
- Support @local adaptor versions (which map to the monorepo)

### Patch Changes

Expand Down
9 changes: 5 additions & 4 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type ServerOptions = {
payloadLimitMb?: number; // max memory limit for socket payload (ie, step:complete, log)
collectionsVersion?: string;
collectionsUrl?: string;
monorepoDir?: string;
};

// this is the server/koa API
Expand Down Expand Up @@ -241,10 +242,10 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) {
logger
);

const { plan, options, input } = convertRun(
run,
app.options.collectionsVersion
);
const { plan, options, input } = convertRun(run, {
collectionsVersion: app.options.collectionsVersion,
monorepoPath: app.options.monorepoDir,
});
logger.debug('converted run body into execution plan:', plan);

// Setup collections
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const [minBackoff, maxBackoff] = args.backoff

function engineReady(engine: any) {
logger.debug('Creating worker instance');

const workerOptions: ServerOptions = {
port: args.port,
lightning: args.lightning,
Expand All @@ -44,6 +43,7 @@ function engineReady(engine: any) {
payloadLimitMb: args.payloadMemory,
collectionsVersion: args.collectionsVersion,
collectionsUrl: args.collectionsUrl,
monorepoDir: args.monorepoDir,
};

if (args.lightningPublicKey) {
Expand Down
32 changes: 20 additions & 12 deletions packages/ws-worker/src/util/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ const DEFAULT_SOCKET_TIMEOUT_SECONDS = 10;

type Args = {
_: string[];
port?: number;
backoff: string;
capacity?: number;
collectionsUrl?: string;
collectionsVersion?: string;
lightning?: string;
repoDir?: string;
secret?: string;
loop?: boolean;
log?: LogLevel;
lightningPublicKey?: string;
log?: LogLevel;
loop?: boolean;
maxRunDurationSeconds: number;
mock?: boolean;
backoff: string;
capacity?: number;
runMemory?: number;
monorepoDir?: string;
payloadMemory?: number;
statePropsToRemove?: string[];
maxRunDurationSeconds: number;
port?: number;
repoDir?: string;
runMemory?: number;
secret?: string;
socketTimeoutSeconds?: number;
collectionsVersion?: string;
collectionsUrl?: string;
statePropsToRemove?: string[];
};

type ArgTypes = string | string[] | number | undefined;
Expand Down Expand Up @@ -66,6 +67,7 @@ export default function parseArgs(argv: string[]): Args {
WORKER_SECRET,
WORKER_STATE_PROPS_TO_REMOVE,
WORKER_SOCKET_TIMEOUT_SECONDS,
OPENFN_ADAPTORS_REPO,
} = process.env;

const parser = yargs(hideBin(argv))
Expand All @@ -85,6 +87,11 @@ export default function parseArgs(argv: string[]): Args {
description:
'Path to the runtime repo (where modules will be installed). Env: WORKER_REPO_DIR',
})
.option('monorepo-dir', {
alias: 'm',
description:
'Path to the adaptors mono repo, from where @local adaptors will be loaded. Env: OPENFN_ADAPTORS_REPO',
})
.option('secret', {
alias: 's',
description:
Expand Down Expand Up @@ -162,6 +169,7 @@ export default function parseArgs(argv: string[]): Args {
'ws://localhost:4000/worker'
),
repoDir: setArg(args.repoDir, WORKER_REPO_DIR),
monorepoDir: setArg(args.monorepoDir, OPENFN_ADAPTORS_REPO),
secret: setArg(args.secret, WORKER_SECRET),
lightningPublicKey: setArg(
args.lightningPublicKey,
Expand Down
70 changes: 51 additions & 19 deletions packages/ws-worker/src/util/convert-lightning-plan.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import crypto from 'node:crypto';
import path from 'node:path';
import type {
Step,
StepId,
Expand All @@ -12,6 +13,7 @@ import type {
} from '@openfn/lexicon';
import { LightningPlan, LightningEdge } from '@openfn/lexicon/lightning';
import { ExecuteOptions } from '@openfn/engine-multi';
import { getNameAndVersion } from '@openfn/runtime';

export const conditions: Record<string, (upstreamId: string) => string | null> =
{
Expand Down Expand Up @@ -39,35 +41,60 @@ const mapTriggerEdgeCondition = (edge: LightningEdge) => {
return condition;
};

// This function will look at every step and decide whether the collections adaptor
// should be added to the array
const appendCollectionsAdaptor = (
plan: ExecutionPlan,
collectionsVersion: string = 'latest'
) => {
let hasCollections;
plan.workflow.steps.forEach((step) => {
const job = step as Job;
if (job.expression?.match(/(collections\.)/)) {
hasCollections = true;
job.adaptors ??= [];
job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`);
}
});
return hasCollections;
};

// Options which relate to this execution but are not part of the plan
export type WorkerRunOptions = ExecuteOptions & {
// Defaults to true - must be explicity false to stop dataclips being sent
outputDataclips?: boolean;
payloadLimitMb?: number;
};

type ConversionOptions = {
collectionsVersion?: string;
monorepoPath?: string;
};

export default (
run: LightningPlan,
collectionsVersion?: string
options: ConversionOptions = {}
): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy<State> } => {
const { collectionsVersion, monorepoPath } = options;

const appendLocalVersions = (job: Job) => {
if (monorepoPath && job.adaptors!) {
for (const adaptor of job.adaptors) {
const { name, version } = getNameAndVersion(adaptor);
if (monorepoPath && version === 'local') {
const shortName = name.replace('@openfn/language-', '');
const localPath = path.resolve(monorepoPath, 'packages', shortName);
job.linker ??= {};
job.linker[name] = {
path: localPath,
version: 'local',
};
}
}
}
return job;
};

// This function will look at every step and decide whether the collections adaptor
// should be added to the array
const appendCollectionsAdaptor = (
plan: ExecutionPlan,
collectionsVersion: string = 'latest'
) => {
let hasCollections;
plan.workflow.steps.forEach((step) => {
const job = step as Job;
if (job.expression?.match(/(collections\.)/)) {
hasCollections = true;
job.adaptors ??= [];
job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`);
}
});
return hasCollections;
};

// Some options get mapped straight through to the runtime's workflow options
const runtimeOpts: Omit<WorkflowOptions, 'timeout'> = {};

Expand Down Expand Up @@ -202,6 +229,11 @@ export default (
}
}

// Find any @local versions and set them up properly
for (const step of plan.workflow.steps) {
appendLocalVersions(step as Job);
}

return {
plan: plan as ExecutionPlan,
options: engineOpts,
Expand Down
3 changes: 2 additions & 1 deletion packages/ws-worker/test/lightning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ test.before(async () => {
secret: 'abc',
maxWorkflows: 1,
collectionsVersion: '1.0.0',
collectionsUrl: 'www',
// Note that if this is not passed,
// JWT verification will be skipped
runPublicKey: keys.public,
Expand Down Expand Up @@ -272,7 +273,7 @@ test.serial('should run a run with the collections adaptor', async (t) => {
};

lng.waitForResult(run.id).then((result: any) => {
t.is(result.collections_endpoint, urls.lng);
t.is(result.collections_endpoint, 'www');
t.is(typeof result.collections_token, 'string');
done();
});
Expand Down
50 changes: 47 additions & 3 deletions packages/ws-worker/test/util/convert-lightning-plan.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,9 @@ test('append the collections adaptor to jobs that use it', (t) => {
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('a', 'b')],
};
const { plan } = convertPlan(run as LightningPlan, '1.0.0');
const { plan } = convertPlan(run as LightningPlan, {
collectionsVersion: '1.0.0',
});

const [_t, a, b] = plan.workflow.steps;

Expand All @@ -625,13 +627,15 @@ test('append the collections credential to jobs that use it', (t) => {
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('a', 'b')],
};
const { plan } = convertPlan(run as LightningPlan, '1.0.0');
const { plan } = convertPlan(run as LightningPlan, {
collectionsVersion: '1.0.0',
});

const creds = plan.workflow.credentials;

t.deepEqual(creds, {
collections_token: true,
collections_endpoint: 'https://app.openfn.org',
collections_endpoint: true,
});
});

Expand All @@ -642,6 +646,7 @@ test("Don't set up collections if no version is passed", (t) => {
createNode({
id: 'a',
body: 'collections.each("c", "k", (state) => state)',
adaptor: 'common',
}),
],
triggers: [{ id: 't', type: 'cron' }],
Expand All @@ -652,4 +657,43 @@ test("Don't set up collections if no version is passed", (t) => {
const [_t, a] = plan.workflow.steps;

t.deepEqual((a as Job).adaptors, ['common']);
t.falsy(plan.workflow.credentials);
});

test('Use local paths', (t) => {
const run: Partial<LightningPlan> = {
id: 'w',
jobs: [
createNode({
id: 'a',
body: 'collections.each("c", "k", (state) => state)',
adaptor: 'common@local',
}),
],
triggers: [{ id: 't', type: 'cron' }],
edges: [createEdge('t', 'a')],
};

const { plan } = convertPlan(run as LightningPlan, {
collectionsVersion: 'local',
monorepoPath: '/adaptors',
});

const [_t, a] = plan.workflow.steps as any[];

t.deepEqual(a.adaptors, [
'common@local',
'@openfn/language-collections@local',
]);
t.deepEqual(a.linker, {
// The adaptor is not exapanded into long form, could be a problem
common: {
path: '/adaptors/packages/common',
version: 'local',
},
'@openfn/language-collections': {
path: '/adaptors/packages/collections',
version: 'local',
},
});
});