Support file path in job body (#742)
* support file paths

* add changeset file

* add test

* refactor for easy readability

* update fixtures to use body content

* move relevant functions to pull.ts
midigofrank authored Aug 14, 2024
1 parent 7f14570 commit b7fc4d0
Showing 10 changed files with 280 additions and 34 deletions.
6 changes: 6 additions & 0 deletions .changeset/swift-dingos-know.md
@@ -0,0 +1,6 @@
---
'@openfn/cli': minor
'@openfn/deploy': minor
---

Deploy: allow job body to be loaded from a file path in workflow.yaml
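For illustration only (not part of this commit): the feature lets a spec reference a job body by file path instead of inlining it. A rough sketch of the resulting workflow.yaml, assuming the path/content shape of the SpecJobBody type added below; the file name is made up:

# workflow.yaml (sketch) — the job body is loaded from a file resolved
# relative to the spec file, rather than written inline.
workflows:
  my-workflow:
    name: My workflow
    jobs:
      job-a:
        name: job a
        adaptor: '@openfn/language-common@latest'
        body:
          path: ./jobs/job-a.js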
24 changes: 15 additions & 9 deletions packages/cli/src/pull/handler.ts
@@ -6,6 +6,7 @@ import {
getProject,
getSpec,
getStateFromProjectPayload,
syncRemoteSpec,
} from '@openfn/deploy';
import type { Logger } from '../util/logger';
import { PullOptions } from '../pull/command';
@@ -47,12 +48,6 @@ async function pullHandler(options: PullOptions, logger: Logger) {
// Build the state.json
const state = getStateFromProjectPayload(project!);

// Write the final project state to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);

logger.always('Downloading the project spec (as YAML) from the server.');
// Get the project.yaml from Lightning
const queryParams = new URLSearchParams();
@@ -85,9 +80,20 @@ async function pullHandler(options: PullOptions, logger: Logger) {
const resolvedPath = path.resolve(config.specPath);
logger.debug('reading spec from', resolvedPath);

// Write the yaml to disk
// @ts-ignore
await fs.writeFile(resolvedPath, res.body);
const updatedSpec = await syncRemoteSpec(
await res.text(),
state,
config,
logger
);

// Write the final project state and yaml to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);

await fs.writeFile(resolvedPath, updatedSpec);

// Read the spec back in a parsed yaml
const spec = await getSpec(resolvedPath);
4 changes: 3 additions & 1 deletion packages/deploy/src/index.ts
@@ -10,6 +10,7 @@ import {
toProjectPayload,
getStateFromProjectPayload,
} from './stateTransform';
import { syncRemoteSpec } from './pull';
import { deployProject, getProject } from './client';
import { DeployError } from './deployError';
import { Logger } from '@openfn/logger';
@@ -33,6 +34,7 @@ export {
mergeSpecIntoState,
mergeProjectPayloadIntoState,
getStateFromProjectPayload,
syncRemoteSpec,
};

export async function getConfig(path?: string): Promise<DeployConfig> {
@@ -91,7 +93,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise<void> {
export async function getSpec(path: string) {
try {
const body = await readFile(path, 'utf8');
return parseAndValidate(body);
return parseAndValidate(body, path);
} catch (error: any) {
if (error.code === 'ENOENT') {
throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR');
122 changes: 122 additions & 0 deletions packages/deploy/src/pull.ts
@@ -0,0 +1,122 @@
import YAML, { Pair, Scalar, isPair, isScalar } from 'yaml';
import { DeployConfig, ProjectState, SpecJob } from './types';
import { Logger } from '@openfn/logger';
import { writeFile } from 'fs/promises';
import path from 'path';
import { getState, getSpec } from './index';

async function getAllSpecJobs(
config: DeployConfig,
logger: Logger
): Promise<SpecJob[]> {
const jobs: SpecJob[] = [];

try {
const state = await getState(config.statePath);
const spec = await getSpec(config.specPath);

for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) {
if (workflow.jobs) {
for (const [jobKey, specJob] of Object.entries(workflow.jobs)) {
const stateJob = state.workflows[workflowKey]?.jobs[jobKey];
stateJob &&
jobs.push({
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
});
}
}
}
} catch (error: any) {
logger.debug(`Could not read the spec and state: ${error.message}`);
}

return jobs;
}

async function extractJobsToDisk(
specBody: string,
state: ProjectState,
oldJobs: SpecJob[],
config: DeployConfig
): Promise<string> {
function isPairWithScalarKey(
node: any
): node is Pair & { key: Scalar & { value: string } } {
return (
isPair(node) && isScalar(node.key) && typeof node.key.value === 'string'
);
}

const doc = YAML.parseDocument(specBody);

await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (
!pair.key ||
pair.key.value !== 'body' ||
!isScalar(pair.value) ||
pairPath.length <= 6
) {
return;
}

const jobPair = pairPath[pairPath.length - 2];
const workflowPair = pairPath[pairPath.length - 6];

if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) {
return;
}

const jobKey = jobPair.key.value;
const workflowKey = workflowPair.key.value;

// find the job in the state
const stateJob = state.workflows[workflowKey]?.jobs[jobKey];

if (!stateJob) {
return;
}

// check if the state job is in the old spec jobs
const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id);

if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') {
return;
}

const oldSpecJobPath = oldSpecJob.body.path;

if (oldSpecJobPath) {
const basePath = path.dirname(config.specPath);
const resolvedPath = path.resolve(basePath, oldSpecJobPath);
await writeFile(resolvedPath, pair.value.value);

// set the body path in the spec
const map = doc.createNode({ path: oldSpecJobPath });

pair.value = map;
}
},
});

return doc.toString();
}

export async function syncRemoteSpec(
remoteSpecBody: string,
newState: ProjectState,
config: DeployConfig,
logger: Logger
): Promise<string> {
try {
const oldSpecJobs = await getAllSpecJobs(config, logger);

return extractJobsToDisk(remoteSpecBody, newState, oldSpecJobs, config);
} catch (error: any) {
logger.warn(`Could not update spec job body paths: ${error.message}`);
return remoteSpecBody;
}
}
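In short (a sketch inferred from extractJobsToDisk above; the file name and body content are hypothetical): when the previously saved local spec kept a job body under a path, syncRemoteSpec writes the freshly pulled body out to that file and preserves the path reference in the regenerated YAML instead of inlining the server's copy.

# 1. Body as returned by the server during pull — always inline:
jobs:
  job-a:
    body: |
      fn(state => state);
---
# 2. Spec written back to disk by syncRemoteSpec when the old local spec
#    referenced ./jobs/job-a.js: the inline body above is saved to that
#    file and the path reference is kept.
jobs:
  job-a:
    body:
      path: ./jobs/job-a.js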
13 changes: 11 additions & 2 deletions packages/deploy/src/stateTransform.ts
@@ -5,6 +5,7 @@ import {
ProjectSpec,
ProjectState,
SpecEdge,
SpecJobBody,
StateEdge,
WorkflowSpec,
WorkflowState,
@@ -20,6 +21,14 @@ import {
import { DeployError } from './deployError';
import { Logger } from '@openfn/logger/dist';

function stringifyJobBody(body: SpecJobBody): string {
if (typeof body === 'object') {
return body.content;
} else {
return body;
}
}

function mergeJobs(
stateJobs: WorkflowState['jobs'],
specJobs: WorkflowSpec['jobs']
@@ -33,7 +42,7 @@
id: crypto.randomUUID(),
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
@@ -49,7 +58,7 @@
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
22 changes: 18 additions & 4 deletions packages/deploy/src/types.ts
@@ -1,11 +1,25 @@
export type Job = {
export type StateJob = {
id?: string;
name: string;
adaptor: string;
body: string;
delete?: boolean;
};

export type SpecJobBody =
| string
| {
path?: string;
content: string;
};

export type SpecJob = {
id?: string;
name: string;
adaptor: string;
body: SpecJobBody;
};

export type Trigger = {
id?: string;
type?: string;
@@ -38,7 +52,7 @@ export type SpecEdge = {
export type WorkflowSpec = {
id?: string;
name: string;
jobs?: Record<string | symbol, Job>;
jobs?: Record<string | symbol, SpecJob>;
triggers?: Record<string | symbol, Trigger>;
edges?: Record<string | symbol, SpecEdge>;
};
@@ -52,7 +66,7 @@ export interface ProjectSpec {
export interface WorkflowState {
id: string;
name: string;
jobs: Record<string | symbol, Concrete<Job>>;
jobs: Record<string | symbol, Concrete<StateJob>>;
triggers: Record<string | symbol, Concrete<Trigger>>;
edges: Record<string | symbol, Concrete<StateEdge>>;
delete?: boolean;
@@ -78,7 +92,7 @@ export interface ProjectPayload {
id: string;
name: string;
project_id?: string;
jobs: Concrete<Job>[];
jobs: Concrete<StateJob>[];
triggers: Concrete<Trigger>[];
edges: Concrete<StateEdge>[];
}[];
47 changes: 42 additions & 5 deletions packages/deploy/src/validator.ts
@@ -1,5 +1,7 @@
import YAML, { YAMLMap, isMap, isPair } from 'yaml';
import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml';
import { ProjectSpec } from './types';
import { readFile } from 'fs/promises';
import path from 'path';

export interface Error {
context: any;
@@ -8,12 +10,16 @@ export interface Error {
range?: [number, number, number];
}

export function parseAndValidate(input: string): {
export async function parseAndValidate(
input: string,
specPath: string = '.'
): Promise<{
errors: Error[];
doc: ProjectSpec;
} {
}> {
let errors: Error[] = [];
const doc = YAML.parseDocument(input);
const basePath = path.dirname(specPath);

function ensureUniqueId(key: string, arr: string[]) {
if (arr.includes(key)) {
@@ -97,8 +103,8 @@ export function parseAndValidate(input: string): {
}
}

YAML.visit(doc, {
Pair(_, pair: any) {
await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (pair.key && pair.key.value === 'workflows') {
if (pair.value.value === null) {
errors.push({
Expand All @@ -124,6 +130,37 @@ export function parseAndValidate(input: string): {
}
}

if (
pair.key &&
pair.key.value === 'body' &&
pairPath.length > 4 &&
isMap(pair.value)
) {
const pathValue = pair.value.get('path');
const grandPair = pairPath[pairPath.length - 4];

if (
isPair(grandPair) &&
isScalar(grandPair.key) &&
grandPair.key.value === 'jobs' &&
typeof pathValue === 'string'
) {
const filePath = path.resolve(basePath, pathValue);
try {
const content = await readFile(filePath, 'utf8');
pair.value.set('content', content);
} catch (error: any) {
errors.push({
path: `job/body/path`,
context: pair,
message: `Failed to read file ${pathValue}: ${error.message}`,
range: pair.value.range,
});
}
return undefined;
}
}

if (pair.key && pair.key.value === 'condition_expression') {
if (typeof pair.value.value !== 'string') {
pair.value.value = String(pair.value.value);
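Put differently (a sketch under the same assumptions as above, with a hypothetical file name and contents): when a job body is declared as a map with a path, parseAndValidate now reads that file relative to the spec's directory and attaches its text as content, so later steps such as stringifyJobBody can treat it like an inline body; if the read fails, a "Failed to read file" error is recorded against job/body/path instead.

# Authored spec:
body:
  path: ./jobs/job-a.js
---
# Parsed document after validation — content filled in from the file:
body:
  path: ./jobs/job-a.js
  content: |
    fn(state => state);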
5 changes: 4 additions & 1 deletion packages/deploy/test/fixtures.ts
@@ -11,7 +11,10 @@ export function fullExampleSpec() {
'job-a': {
name: 'job a',
adaptor: '@openfn/language-common@latest',
body: '',
body: {
path: 'somefile.js',
content: '',
},
},
'job-b': {
name: 'job b',
14 changes: 14 additions & 0 deletions packages/deploy/test/util.ts
@@ -0,0 +1,14 @@
import mock from 'mock-fs';
import path from 'node:path';

export const mockFs = (files: Record<string, string>) => {
const pnpm = path.resolve('../../node_modules/.pnpm');
mock({
[pnpm]: mock.load(pnpm, {}),
...files,
});
};

export const resetMockFs = () => {
mock.restore();
};