From 69a37b3039ab918d72d4b77d182570ccaaa1d289 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Fri, 9 Aug 2024 23:10:08 +0300 Subject: [PATCH 1/6] support file paths --- packages/cli/src/pull/handler.ts | 23 ++++-- packages/deploy/src/index.ts | 55 ++++++++++++- packages/deploy/src/stateTransform.ts | 13 +++- packages/deploy/src/types.ts | 22 +++++- packages/deploy/src/validator.ts | 107 ++++++++++++++++++++++++-- 5 files changed, 198 insertions(+), 22 deletions(-) diff --git a/packages/cli/src/pull/handler.ts b/packages/cli/src/pull/handler.ts index f8087fea7..5f9e1ca84 100644 --- a/packages/cli/src/pull/handler.ts +++ b/packages/cli/src/pull/handler.ts @@ -6,6 +6,7 @@ import { getProject, getSpec, getStateFromProjectPayload, + updatePulledSpecJobBodyPath, } from '@openfn/deploy'; import type { Logger } from '../util/logger'; import { PullOptions } from '../pull/command'; @@ -47,11 +48,7 @@ async function pullHandler(options: PullOptions, logger: Logger) { // Build the state.json const state = getStateFromProjectPayload(project!); - // Write the final project state to disk - await fs.writeFile( - path.resolve(config.statePath), - JSON.stringify(state, null, 2) - ); + // defer writing to disk until we have the spec logger.always('Downloading the project spec (as YAML) from the server.'); // Get the project.yaml from Lightning @@ -85,9 +82,21 @@ async function pullHandler(options: PullOptions, logger: Logger) { const resolvedPath = path.resolve(config.specPath); logger.debug('reading spec from', resolvedPath); - // Write the yaml to disk // @ts-ignore - await fs.writeFile(resolvedPath, res.body); + const updatedSpec = await updatePulledSpecJobBodyPath( + await res.text(), + state, + config, + logger + ); + + // Write the final project state and yaml to disk + await fs.writeFile( + path.resolve(config.statePath), + JSON.stringify(state, null, 2) + ); + + await fs.writeFile(resolvedPath, updatedSpec); // Read the spec back in a parsed yaml const spec = await getSpec(resolvedPath); diff --git a/packages/deploy/src/index.ts b/packages/deploy/src/index.ts index 9a4877688..d0943bd6f 100644 --- a/packages/deploy/src/index.ts +++ b/packages/deploy/src/index.ts @@ -1,8 +1,8 @@ import { confirm } from '@inquirer/prompts'; import { inspect } from 'node:util'; -import { DeployConfig, ProjectState } from './types'; +import { DeployConfig, ProjectState, SpecJob } from './types'; import { readFile, writeFile } from 'fs/promises'; -import { parseAndValidate } from './validator'; +import { parseAndValidate, addSpecJobBodyPath } from './validator'; import jsondiff from 'json-diff'; import { mergeProjectPayloadIntoState, @@ -91,7 +91,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise { export async function getSpec(path: string) { try { const body = await readFile(path, 'utf8'); - return parseAndValidate(body); + return await parseAndValidate(body, path); } catch (error: any) { if (error.code === 'ENOENT') { throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR'); @@ -103,6 +103,55 @@ export async function getSpec(path: string) { // ============================================= +async function getAllSpecJobs( + config: DeployConfig, + logger: Logger +): Promise { + const jobs: SpecJob[] = []; + + try { + const [state, spec] = await Promise.all([ + getState(config.statePath), + getSpec(config.specPath), + ]); + + for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) { + if (workflow.jobs) { + for (const [jobKey, specJob] of Object.entries(workflow.jobs)) { + const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; + stateJob && + jobs.push({ + id: stateJob.id, + name: specJob.name, + adaptor: specJob.adaptor, + body: specJob.body, + }); + } + } + } + } catch (error: any) { + logger.debug(`Could not read the spec and state: ${error.message}`); + } + + return jobs; +} + +export async function updatePulledSpecJobBodyPath( + newSpecBody: string, + newState: ProjectState, + config: DeployConfig, + logger: Logger +): Promise { + try { + const oldSpecJobs = await getAllSpecJobs(config, logger); + + return await addSpecJobBodyPath(newSpecBody, newState, oldSpecJobs, config); + } catch (error: any) { + logger.warn(`Could not update spec job body paths: ${error.message}`); + return newSpecBody; + } +} + export async function deploy(config: DeployConfig, logger: Logger) { const [state, spec] = await Promise.all([ getState(config.statePath), diff --git a/packages/deploy/src/stateTransform.ts b/packages/deploy/src/stateTransform.ts index 62ca4c21a..d1b11b936 100644 --- a/packages/deploy/src/stateTransform.ts +++ b/packages/deploy/src/stateTransform.ts @@ -5,6 +5,7 @@ import { ProjectSpec, ProjectState, SpecEdge, + SpecJobBody, StateEdge, WorkflowSpec, WorkflowState, @@ -20,6 +21,14 @@ import { import { DeployError } from './deployError'; import { Logger } from '@openfn/logger/dist'; +function stringifyJobBody(body: SpecJobBody): string { + if (typeof body === 'object') { + return body.content; + } else { + return body; + } +} + function mergeJobs( stateJobs: WorkflowState['jobs'], specJobs: WorkflowSpec['jobs'] @@ -33,7 +42,7 @@ function mergeJobs( id: crypto.randomUUID(), name: specJob.name, adaptor: specJob.adaptor, - body: specJob.body, + body: stringifyJobBody(specJob.body), }, ]; } @@ -49,7 +58,7 @@ function mergeJobs( id: stateJob.id, name: specJob.name, adaptor: specJob.adaptor, - body: specJob.body, + body: stringifyJobBody(specJob.body), }, ]; } diff --git a/packages/deploy/src/types.ts b/packages/deploy/src/types.ts index 754307b8e..a4ea7f52d 100644 --- a/packages/deploy/src/types.ts +++ b/packages/deploy/src/types.ts @@ -1,4 +1,4 @@ -export type Job = { +export type StateJob = { id?: string; name: string; adaptor: string; @@ -6,6 +6,20 @@ export type Job = { delete?: boolean; }; +export type SpecJobBody = + | string + | { + path?: string; + content: string; + }; + +export type SpecJob = { + id?: string; + name: string; + adaptor: string; + body: SpecJobBody; +}; + export type Trigger = { id?: string; type?: string; @@ -38,7 +52,7 @@ export type SpecEdge = { export type WorkflowSpec = { id?: string; name: string; - jobs?: Record; + jobs?: Record; triggers?: Record; edges?: Record; }; @@ -52,7 +66,7 @@ export interface ProjectSpec { export interface WorkflowState { id: string; name: string; - jobs: Record>; + jobs: Record>; triggers: Record>; edges: Record>; delete?: boolean; @@ -78,7 +92,7 @@ export interface ProjectPayload { id: string; name: string; project_id?: string; - jobs: Concrete[]; + jobs: Concrete[]; triggers: Concrete[]; edges: Concrete[]; }[]; diff --git a/packages/deploy/src/validator.ts b/packages/deploy/src/validator.ts index 6c93fa90a..c95aecb8b 100644 --- a/packages/deploy/src/validator.ts +++ b/packages/deploy/src/validator.ts @@ -1,5 +1,7 @@ -import YAML, { YAMLMap, isMap, isPair } from 'yaml'; -import { ProjectSpec } from './types'; +import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml'; +import { DeployConfig, ProjectSpec, ProjectState, SpecJob } from './types'; +import { readFile, writeFile } from 'fs/promises'; +import path from 'path'; export interface Error { context: any; @@ -8,12 +10,16 @@ export interface Error { range?: [number, number, number]; } -export function parseAndValidate(input: string): { +export async function parseAndValidate( + input: string, + specPath: string +): Promise<{ errors: Error[]; doc: ProjectSpec; -} { +}> { let errors: Error[] = []; const doc = YAML.parseDocument(input); + const basePath = path.dirname(specPath); function ensureUniqueId(key: string, arr: string[]) { if (arr.includes(key)) { @@ -97,8 +103,8 @@ export function parseAndValidate(input: string): { } } - YAML.visit(doc, { - Pair(_, pair: any) { + await YAML.visitAsync(doc, { + async Pair(_, pair: any, pairPath) { if (pair.key && pair.key.value === 'workflows') { if (pair.value.value === null) { errors.push({ @@ -124,6 +130,37 @@ export function parseAndValidate(input: string): { } } + if ( + pair.key && + pair.key.value === 'body' && + pairPath.length > 4 && + isMap(pair.value) + ) { + const pathValue = pair.value.get('path'); + const grandPair = pairPath[pairPath.length - 4]; + + if ( + isPair(grandPair) && + isScalar(grandPair.key) && + grandPair.key.value === 'jobs' && + typeof pathValue === 'string' + ) { + const filePath = path.resolve(basePath, pathValue); + try { + const content = await readFile(filePath, 'utf8'); + pair.value.set('content', content); + } catch (error: any) { + errors.push({ + path: `job/body/path`, + context: pair, + message: `Failed to read file ${pathValue}: ${error.message}`, + range: pair.value.range, + }); + } + return undefined; + } + } + if (pair.key && pair.key.value === 'condition_expression') { if (typeof pair.value.value !== 'string') { pair.value.value = String(pair.value.value); @@ -148,3 +185,61 @@ export function parseAndValidate(input: string): { return { errors, doc: doc.toJSON() as ProjectSpec }; } + +export async function addSpecJobBodyPath( + specBody: string, + state: ProjectState, + oldJobs: SpecJob[], + config: DeployConfig +): Promise { + const doc = YAML.parseDocument(specBody); + + await YAML.visitAsync(doc, { + async Pair(_, pair: any, pairPath) { + if ( + pair.key && + pair.key.value === 'body' && + isScalar(pair.value) && + pairPath.length > 6 + ) { + // the job + const job = pairPath[pairPath.length - 2]; + const jobKey = isPair(job) && isScalar(job.key) && job.key.value; + // the workflow + const workflow = pairPath[pairPath.length - 6]; + const workflowKey = + isPair(workflow) && isScalar(workflow.key) && workflow.key.value; + + // find the job in the state + const stateJob = + typeof jobKey === 'string' && + typeof workflowKey === 'string' && + state.workflows[workflowKey]?.jobs[jobKey]; + + // check if the state job is in the old spec jobs + const oldSpecJob = + stateJob && oldJobs.find((job) => job.id === stateJob.id); + + // get the file path from the old spec job + const oldSpecJobPath = + oldSpecJob && + typeof oldSpecJob?.body === 'object' && + oldSpecJob.body.path; + + if (oldSpecJobPath) { + const basePath = path.dirname(config.specPath); + const resolvedPath = path.resolve(basePath, oldSpecJobPath); + await writeFile(resolvedPath, pair.value.value); + + // set the body path in the spec + const map = doc.createNode({ path: oldSpecJobPath }); + + pair.value = map; + } + } + return undefined; + }, + }); + + return doc.toString(); +} From 2ca8bab8607c8d07f54a4594cee8f0f8a0f499f2 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Fri, 9 Aug 2024 23:15:59 +0300 Subject: [PATCH 2/6] add changeset file --- .changeset/swift-dingos-know.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/swift-dingos-know.md diff --git a/.changeset/swift-dingos-know.md b/.changeset/swift-dingos-know.md new file mode 100644 index 000000000..b10a90468 --- /dev/null +++ b/.changeset/swift-dingos-know.md @@ -0,0 +1,6 @@ +--- +'@openfn/cli': minor +'@openfn/deploy': minor +--- + +Support file paths for job body From 3440a4f4e79fe2b729f654fea7b2e7dc123fd0cd Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Sat, 10 Aug 2024 00:23:53 +0300 Subject: [PATCH 3/6] add test --- packages/deploy/test/validator.test.ts | 67 +++++++++++++++++++++----- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/packages/deploy/test/validator.test.ts b/packages/deploy/test/validator.test.ts index 18cdf6d23..67f8cbd38 100644 --- a/packages/deploy/test/validator.test.ts +++ b/packages/deploy/test/validator.test.ts @@ -1,11 +1,23 @@ import test from 'ava'; import { parseAndValidate } from '../src/validator'; +import fs from 'fs/promises'; +import path from 'path'; + +// Helper to create and clean up temporary test files +const createTempFile = async (t, content: string, ext = 'txt') => { + const fileName = `${t.title.replace(/\s+/g, '_')}-${Math.floor( + Math.random() * 20 + )}.${ext}`; + const filePath = path.resolve(fileName); + await fs.writeFile(filePath, content); + return filePath; +}; function findError(errors: any[], message: string) { return errors.find((e) => e.message === message); } -test('Workflows must be a map', (t) => { +test('Workflows must be a map', async (t) => { const doc = ` name: project-name workflows: @@ -13,7 +25,7 @@ test('Workflows must be a map', (t) => { - name: workflow-two `; - const results = parseAndValidate(doc); + const results = await parseAndValidate(doc, 'spec.yaml'); const err = findError(results.errors, 'must be a map'); @@ -21,7 +33,7 @@ test('Workflows must be a map', (t) => { t.is(err.path, 'workflows'); }); -test('Workflows must have unique ids', (t) => { +test('Workflows must have unique ids', async (t) => { const doc = ` name: project-name workflows: @@ -33,14 +45,14 @@ test('Workflows must have unique ids', (t) => { name: workflow three `; - const results = parseAndValidate(doc); + const results = await parseAndValidate(doc, 'spec.yaml'); const err = findError(results.errors, 'duplicate id: workflow-one'); t.truthy(err); t.is(err.path, 'workflow-one'); }); -test('Jobs must have unique ids within a workflow', (t) => { +test('Jobs must have unique ids within a workflow', async (t) => { const doc = ` name: project-name workflows: @@ -52,14 +64,14 @@ test('Jobs must have unique ids within a workflow', (t) => { bar: `; - const results = parseAndValidate(doc); + const results = await parseAndValidate(doc, 'spec.yaml'); const err = findError(results.errors, 'duplicate id: foo'); t.is(err.path, 'workflow-two/foo'); t.truthy(err); }); -test('Job ids can duplicate across workflows', (t) => { +test('Job ids can duplicate across workflows', async (t) => { const doc = ` name: project-name workflows: @@ -73,12 +85,12 @@ test('Job ids can duplicate across workflows', (t) => { foo: `; - const results = parseAndValidate(doc); + const results = await parseAndValidate(doc, 'spec.yaml'); t.is(results.errors.length, 0); }); -test('Workflow edges are parsed correctly', (t) => { +test('Workflow edges are parsed correctly', async (t) => { const doc = ` name: project-name workflows: @@ -101,7 +113,7 @@ test('Workflow edges are parsed correctly', (t) => { condition_expression: true `; - const results = parseAndValidate(doc); + const results = await parseAndValidate(doc, 'spec.yaml'); t.assert( results.doc.workflows['workflow-one'].edges![ @@ -110,12 +122,12 @@ test('Workflow edges are parsed correctly', (t) => { ); }); -test('allow empty workflows', (t) => { +test('allow empty workflows', async (t) => { let doc = ` name: project-name `; - const result = parseAndValidate(doc); + const result = await parseAndValidate(doc, 'spec.yaml'); t.is(result.errors.length, 0); @@ -124,3 +136,34 @@ test('allow empty workflows', (t) => { workflows: {}, }); }); + +test('adds the file content into the job body from the specified path', async (t) => { + // Step 1: Create a temporary file that the YAML will reference + const fileContent = 'fn(state => state.data);'; + const filePath = await createTempFile(t, fileContent); + + // Step 2: YAML document that references the file + const doc = ` + name: project-name + workflows: + workflow-one: + name: workflow one + jobs: + job-one: + name: job one + adaptor: '@openfn/language-http@latest' + body: + path: ${path.basename(filePath)} + `; + + // Step 3: Run the parseAndValidate function + const results = await parseAndValidate(doc, 'spec.yaml'); + + // Step 4: Assert that the content from the file was merged into the spec + const jobBody = results.doc.workflows['workflow-one'].jobs!['job-one'].body; + + t.is(jobBody.content, fileContent); + + // Cleanup + await fs.rm(filePath); +}); From ba6251af540316355f22651a0a4202182ddbdfa8 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Sun, 11 Aug 2024 17:53:29 +0300 Subject: [PATCH 4/6] refactor for easy readability --- packages/deploy/src/validator.ts | 91 ++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/packages/deploy/src/validator.ts b/packages/deploy/src/validator.ts index c95aecb8b..90786bd56 100644 --- a/packages/deploy/src/validator.ts +++ b/packages/deploy/src/validator.ts @@ -1,4 +1,4 @@ -import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml'; +import YAML, { YAMLMap, Pair, Scalar, isMap, isPair, isScalar } from 'yaml'; import { DeployConfig, ProjectSpec, ProjectState, SpecJob } from './types'; import { readFile, writeFile } from 'fs/promises'; import path from 'path'; @@ -192,52 +192,63 @@ export async function addSpecJobBodyPath( oldJobs: SpecJob[], config: DeployConfig ): Promise { + function isPairWithScalarKey( + node: any + ): node is Pair & { key: Scalar & { value: string } } { + return ( + isPair(node) && isScalar(node.key) && typeof node.key.value === 'string' + ); + } + const doc = YAML.parseDocument(specBody); await YAML.visitAsync(doc, { async Pair(_, pair: any, pairPath) { if ( - pair.key && - pair.key.value === 'body' && - isScalar(pair.value) && - pairPath.length > 6 + !pair.key || + pair.key.value !== 'body' || + !isScalar(pair.value) || + pairPath.length <= 6 ) { - // the job - const job = pairPath[pairPath.length - 2]; - const jobKey = isPair(job) && isScalar(job.key) && job.key.value; - // the workflow - const workflow = pairPath[pairPath.length - 6]; - const workflowKey = - isPair(workflow) && isScalar(workflow.key) && workflow.key.value; - - // find the job in the state - const stateJob = - typeof jobKey === 'string' && - typeof workflowKey === 'string' && - state.workflows[workflowKey]?.jobs[jobKey]; - - // check if the state job is in the old spec jobs - const oldSpecJob = - stateJob && oldJobs.find((job) => job.id === stateJob.id); - - // get the file path from the old spec job - const oldSpecJobPath = - oldSpecJob && - typeof oldSpecJob?.body === 'object' && - oldSpecJob.body.path; - - if (oldSpecJobPath) { - const basePath = path.dirname(config.specPath); - const resolvedPath = path.resolve(basePath, oldSpecJobPath); - await writeFile(resolvedPath, pair.value.value); - - // set the body path in the spec - const map = doc.createNode({ path: oldSpecJobPath }); - - pair.value = map; - } + return; + } + + const jobPair = pairPath[pairPath.length - 2]; + const workflowPair = pairPath[pairPath.length - 6]; + + if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) { + return; + } + + const jobKey = jobPair.key.value; + const workflowKey = workflowPair.key.value; + + // find the job in the state + const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; + + if (!stateJob) { + return; + } + + // check if the state job is in the old spec jobs + const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id); + + if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') { + return; + } + + const oldSpecJobPath = oldSpecJob.body.path; + + if (oldSpecJobPath) { + const basePath = path.dirname(config.specPath); + const resolvedPath = path.resolve(basePath, oldSpecJobPath); + await writeFile(resolvedPath, pair.value.value); + + // set the body path in the spec + const map = doc.createNode({ path: oldSpecJobPath }); + + pair.value = map; } - return undefined; }, }); From b51407d271a30cc9169462eaa11654f0608d634c Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Sun, 11 Aug 2024 18:03:06 +0300 Subject: [PATCH 5/6] update fixtures to use body content --- packages/deploy/test/fixtures.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/deploy/test/fixtures.ts b/packages/deploy/test/fixtures.ts index a639f7b2a..c97ed0182 100644 --- a/packages/deploy/test/fixtures.ts +++ b/packages/deploy/test/fixtures.ts @@ -11,7 +11,10 @@ export function fullExampleSpec() { 'job-a': { name: 'job a', adaptor: '@openfn/language-common@latest', - body: '', + body: { + path: 'somefile.js', + content: '', + }, }, 'job-b': { name: 'job b', From 222d05f60243b43ab714855380fde939bcb1d7c0 Mon Sep 17 00:00:00 2001 From: Frank Midigo Date: Wed, 14 Aug 2024 11:16:39 +0300 Subject: [PATCH 6/6] move relevant functions to pull.ts --- .changeset/swift-dingos-know.md | 2 +- packages/cli/src/pull/handler.ts | 7 +- packages/deploy/src/index.ts | 57 +----------- packages/deploy/src/pull.ts | 122 +++++++++++++++++++++++++ packages/deploy/src/validator.ts | 77 +--------------- packages/deploy/test/util.ts | 14 +++ packages/deploy/test/validator.test.ts | 24 ++--- 7 files changed, 155 insertions(+), 148 deletions(-) create mode 100644 packages/deploy/src/pull.ts create mode 100644 packages/deploy/test/util.ts diff --git a/.changeset/swift-dingos-know.md b/.changeset/swift-dingos-know.md index b10a90468..936a56b3a 100644 --- a/.changeset/swift-dingos-know.md +++ b/.changeset/swift-dingos-know.md @@ -3,4 +3,4 @@ '@openfn/deploy': minor --- -Support file paths for job body +Deploy: allow job body to be loaded from a file path in workflow.yaml diff --git a/packages/cli/src/pull/handler.ts b/packages/cli/src/pull/handler.ts index 5f9e1ca84..fab775124 100644 --- a/packages/cli/src/pull/handler.ts +++ b/packages/cli/src/pull/handler.ts @@ -6,7 +6,7 @@ import { getProject, getSpec, getStateFromProjectPayload, - updatePulledSpecJobBodyPath, + syncRemoteSpec, } from '@openfn/deploy'; import type { Logger } from '../util/logger'; import { PullOptions } from '../pull/command'; @@ -48,8 +48,6 @@ async function pullHandler(options: PullOptions, logger: Logger) { // Build the state.json const state = getStateFromProjectPayload(project!); - // defer writing to disk until we have the spec - logger.always('Downloading the project spec (as YAML) from the server.'); // Get the project.yaml from Lightning const queryParams = new URLSearchParams(); @@ -82,8 +80,7 @@ async function pullHandler(options: PullOptions, logger: Logger) { const resolvedPath = path.resolve(config.specPath); logger.debug('reading spec from', resolvedPath); - // @ts-ignore - const updatedSpec = await updatePulledSpecJobBodyPath( + const updatedSpec = await syncRemoteSpec( await res.text(), state, config, diff --git a/packages/deploy/src/index.ts b/packages/deploy/src/index.ts index d0943bd6f..1c22fbd97 100644 --- a/packages/deploy/src/index.ts +++ b/packages/deploy/src/index.ts @@ -1,8 +1,8 @@ import { confirm } from '@inquirer/prompts'; import { inspect } from 'node:util'; -import { DeployConfig, ProjectState, SpecJob } from './types'; +import { DeployConfig, ProjectState } from './types'; import { readFile, writeFile } from 'fs/promises'; -import { parseAndValidate, addSpecJobBodyPath } from './validator'; +import { parseAndValidate } from './validator'; import jsondiff from 'json-diff'; import { mergeProjectPayloadIntoState, @@ -10,6 +10,7 @@ import { toProjectPayload, getStateFromProjectPayload, } from './stateTransform'; +import { syncRemoteSpec } from './pull'; import { deployProject, getProject } from './client'; import { DeployError } from './deployError'; import { Logger } from '@openfn/logger'; @@ -33,6 +34,7 @@ export { mergeSpecIntoState, mergeProjectPayloadIntoState, getStateFromProjectPayload, + syncRemoteSpec, }; export async function getConfig(path?: string): Promise { @@ -91,7 +93,7 @@ function writeState(config: DeployConfig, nextState: {}): Promise { export async function getSpec(path: string) { try { const body = await readFile(path, 'utf8'); - return await parseAndValidate(body, path); + return parseAndValidate(body, path); } catch (error: any) { if (error.code === 'ENOENT') { throw new DeployError(`File not found: ${path}`, 'SPEC_ERROR'); @@ -103,55 +105,6 @@ export async function getSpec(path: string) { // ============================================= -async function getAllSpecJobs( - config: DeployConfig, - logger: Logger -): Promise { - const jobs: SpecJob[] = []; - - try { - const [state, spec] = await Promise.all([ - getState(config.statePath), - getSpec(config.specPath), - ]); - - for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) { - if (workflow.jobs) { - for (const [jobKey, specJob] of Object.entries(workflow.jobs)) { - const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; - stateJob && - jobs.push({ - id: stateJob.id, - name: specJob.name, - adaptor: specJob.adaptor, - body: specJob.body, - }); - } - } - } - } catch (error: any) { - logger.debug(`Could not read the spec and state: ${error.message}`); - } - - return jobs; -} - -export async function updatePulledSpecJobBodyPath( - newSpecBody: string, - newState: ProjectState, - config: DeployConfig, - logger: Logger -): Promise { - try { - const oldSpecJobs = await getAllSpecJobs(config, logger); - - return await addSpecJobBodyPath(newSpecBody, newState, oldSpecJobs, config); - } catch (error: any) { - logger.warn(`Could not update spec job body paths: ${error.message}`); - return newSpecBody; - } -} - export async function deploy(config: DeployConfig, logger: Logger) { const [state, spec] = await Promise.all([ getState(config.statePath), diff --git a/packages/deploy/src/pull.ts b/packages/deploy/src/pull.ts new file mode 100644 index 000000000..4df7b7f11 --- /dev/null +++ b/packages/deploy/src/pull.ts @@ -0,0 +1,122 @@ +import YAML, { Pair, Scalar, isPair, isScalar } from 'yaml'; +import { DeployConfig, ProjectState, SpecJob } from './types'; +import { Logger } from '@openfn/logger'; +import { writeFile } from 'fs/promises'; +import path from 'path'; +import { getState, getSpec } from './index'; + +async function getAllSpecJobs( + config: DeployConfig, + logger: Logger +): Promise { + const jobs: SpecJob[] = []; + + try { + const state = await getState(config.statePath); + const spec = await getSpec(config.specPath); + + for (const [workflowKey, workflow] of Object.entries(spec.doc.workflows)) { + if (workflow.jobs) { + for (const [jobKey, specJob] of Object.entries(workflow.jobs)) { + const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; + stateJob && + jobs.push({ + id: stateJob.id, + name: specJob.name, + adaptor: specJob.adaptor, + body: specJob.body, + }); + } + } + } + } catch (error: any) { + logger.debug(`Could not read the spec and state: ${error.message}`); + } + + return jobs; +} + +async function extractJobsToDisk( + specBody: string, + state: ProjectState, + oldJobs: SpecJob[], + config: DeployConfig +): Promise { + function isPairWithScalarKey( + node: any + ): node is Pair & { key: Scalar & { value: string } } { + return ( + isPair(node) && isScalar(node.key) && typeof node.key.value === 'string' + ); + } + + const doc = YAML.parseDocument(specBody); + + await YAML.visitAsync(doc, { + async Pair(_, pair: any, pairPath) { + if ( + !pair.key || + pair.key.value !== 'body' || + !isScalar(pair.value) || + pairPath.length <= 6 + ) { + return; + } + + const jobPair = pairPath[pairPath.length - 2]; + const workflowPair = pairPath[pairPath.length - 6]; + + if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) { + return; + } + + const jobKey = jobPair.key.value; + const workflowKey = workflowPair.key.value; + + // find the job in the state + const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; + + if (!stateJob) { + return; + } + + // check if the state job is in the old spec jobs + const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id); + + if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') { + return; + } + + const oldSpecJobPath = oldSpecJob.body.path; + + if (oldSpecJobPath) { + const basePath = path.dirname(config.specPath); + const resolvedPath = path.resolve(basePath, oldSpecJobPath); + await writeFile(resolvedPath, pair.value.value); + + // set the body path in the spec + const map = doc.createNode({ path: oldSpecJobPath }); + + pair.value = map; + } + }, + }); + + return doc.toString(); +} + +export async function syncRemoteSpec( + remoteSpecBody: string, + newState: ProjectState, + config: DeployConfig, + logger: Logger +): Promise { + try { + const oldSpecJobs = await getAllSpecJobs(config, logger); + + return extractJobsToDisk(remoteSpecBody, newState, oldSpecJobs, config); + } catch (error: any) { + logger.warn(`Could not update spec job body paths: ${error.message}`); + return remoteSpecBody; + } +} diff --git a/packages/deploy/src/validator.ts b/packages/deploy/src/validator.ts index 90786bd56..79e218ab2 100644 --- a/packages/deploy/src/validator.ts +++ b/packages/deploy/src/validator.ts @@ -1,6 +1,6 @@ -import YAML, { YAMLMap, Pair, Scalar, isMap, isPair, isScalar } from 'yaml'; -import { DeployConfig, ProjectSpec, ProjectState, SpecJob } from './types'; -import { readFile, writeFile } from 'fs/promises'; +import YAML, { YAMLMap, isMap, isPair, isScalar } from 'yaml'; +import { ProjectSpec } from './types'; +import { readFile } from 'fs/promises'; import path from 'path'; export interface Error { @@ -12,7 +12,7 @@ export interface Error { export async function parseAndValidate( input: string, - specPath: string + specPath: string = '.' ): Promise<{ errors: Error[]; doc: ProjectSpec; @@ -185,72 +185,3 @@ export async function parseAndValidate( return { errors, doc: doc.toJSON() as ProjectSpec }; } - -export async function addSpecJobBodyPath( - specBody: string, - state: ProjectState, - oldJobs: SpecJob[], - config: DeployConfig -): Promise { - function isPairWithScalarKey( - node: any - ): node is Pair & { key: Scalar & { value: string } } { - return ( - isPair(node) && isScalar(node.key) && typeof node.key.value === 'string' - ); - } - - const doc = YAML.parseDocument(specBody); - - await YAML.visitAsync(doc, { - async Pair(_, pair: any, pairPath) { - if ( - !pair.key || - pair.key.value !== 'body' || - !isScalar(pair.value) || - pairPath.length <= 6 - ) { - return; - } - - const jobPair = pairPath[pairPath.length - 2]; - const workflowPair = pairPath[pairPath.length - 6]; - - if (!isPairWithScalarKey(jobPair) || !isPairWithScalarKey(workflowPair)) { - return; - } - - const jobKey = jobPair.key.value; - const workflowKey = workflowPair.key.value; - - // find the job in the state - const stateJob = state.workflows[workflowKey]?.jobs[jobKey]; - - if (!stateJob) { - return; - } - - // check if the state job is in the old spec jobs - const oldSpecJob = oldJobs.find((job) => job.id === stateJob.id); - - if (!oldSpecJob || typeof oldSpecJob?.body !== 'object') { - return; - } - - const oldSpecJobPath = oldSpecJob.body.path; - - if (oldSpecJobPath) { - const basePath = path.dirname(config.specPath); - const resolvedPath = path.resolve(basePath, oldSpecJobPath); - await writeFile(resolvedPath, pair.value.value); - - // set the body path in the spec - const map = doc.createNode({ path: oldSpecJobPath }); - - pair.value = map; - } - }, - }); - - return doc.toString(); -} diff --git a/packages/deploy/test/util.ts b/packages/deploy/test/util.ts new file mode 100644 index 000000000..1ce2ad371 --- /dev/null +++ b/packages/deploy/test/util.ts @@ -0,0 +1,14 @@ +import mock from 'mock-fs'; +import path from 'node:path'; + +export const mockFs = (files: Record) => { + const pnpm = path.resolve('../../node_modules/.pnpm'); + mock({ + [pnpm]: mock.load(pnpm, {}), + ...files, + }); +}; + +export const resetMockFs = () => { + mock.restore(); +}; diff --git a/packages/deploy/test/validator.test.ts b/packages/deploy/test/validator.test.ts index 67f8cbd38..2e7ea582f 100644 --- a/packages/deploy/test/validator.test.ts +++ b/packages/deploy/test/validator.test.ts @@ -1,22 +1,13 @@ import test from 'ava'; import { parseAndValidate } from '../src/validator'; -import fs from 'fs/promises'; -import path from 'path'; - -// Helper to create and clean up temporary test files -const createTempFile = async (t, content: string, ext = 'txt') => { - const fileName = `${t.title.replace(/\s+/g, '_')}-${Math.floor( - Math.random() * 20 - )}.${ext}`; - const filePath = path.resolve(fileName); - await fs.writeFile(filePath, content); - return filePath; -}; +import { mockFs, resetMockFs } from './util'; function findError(errors: any[], message: string) { return errors.find((e) => e.message === message); } +test.after(resetMockFs); + test('Workflows must be a map', async (t) => { const doc = ` name: project-name @@ -140,7 +131,9 @@ test('allow empty workflows', async (t) => { test('adds the file content into the job body from the specified path', async (t) => { // Step 1: Create a temporary file that the YAML will reference const fileContent = 'fn(state => state.data);'; - const filePath = await createTempFile(t, fileContent); + mockFs({ + '/jobBody.js': fileContent, + }); // Step 2: YAML document that references the file const doc = ` @@ -153,7 +146,7 @@ test('adds the file content into the job body from the specified path', async (t name: job one adaptor: '@openfn/language-http@latest' body: - path: ${path.basename(filePath)} + path: /jobBody.js `; // Step 3: Run the parseAndValidate function @@ -163,7 +156,4 @@ test('adds the file content into the job body from the specified path', async (t const jobBody = results.doc.workflows['workflow-one'].jobs!['job-one'].body; t.is(jobBody.content, fileContent); - - // Cleanup - await fs.rm(filePath); });