Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support file path in job body #742

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/swift-dingos-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@openfn/cli': minor
'@openfn/deploy': minor
---

Deploy: allow job body to be loaded from a file path in workflow.yaml
24 changes: 15 additions & 9 deletions packages/cli/src/pull/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
getProject,
getSpec,
getStateFromProjectPayload,
syncRemoteSpec,
} from '@openfn/deploy';
import type { Logger } from '../util/logger';
import { PullOptions } from '../pull/command';
Expand Down Expand Up @@ -47,12 +48,6 @@ async function pullHandler(options: PullOptions, logger: Logger) {
// Build the state.json
const state = getStateFromProjectPayload(project!);

// Write the final project state to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);

logger.always('Downloading the project spec (as YAML) from the server.');
// Get the project.yaml from Lightning
const queryParams = new URLSearchParams();
Expand Down Expand Up @@ -85,9 +80,20 @@ async function pullHandler(options: PullOptions, logger: Logger) {
const resolvedPath = path.resolve(config.specPath);
logger.debug('reading spec from', resolvedPath);

// Write the yaml to disk
// @ts-ignore
await fs.writeFile(resolvedPath, res.body);
const updatedSpec = await syncRemoteSpec(
await res.text(),
state,
config,
logger
);

// Write the final project state and yaml to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);

await fs.writeFile(resolvedPath, updatedSpec);

// Read the spec back in a parsed yaml
const spec = await getSpec(resolvedPath);
Expand Down
4 changes: 3 additions & 1 deletion packages/deploy/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
toProjectPayload,
getStateFromProjectPayload,
} from './stateTransform';
import { syncRemoteSpec } from './pull';
import { deployProject, getProject } from './client';
import { DeployError } from './deployError';
import { Logger } from '@openfn/logger';
Expand All @@ -33,6 +34,7 @@ export {
mergeSpecIntoState,
mergeProjectPayloadIntoState,
getStateFromProjectPayload,
syncRemoteSpec,
};

export async function getConfig(path?: string): Promise<DeployConfig> {
Expand Down Expand Up @@ -91,7 +93,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise<void> {
export async function getSpec(path: string) {
try {
const body = await readFile(path, 'utf8');
return parseAndValidate(body);
return parseAndValidate(body, path);
} catch (error: any) {
if (error.code === 'ENOENT') {
throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR');
Expand Down
122 changes: 122 additions & 0 deletions packages/deploy/src/pull.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import YAML, { Pair, Scalar, isPair, isScalar } from 'yaml';
import { DeployConfig, ProjectState, SpecJob } from './types';
import { Logger } from '@openfn/logger';
import { writeFile } from 'fs/promises';
import path from 'path';
import { getState, getSpec } from './index';

async function getAllSpecJobs(
config: DeployConfig,
logger: Logger
): Promise<SpecJob[]> {
const jobs: SpecJob[] = [];

try {
const state = await getState(config.statePath);
const spec = await getSpec(config.specPath);

for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) {
if (workflow.jobs) {
for (const [jobKey, specJob] of Object.entries(workflow.jobs)) {
const stateJob = state.workflows[workflowKey]?.jobs[jobKey];
stateJob &&
jobs.push({
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
});
}
}
}
} catch (error: any) {
logger.debug(`Could not read the spec and state: ${error.message}`);
}

return jobs;
}

async function extractJobsToDisk(
specBody: string,
state: ProjectState,
oldJobs: SpecJob[],
config: DeployConfig
): Promise<string> {
function isPairWithScalarKey(
node: any
): node is Pair & { key: Scalar & { value: string } } {
return (
isPair(node) && isScalar(node.key) && typeof node.key.value === 'string'
);
}

const doc = YAML.parseDocument(specBody);

await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (
!pair.key ||
pair.key.value !== 'body' ||
!isScalar(pair.value) ||
pairPath.length <= 6
) {
return;
}

const jobPair = pairPath[pairPath.length - 2];
const workflowPair = pairPath[pairPath.length - 6];

if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) {
return;
}

const jobKey = jobPair.key.value;
const workflowKey = workflowPair.key.value;

// find the job in the state
const stateJob = state.workflows[workflowKey]?.jobs[jobKey];

if (!stateJob) {
return;
}

// check if the state job is in the old spec jobs
const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id);

if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') {
return;
}

const oldSpecJobPath = oldSpecJob.body.path;

if (oldSpecJobPath) {
const basePath = path.dirname(config.specPath);
const resolvedPath = path.resolve(basePath, oldSpecJobPath);
await writeFile(resolvedPath, pair.value.value);

// set the body path in the spec
const map = doc.createNode({ path: oldSpecJobPath });

pair.value = map;
}
},
});

return doc.toString();
}

