Support file path in job body #742

Merged
merged 6 commits into from
Aug 14, 2024
Changes from 3 commits
6 changes: 6 additions & 0 deletions .changeset/swift-dingos-know.md
@@ -0,0 +1,6 @@
---
'@openfn/cli': minor
'@openfn/deploy': minor
---

Support file paths for job body
Collaborator
Can we have a slightly more user-friendly release note for the CLI? It just needs a bit more context. Something like "Deploy: allow job body to be loaded from a file path in workflow.yaml" would probably do.

23 changes: 16 additions & 7 deletions packages/cli/src/pull/handler.ts
@@ -6,6 +6,7 @@ import {
getProject,
getSpec,
getStateFromProjectPayload,
updatePulledSpecJobBodyPath,
} from '@openfn/deploy';
import type { Logger } from '../util/logger';
import { PullOptions } from '../pull/command';
@@ -47,11 +48,7 @@ async function pullHandler(options: PullOptions, logger: Logger) {
// Build the state.json
const state = getStateFromProjectPayload(project!);

// Write the final project state to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);
// defer writing to disk until we have the spec
Collaborator
No need for this comment to stay in the source; it's a bit confusing without the diff tbh.


logger.always('Downloading the project spec (as YAML) from the server.');
// Get the project.yaml from Lightning
@@ -85,9 +82,21 @@ async function pullHandler(options: PullOptions, logger: Logger) {
const resolvedPath = path.resolve(config.specPath);
logger.debug('reading spec from', resolvedPath);

// Write the yaml to disk
// @ts-ignore
await fs.writeFile(resolvedPath, res.body);
const updatedSpec = await updatePulledSpecJobBodyPath(
await res.text(),
state,
config,
logger
);

// Write the final project state and yaml to disk
await fs.writeFile(
path.resolve(config.statePath),
JSON.stringify(state, null, 2)
);

await fs.writeFile(resolvedPath, updatedSpec);

// Read the spec back in a parsed yaml
const spec = await getSpec(resolvedPath);
55 changes: 52 additions & 3 deletions packages/deploy/src/index.ts
@@ -1,8 +1,8 @@
import { confirm } from '@inquirer/prompts';
import { inspect } from 'node:util';
import { DeployConfig, ProjectState } from './types';
import { DeployConfig, ProjectState, SpecJob } from './types';
import { readFile, writeFile } from 'fs/promises';
import { parseAndValidate } from './validator';
import { parseAndValidate, addSpecJobBodyPath } from './validator';
import jsondiff from 'json-diff';
import {
mergeProjectPayloadIntoState,
@@ -91,7 +91,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise<void> {
export async function getSpec(path: string) {
try {
const body = await readFile(path, 'utf8');
return parseAndValidate(body);
return await parseAndValidate(body, path);
Collaborator
Don't return and await please!

An async function returns a promise, so you can just return the parseAndValidate promise straight out of the function
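One caveat worth checking before applying this suggestion: inside a `try` block, `return promise` and `return await promise` do not behave the same way. Without `await`, a rejection is delivered to the caller and the surrounding `catch` never sees it. A minimal sketch of the difference follows; `failingParse`, `withAwait` and `withoutAwait` are hypothetical names used only for illustration and are not part of the PR.

```typescript
// Hypothetical stand-in for an async parser that rejects.
async function failingParse(): Promise<string> {
  throw new Error('parse failed');
}

// `return await` settles the promise inside the try block, so catch runs.
async function withAwait(): Promise<string> {
  try {
    return await failingParse();
  } catch (error: any) {
    return `caught: ${error.message}`;
  }
}

// A bare `return` hands the promise to the caller; this catch never runs.
async function withoutAwait(): Promise<string> {
  try {
    return failingParse();
  } catch (error: any) {
    return `caught: ${error.message}`;
  }
}
```

In the diff, `readFile` is already awaited on its own line, so the `ENOENT` branch is unaffected either way; the difference only matters for errors thrown by `parseAndValidate` itself.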

} catch (error: any) {
if (error.code === 'ENOENT') {
throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR');
@@ -103,6 +103,55 @@ export async function getSpec(path: string) {

// =============================================

async function getAllSpecJobs(
config: DeployConfig,
logger: Logger
): Promise<SpecJob[]> {
const jobs: SpecJob[] = [];

try {
const [state, spec] = await Promise.all([
Collaborator
You don't think this is cleaner?

const state = await getState(config.statePath)
const spec = await getSpec(config.specPath)

Up to you!
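Both forms yield the same `state` and `spec`. The practical difference is that `Promise.all` starts both reads before waiting on either, while sequential `await`s finish the first read before starting the second. A small sketch that records the order of events, using a hypothetical `fakeRead` helper as a stand-in for `getState` and `getSpec`:

```typescript
const events: string[] = [];

// Hypothetical stand-in for getState/getSpec: logs start and end, then resolves.
async function fakeRead(name: string): Promise<string> {
  events.push(`start ${name}`);
  await Promise.resolve(); // yield once, as real I/O would
  events.push(`end ${name}`);
  return name;
}

// Sequential awaits: the second read starts only after the first has finished.
async function sequential(): Promise<string[]> {
  events.length = 0;
  const state = await fakeRead('state');
  const spec = await fakeRead('spec');
  return [state, spec];
}

// Promise.all: both reads are started before either one is awaited.
async function concurrent(): Promise<string[]> {
  events.length = 0;
  return Promise.all([fakeRead('state'), fakeRead('spec')]);
}
```

For two small local files the timing difference is likely negligible, so the choice is mostly about readability.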

getState(config.statePath),
getSpec(config.specPath),
]);

for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) {
if (workflow.jobs) {
for (const [jobKey, specJob] of Object.entries(workflow.jobs)) {
const stateJob = state.workflows[workflowKey]?.jobs[jobKey];
stateJob &&
jobs.push({
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
});
}
}
}
} catch (error: any) {
logger.debug(`Could not read the spec and state: ${error.message}`);
}

return jobs;
}

export async function updatePulledSpecJobBodyPath(
Collaborator
Can I ask for a better function name here? And maybe for addSpecJobBodyPath too?

This is really complex stuff and it's taken me ages to get any kind of feeling for what's happening here 😅

Basically what these functions are doing is pulling out job bodies from the newly pulled spec and writing them to disk if the state (?) includes a path. Right?

So something like extractJobsToDisk might be a better name for addSpecJobBodyPath

updatePulledSpecJobBodyPath should probably be moved to a new file, packages/deploy/src/pull.ts: since it only affects pulls, the context helps understandability. A name like syncRemoteSpec might be a good general name for what this is trying to do.
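The behaviour described in this comment can be sketched roughly as follows. This is a simplified model, not the PR's implementation: it works on plain objects rather than a YAML document, uses an in-memory `Map` in place of the file system, and the names `extractJobsToDisk`, `PulledJob` and `files` are hypothetical. Note that in the diff the path is taken from the previous local spec, with jobs matched by id through the state.

```typescript
type SpecJobBody = string | { path?: string; content: string };
type SpecJob = { id?: string; name: string; body: SpecJobBody };
type PulledJob = { id: string; name: string; body: string };

// Hypothetical helper: for each pulled job whose previous spec entry pointed at
// a file, "write" the new body to that file and keep only the path in the spec.
function extractJobsToDisk(
  pulled: PulledJob[],
  oldJobs: SpecJob[],
  files: Map<string, string> // stand-in for the file system
): { name: string; body: string | { path: string } }[] {
  return pulled.map((job) => {
    const old = oldJobs.find((o) => o.id === job.id);
    const oldPath =
      old && typeof old.body === 'object' ? old.body.path : undefined;
    if (!oldPath) {
      return { name: job.name, body: job.body }; // body stays inline
    }
    files.set(oldPath, job.body);
    return { name: job.name, body: { path: oldPath } };
  });
}

const files = new Map<string, string>();
const result = extractJobsToDisk(
  [
    { id: 'a', name: 'fetch', body: 'get("/patients")' },
    { id: 'b', name: 'load', body: 'fn(s => s)' },
  ],
  [{ id: 'a', name: 'fetch', body: { path: './fetch.js', content: 'old' } }],
  files
);
```

Here job `a` previously pointed at `./fetch.js`, so its pulled body goes to that file and the spec keeps only the path; job `b` had no path and stays inline.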

newSpecBody: string,
newState: ProjectState,
config: DeployConfig,
logger: Logger
): Promise<string> {
try {
const oldSpecJobs = await getAllSpecJobs(config, logger);

return await addSpecJobBodyPath(newSpecBody, newState, oldSpecJobs, config);
} catch (error: any) {
logger.warn(`Could not update spec job body paths: ${error.message}`);
return newSpecBody;
}
}

export async function deploy(config: DeployConfig, logger: Logger) {
const [state, spec] = await Promise.all([
getState(config.statePath),
13 changes: 11 additions & 2 deletions packages/deploy/src/stateTransform.ts
@@ -5,6 +5,7 @@ import {
ProjectSpec,
ProjectState,
SpecEdge,
SpecJobBody,
StateEdge,
WorkflowSpec,
WorkflowState,
@@ -20,6 +21,14 @@ import {
import { DeployError } from './deployError';
import { Logger } from '@openfn/logger/dist';

function stringifyJobBody(body: SpecJobBody): string {
if (typeof body === 'object') {
return body.content;
} else {
return body;
}
}

function mergeJobs(
stateJobs: WorkflowState['jobs'],
specJobs: WorkflowSpec['jobs']
@@ -33,7 +42,7 @@ function mergeJobs(
id: crypto.randomUUID(),
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
@@ -49,7 +58,7 @@ function mergeJobs(
id: stateJob.id,
name: specJob.name,
adaptor: specJob.adaptor,
body: specJob.body,
body: stringifyJobBody(specJob.body),
},
];
}
22 changes: 18 additions & 4 deletions packages/deploy/src/types.ts
@@ -1,11 +1,25 @@
export type Job = {
export type StateJob = {
id?: string;
name: string;
adaptor: string;
body: string;
delete?: boolean;
};

export type SpecJobBody =
| string
| {
path?: string;
content: string;
};

export type SpecJob = {
id?: string;
name: string;
adaptor: string;
body: SpecJobBody;
};

export type Trigger = {
id?: string;
type?: string;
@@ -38,7 +52,7 @@ export type SpecEdge = {
export type WorkflowSpec = {
id?: string;
name: string;
jobs?: Record<string | symbol, Job>;
jobs?: Record<string | symbol, SpecJob>;
triggers?: Record<string | symbol, Trigger>;
edges?: Record<string | symbol, SpecEdge>;
};
@@ -52,7 +66,7 @@ export interface ProjectSpec {
export interface WorkflowState {
id: string;
name: string;
jobs: Record<string | symbol, Concrete<Job>>;
jobs: Record<string | symbol, Concrete<StateJob>>;
triggers: Record<string | symbol, Concrete<Trigger>>;
edges: Record<string | symbol, Concrete<StateEdge>>;
delete?: boolean;
@@ -78,7 +92,7 @@ export interface ProjectPayload {
id: string;
name: string;
project_id?: string;
jobs: Concrete<Job>[];
jobs: Concrete<StateJob>[];
triggers: Concrete<Trigger>[];
edges: Concrete<StateEdge>[];
}[];
107 changes: 101 additions & 6 deletions packages/deploy/src/validator.ts
@@ -1,5 +1,7 @@
import YAML, { YAMLMap, isMap, isPair } from 'yaml';
import { ProjectSpec } from './types';
import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml';
import { DeployConfig, ProjectSpec, ProjectState, SpecJob } from './types';
import { readFile, writeFile } from 'fs/promises';
import path from 'path';

export interface Error {
context: any;
@@ -8,12 +10,16 @@ export interface Error {
range?: [number, number, number];
}

export function parseAndValidate(input: string): {
export async function parseAndValidate(
input: string,
specPath: string
Collaborator
I think you should default specPath to '.' - that way you don't break unit tests which don't use it
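A sketch of what that default could look like. `resolveJobBodyPath` is a hypothetical helper that isolates the `path.dirname` and `path.resolve` steps from the diff; it is not a function in the PR. With `specPath` defaulting to `'.'`, `path.dirname('.')` is `'.'`, so relative job paths resolve against the current working directory.

```typescript
import * as path from 'node:path';

// Hypothetical helper: resolve a job body path relative to the spec file.
// specPath defaults to '.', so callers that pass no path still work.
function resolveJobBodyPath(jobPath: string, specPath: string = '.'): string {
  const basePath = path.dirname(specPath);
  return path.resolve(basePath, jobPath);
}

// Resolved next to the spec file.
const relativeToSpec = resolveJobBodyPath('jobs/a.js', '/tmp/project/project.yaml');
// No spec path given: resolved against the current working directory.
const relativeToCwd = resolveJobBodyPath('jobs/a.js');
```

Existing unit tests that call `parseAndValidate(input)` with a single argument would keep compiling, and only specs that actually use `body.path` would touch the file system.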

): Promise<{
errors: Error[];
doc: ProjectSpec;
} {
}> {
let errors: Error[] = [];
const doc = YAML.parseDocument(input);
const basePath = path.dirname(specPath);

function ensureUniqueId(key: string, arr: string[]) {
if (arr.includes(key)) {
@@ -97,8 +103,8 @@ export function parseAndValidate(input: string): {
}
}

YAML.visit(doc, {
Pair(_, pair: any) {
await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (pair.key && pair.key.value === 'workflows') {
if (pair.value.value === null) {
errors.push({
@@ -124,6 +130,37 @@ export function parseAndValidate(input: string): {
}
}

if (
pair.key &&
pair.key.value === 'body' &&
pairPath.length > 4 &&
isMap(pair.value)
) {
const pathValue = pair.value.get('path');
const grandPair = pairPath[pairPath.length - 4];

if (
isPair(grandPair) &&
isScalar(grandPair.key) &&
grandPair.key.value === 'jobs' &&
typeof pathValue === 'string'
) {
const filePath = path.resolve(basePath, pathValue);
try {
const content = await readFile(filePath, 'utf8');
pair.value.set('content', content);
} catch (error: any) {
errors.push({
path: `job/body/path`,
context: pair,
message: `Failed to read file ${pathValue}: ${error.message}`,
range: pair.value.range,
});
}
return undefined;
}
}

if (pair.key && pair.key.value === 'condition_expression') {
if (typeof pair.value.value !== 'string') {
pair.value.value = String(pair.value.value);
@@ -148,3 +185,61 @@ export function parseAndValidate(input: string): {

return { errors, doc: doc.toJSON() as ProjectSpec };
}

export async function addSpecJobBodyPath(
specBody: string,
state: ProjectState,
oldJobs: SpecJob[],
config: DeployConfig
): Promise<string> {
const doc = YAML.parseDocument(specBody);

await YAML.visitAsync(doc, {
async Pair(_, pair: any, pairPath) {
if (
pair.key &&
pair.key.value === 'body' &&
isScalar(pair.value) &&
pairPath.length > 6
) {
// the job
const job = pairPath[pairPath.length - 2];
const jobKey = isPair(job) && isScalar(job.key) && job.key.value;
// the workflow
const workflow = pairPath[pairPath.length - 6];
const workflowKey =
isPair(workflow) && isScalar(workflow.key) && workflow.key.value;

// find the job in the state
const stateJob =
typeof jobKey === 'string' &&
typeof workflowKey === 'string' &&
state.workflows[workflowKey]?.jobs[jobKey];

// check if the state job is in the old spec jobs
const oldSpecJob =
stateJob && oldJobs.find((job) => job.id === stateJob.id);

// get the file path from the old spec job
const oldSpecJobPath =
oldSpecJob &&
typeof oldSpecJob?.body === 'object' &&
oldSpecJob.body.path;

if (oldSpecJobPath) {
const basePath = path.dirname(config.specPath);
const resolvedPath = path.resolve(basePath, oldSpecJobPath);
await writeFile(resolvedPath, pair.value.value);

// set the body path in the spec
const map = doc.createNode({ path: oldSpecJobPath });

pair.value = map;
}
}
return undefined;
},
});

return doc.toString();
}