export async function syncRemoteSpec(
remoteSpecBody: string,
newState: ProjectState,
config: DeployConfig,
logger: Logger
): Promise<string> {
try {
const oldSpecJobs = await getAllSpecJobs(config, logger);

return extractJobsToDisk(remoteSpecBody, newState, oldSpecJobs, config);
} catch (error: any) {
logger.warn(`Could not update spec job body paths: ${error.message}`);
return remoteSpecBody;
}
}
13 changes: 11 additions & 2 deletions packages/deploy/src/stateTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ProjectSpec,
ProjectState,
SpecEdge,
SpecJobBody,
StateEdge,
WorkflowSpec,
WorkflowState,
Expand All @@ -20,6 +21,14 @@ import {
import { DeployError } from './deployError';
import { Logger } from '@openfn/logger/dist';

function stringifyJobBody(body: SpecJobBody): string {
if (typeof body === 'object') {
return body.content;
} else {
return body;
}
}

function mergeJobs(
stateJobs: WorkflowState['jobs'],
specJobs: WorkflowSpec['jobs']
Expand All @@ -33,7 +42,7 @@ function mergeJobs(
id: crypto.randomUUID(),
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
Expand All @@ -49,7 +58,7 @@ function mergeJobs(
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
Expand Down
22 changes: 18 additions & 4 deletions packages/deploy/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
export type Job = {
export type StateJob = {
id?: string;
name: string;
adaptor: string;
body: string;
delete?: boolean;
};

export type SpecJobBody =
| string
| {
path?: string;
content: string;
};

export type SpecJob = {
id?: string;
name: string;
adaptor: string;
body: SpecJobBody;
};

export type Trigger = {
id?: string;
type?: string;
Expand Down Expand Up @@ -38,7 +52,7 @@ export type SpecEdge = {
export type WorkflowSpec = {
id?: string;
name: string;
jobs?: Record<string | symbol, Job>;
jobs?: Record<string | symbol, SpecJob>;
triggers?: Record<string | symbol, Trigger>;
edges?: Record<string | symbol, SpecEdge>;
};
Expand All @@ -52,7 +66,7 @@ export interface ProjectSpec {
export interface WorkflowState {
id: string;
name: string;
jobs: Record<string | symbol, Concrete<Job>>;
jobs: Record<string | symbol, Concrete<StateJob>>;
triggers: Record<string | symbol, Concrete<Trigger>>;
edges: Record<string | symbol, Concrete<StateEdge>>;
delete?: boolean;
Expand All @@ -78,7 +92,7 @@ export interface ProjectPayload {
id: string;
name: string;
project_id?: string;
jobs: Concrete<Job>[];
jobs: Concrete<StateJob>[];
triggers: Concrete<Trigger>[];
edges: Concrete<StateEdge>[];
}[];
Expand Down
47 changes: 42 additions & 5 deletions packages/deploy/src/validator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import YAML, { YAMLMap, isMap, isPair } from 'yaml';
import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml';
import { ProjectSpec } from './types';
import { readFile } from 'fs/promises';
import path from 'path';

export interface Error {
context: any;
Expand All @@ -8,12 +10,16 @@ export interface Error {
range?: [number, number, number];
}

export function parseAndValidate(input: string): {
export async function parseAndValidate(
input: string,
specPath: string = '.'
): Promise<{
errors: Error[];
doc: ProjectSpec;
} {
}> {
let errors: Error[] = [];
const doc = YAML.parseDocument(input);
const basePath = path.dirname(specPath);

function ensureUniqueId(key: string, arr: string[]) {
if (arr.includes(key)) {
Expand Down Expand Up @@ -97,8 +103,8 @@ export function parseAndValidate(input: string): {
}
}

YAML.visit(doc, {
Pair(_, pair: any) {
await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (pair.key && pair.key.value === 'workflows') {
if (pair.value.value === null) {
errors.push({
Expand All @@ -124,6 +130,37 @@ export function parseAndValidate(input: string): {
}
}

if (
pair.key &&
pair.key.value === 'body' &&
pairPath.length > 4 &&
isMap(pair.value)
) {
const pathValue = pair.value.get('path');
const grandPair = pairPath[pairPath.length - 4];

if (
isPair(grandPair) &&
isScalar(grandPair.key) &&
grandPair.key.value === 'jobs' &&
typeof pathValue === 'string'
) {
const filePath = path.resolve(basePath, pathValue);
try {
const content = await readFile(filePath, 'utf8');
pair.value.set('content', content);
} catch (error: any) {
errors.push({
path: `job/body/path`,
context: pair,
message: `Failed to read file ${pathValue}: ${error.message}`,
range: pair.value.range,
});
}
return undefined;
}
}

if (pair.key && pair.key.value === 'condition_expression') {
if (typeof pair.value.value !== 'string') {
pair.value.value = String(pair.value.value);
Expand Down
5 changes: 4 additions & 1 deletion packages/deploy/test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ export function fullExampleSpec() {
'job-a': {
name: 'job a',
adaptor: '@openfn/language-common@latest',
body: '',
body: {
path: 'somefile.js',
content: '',
},
},
'job-b': {
name: 'job b',
Expand Down
14 changes: 14 additions & 0 deletions packages/deploy/test/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import mock from 'mock-fs';
import path from 'node:path';

export const mockFs = (files: Record<string, string>) => {
const pnpm = path.resolve('../../node_modules/.pnpm');
mock({
[pnpm]: mock.load(pnpm, {}),
...files,
});
};

export const resetMockFs = () => {
mock.restore();
};
Loading