diff --git a/garden-service/src/commands/dev.ts b/garden-service/src/commands/dev.ts index a7d16aed31..42df173647 100644 --- a/garden-service/src/commands/dev.ts +++ b/garden-service/src/commands/dev.ts @@ -116,7 +116,7 @@ export class DevCommand extends Command { fromWatch: watch, hotReloadServiceNames, force: watch, - forceBuild: watch, + forceBuild: false, includeDependants: watch, })) } diff --git a/garden-service/src/task-graph.ts b/garden-service/src/task-graph.ts index cf3804eada..9f21a7b0fc 100644 --- a/garden-service/src/task-graph.ts +++ b/garden-service/src/task-graph.ts @@ -9,7 +9,8 @@ import * as Bluebird from "bluebird" import * as PQueue from "p-queue" import chalk from "chalk" -import { merge, padEnd, pick } from "lodash" +import * as yaml from "js-yaml" +import { merge, padEnd, pick, flatten } from "lodash" import { BaseTask, TaskDefinitionError } from "./tasks/base" import { LogEntry } from "./logger/log-entry" @@ -26,9 +27,9 @@ export interface TaskResult { error?: Error } -/* - When multiple tasks with the same baseKey are completed during a call to processTasks, - the result from the last processed is used (hence only one key-value pair here per baseKey). +/** + * When multiple tasks with the same baseKey are completed during a call to processTasks, + * the result from the last processed is used (hence only one key-value pair here per baseKey). */ export interface TaskResults { [baseKey: string]: TaskResult @@ -43,6 +44,12 @@ export class TaskGraph { private inProgress: TaskNodeMap private logEntryMap: LogEntryMap + /** + * A given task instance (uniquely identified by its key) should always return the same + * list of dependencies (by baseKey) from its getDependencies method. + */ + private taskDependencyCache: { [key: string]: Set } // sets of baseKeys + private resultCache: ResultCache private opQueue: PQueue @@ -50,6 +57,7 @@ export class TaskGraph { this.roots = new TaskNodeMap() this.index = new TaskNodeMap() this.inProgress = new TaskNodeMap() + this.taskDependencyCache = {} this.resultCache = new ResultCache() this.opQueue = new PQueue({ concurrency: 1 }) this.logEntryMap = {} @@ -63,55 +71,64 @@ export class TaskGraph { return this.opQueue.add(() => this.processTasksInternal()) } - private async addTaskInternal(task: BaseTask) { - const predecessor = this.getPredecessor(task) - let node = this.getNode(task) - - if (predecessor) { - /* - predecessor is already in the graph, having the same baseKey as task, - but a different key (see the getPredecessor method below). - */ - if (this.inProgress.contains(predecessor)) { - this.index.addNode(node) - /* - We transition - [dependencies] > predecessor > [dependants] - to - [dependencies] > predecessor > node > [dependants] - */ - this.inherit(predecessor, node) - return - } else { - node = predecessor // No need to add a new TaskNode. - } + /** + * Rebuilds the dependency relationships between the TaskNodes in this.index, and updates this.roots accordingly. + */ + private async rebuild() { + const taskNodes = this.index.getNodes() + + // this.taskDependencyCache will already have been populated at this point (happens in addTaskInternal). + for (const node of taskNodes) { + node.clear() + const taskDeps = this.taskDependencyCache[node.getKey()] || new Set() + node.setDependencies(taskNodes.filter(n => taskDeps.has(n.getBaseKey()))) } - this.index.addNode(node) + const newRootNodes = taskNodes.filter(n => n.getDependencies().length === 0) + this.roots.clear() + this.roots.setNodes(newRootNodes) + } + private async addTaskInternal(task: BaseTask) { this.garden.events.emit("taskPending", { addedAt: new Date(), - key: node.getKey(), + key: task.getKey(), version: task.version, }) + await this.addNodeWithDependencies(task) + await this.rebuild() + } - await this.addDependencies(node) + private getNode(task: BaseTask): TaskNode | null { + const key = task.getKey() + const baseKey = task.getBaseKey() + const existing = this.index.getNodes() + .filter(n => n.getBaseKey() === baseKey && n.getKey() !== key) + .reverse()[0] - if (node.getDependencies().length === 0) { - this.roots.addNode(node) + if (existing) { + // A task with the same baseKey is already pending. + return existing } else { - await this.addDependants(node) + const cachedResultExists = !!this.resultCache.get(task.getBaseKey(), task.version.versionString) + if (cachedResultExists && !task.force) { + // No need to add task or its dependencies. + return null + } else { + return new TaskNode((task)) + } } } - private getNode(task: BaseTask): TaskNode { - const existing = this.index.getNode(task) - return existing || new TaskNode(task) - } - /* - Process the graph until it's complete + /** + * Process the graph until it's complete. */ private async processTasksInternal(): Promise { + this.log.silly("") + this.log.silly("TaskGraph: this.index before processing") + this.log.silly("---------------------------------------") + this.log.silly(yaml.safeDump(this.index.inspect(), { noRefs: true, skipInvalid: true })) + const _this = this const results: TaskResults = {} @@ -127,6 +144,7 @@ export class TaskGraph { .slice(0, _this.concurrency - this.inProgress.length) batch.forEach(n => this.inProgress.addNode(n)) + await this.rebuild() this.initLogging() @@ -155,13 +173,13 @@ export class TaskGraph { result = { type, description, error } this.garden.events.emit("taskError", result) this.logTaskError(node, error) - this.cancelDependants(node) + await this.cancelDependants(node) } finally { results[baseKey] = result this.resultCache.put(baseKey, task.version.versionString, result) } } finally { - this.completeTask(node, !result.error) + await this.completeTask(node, !result.error) } return loop() @@ -170,92 +188,59 @@ export class TaskGraph { await loop() + await this.rebuild() + return results } - private completeTask(node: TaskNode, success: boolean) { - if (node.getDependencies().length > 0) { - throw new TaskGraphError(`Task ${node.getKey()} still has unprocessed dependencies`) - } - - for (let d of node.getDependants()) { - d.removeDependency(node) - - if (d.getDependencies().length === 0) { - this.roots.addNode(d) - } + private addNode(task: BaseTask): TaskNode | null { + const node = this.getNode(task) + if (node) { + this.index.addNode(node) } - - this.remove(node) - this.logTaskComplete(node, success) - } - - private getPredecessor(task: BaseTask): TaskNode | null { - const key = task.getKey() - const baseKey = task.getBaseKey() - const predecessors = this.index.getNodes() - .filter(n => n.getBaseKey() === baseKey && n.getKey() !== key) - .reverse() - return predecessors[0] || null + return node } - private async addDependencies(node: TaskNode) { - const task = node.task - for (const d of await task.getDependencies()) { + private async addNodeWithDependencies(task: BaseTask) { + const node = this.addNode(task) - if (!d.force && this.resultCache.get(d.getBaseKey(), d.version.versionString)) { - continue + if (node) { + const depTasks = await node.task.getDependencies() + this.taskDependencyCache[node.getKey()] = new Set(depTasks.map(d => d.getBaseKey())) + for (const dep of depTasks) { + await this.addNodeWithDependencies(dep) } - - const dependency = this.getPredecessor(d) || this.getNode(d) - this.index.addNode(dependency) - node.addDependency(dependency) - } } - private async addDependants(node: TaskNode) { - const nodeDependencies = node.getDependencies() - for (const d of nodeDependencies) { - const dependant = this.getPredecessor(d.task) || d - await this.addTaskInternal(dependant.task) - dependant.addDependant(node) + private async completeTask(node: TaskNode, success: boolean) { + if (node.getDependencies().length > 0) { + throw new TaskGraphError(`Task ${node.getKey()} still has unprocessed dependencies`) } - } - - private inherit(oldNode: TaskNode, newNode: TaskNode) { - oldNode.getDependants().forEach(node => { - newNode.addDependant(node) - oldNode.removeDependant(node) - node.removeDependency(oldNode) - node.addDependency(newNode) - }) - newNode.addDependency(oldNode) - oldNode.addDependant(newNode) + this.remove(node) + this.logTaskComplete(node, success) + await this.rebuild() } - // Should only be called when node is not a dependant for any task. private remove(node: TaskNode) { - this.roots.removeNode(node) this.index.removeNode(node) this.inProgress.removeNode(node) } // Recursively remove node's dependants, without removing node. - private cancelDependants(node: TaskNode) { - const remover = (n) => { - for (const dependant of n.getDependants()) { - this.logTaskComplete(n, false) - remover(dependant) - } - this.remove(n) + private async cancelDependants(node: TaskNode) { + for (const dependant of this.getDependants(node)) { + this.logTaskComplete(dependant, false) + this.remove(dependant) } + await this.rebuild() + } - for (const dependant of node.getDependants()) { - node.removeDependant(dependant) - remover(dependant) - } + private getDependants(node: TaskNode): TaskNode[] { + const dependants = this.index.getNodes().filter(n => n.getDependencies() + .find(d => d.getBaseKey() === node.getBaseKey())) + return dependants.concat(flatten(dependants.map(d => this.getDependants(d)))) } // Logging @@ -342,6 +327,12 @@ class TaskNodeMap { } } + setNodes(nodes: TaskNode[]): void { + for (const node of nodes) { + this.addNode(node) + } + } + getNodes(): TaskNode[] { return Array.from(this.index.values()) } @@ -350,44 +341,45 @@ class TaskNodeMap { return this.index.has(node.getKey()) } + clear() { + this.index.clear() + this.length = 0 + } + + // For testing/debugging purposes + inspect(): object { + const out = {} + this.index.forEach((node, key) => { + out[key] = node.inspect() + }) + return out + } + } class TaskNode { task: BaseTask private dependencies: TaskNodeMap - private dependants: TaskNodeMap constructor(task: BaseTask) { this.task = task this.dependencies = new TaskNodeMap() - this.dependants = new TaskNodeMap() - } - - addDependency(node: TaskNode) { - this.dependencies.addNode(node) - } - - addDependant(node: TaskNode) { - this.dependants.addNode(node) } - removeDependency(node: TaskNode) { - this.dependencies.removeNode(node) + clear() { + this.dependencies.clear() } - removeDependant(node: TaskNode) { - this.dependants.removeNode(node) + setDependencies(nodes: TaskNode[]) { + for (const node of nodes) { + this.dependencies.addNode(node) + } } - getDependencies() { return this.dependencies.getNodes() } - getDependants() { - return this.dependants.getNodes() - } - getBaseKey() { return this.task.getBaseKey() } @@ -408,8 +400,7 @@ class TaskNode { inspect(): object { return { key: this.getKey(), - dependencies: this.getDependencies().map(d => d.getKey()), - dependants: this.getDependants().map(d => d.getKey()), + dependencies: this.getDependencies().map(d => d.inspect()), } } @@ -431,14 +422,14 @@ interface CachedResult { } class ResultCache { - /* - By design, at most one TaskResult (the most recently processed) is cached for a given baseKey. - - Invariant: No concurrent calls are made to this class' instance methods, since they - only happen within TaskGraph's addTaskInternal and processTasksInternal methods, - which are never executed concurrently, since they are executed sequentially by the - operation queue. - */ + /** + * By design, at most one TaskResult (the most recently processed) is cached for a given baseKey. + * + * Invariant: No concurrent calls are made to this class' instance methods, since they + * only happen within TaskGraph's addTaskInternal and processTasksInternal methods, + * which are never executed concurrently, since they are executed sequentially by the + * operation queue. + */ private cache: { [key: string]: CachedResult } constructor() { diff --git a/garden-service/test/src/task-graph.ts b/garden-service/test/src/task-graph.ts index f5ecb93201..80ace5f75c 100644 --- a/garden-service/test/src/task-graph.ts +++ b/garden-service/test/src/task-graph.ts @@ -32,6 +32,7 @@ class TestTask extends BaseTask { constructor( garden: Garden, name: string, + force, options?: TestTaskOptions, ) { super({ @@ -42,6 +43,7 @@ class TestTask extends BaseTask { dirtyTimestamp: 6789, dependencyVersions: {}, }, + force, }) if (!options) { @@ -96,7 +98,7 @@ describe("task-graph", () => { it("should successfully process a single task without dependencies", async () => { const garden = await getGarden() const graph = new TaskGraph(garden, garden.log) - const task = new TestTask(garden, "a") + const task = new TestTask(garden, "a", false) await graph.addTask(task) const results = await graph.processTasks() @@ -121,7 +123,7 @@ describe("task-graph", () => { const garden = await getGarden() const graph = new TaskGraph(garden, garden.log) - const task = new TestTask(garden, "a") + const task = new TestTask(garden, "a", false) await graph.addTask(task) @@ -135,7 +137,7 @@ describe("task-graph", () => { const garden = await getGarden() const graph = new TaskGraph(garden, garden.log) - const task = new TestTask(garden, "a") + const task = new TestTask(garden, "a", false) await graph.addTask(task) const result = await graph.processTasks() @@ -176,12 +178,16 @@ describe("task-graph", () => { const opts = { callback } - const taskA = new TestTask(garden, "a", { ...opts }) - const taskB = new TestTask(garden, "b", { ...opts, dependencies: [taskA] }) - const taskC = new TestTask(garden, "c", { ...opts, dependencies: [taskB] }) - const taskD = new TestTask(garden, "d", { ...opts, dependencies: [taskB, taskC] }) + const taskA = new TestTask(garden, "a", false, { ...opts, dependencies: [], id: "a1" }) + const taskB = new TestTask(garden, "b", false, { ...opts, dependencies: [taskA], id: "b1" }) + const taskC = new TestTask(garden, "c", false, { ...opts, dependencies: [taskB], id: "c1" }) + const taskD = new TestTask(garden, "d", false, { ...opts, dependencies: [taskB, taskC], id: "d1" }) // we should be able to add tasks multiple times and in any order + + await graph.addTask(taskA) + await graph.addTask(taskB) + await graph.addTask(taskC) await graph.addTask(taskC) await graph.addTask(taskD) await graph.addTask(taskA) @@ -194,29 +200,54 @@ describe("task-graph", () => { const results = await graph.processTasks() + // repeat + + const repeatCallbackResults = {} + const repeatResultOrder: string[] = [] + + const repeatCallback = async (key: string, result: any) => { + repeatResultOrder.push(key) + repeatCallbackResults[key] = result + } + + const repeatOpts = { callback: repeatCallback } + + const repeatTaskA = new TestTask(garden, "a", false, { ...repeatOpts, dependencies: [], id: "a2" }) + const repeatTaskB = new TestTask(garden, "b", false, { ...repeatOpts, dependencies: [repeatTaskA], id: "b2" }) + const repeatTaskC = new TestTask(garden, "c", true, { ...repeatOpts, dependencies: [repeatTaskB], id: "c2" }) + + const repeatTaskAforced = new TestTask(garden, "a", true, { ...repeatOpts, dependencies: [], id: "a2f" }) + const repeatTaskBforced = new TestTask(garden, "b", true, { ...repeatOpts, dependencies: [repeatTaskA], id: "b2f" }) + + await graph.addTask(repeatTaskBforced) + await graph.addTask(repeatTaskAforced) + await graph.addTask(repeatTaskC) + + await graph.processTasks() + const resultA: TaskResult = { type: "test", - description: "a", + description: "a.a1", output: { - result: "result-a", + result: "result-a.a1", dependencyResults: {}, }, dependencyResults: {}, } const resultB: TaskResult = { type: "test", - description: "b", + description: "b.b1", output: { - result: "result-b", + result: "result-b.b1", dependencyResults: { a: resultA }, }, dependencyResults: { a: resultA }, } const resultC: TaskResult = { type: "test", - description: "c", + description: "c.c1", output: { - result: "result-c", + result: "result-c.c1", dependencyResults: { b: resultB }, }, dependencyResults: { b: resultB }, @@ -228,9 +259,9 @@ describe("task-graph", () => { c: resultC, d: { type: "test", - description: "d", + description: "d.d1", output: { - result: "result-d", + result: "result-d.d1", dependencyResults: { b: resultB, c: resultC, @@ -243,15 +274,24 @@ describe("task-graph", () => { }, } - expect(results).to.eql(expected) - expect(resultOrder).to.eql(["a", "b", "c", "d"]) + expect(results).to.eql(expected, "Wrong results after initial add and process") + expect(resultOrder).to.eql(["a.a1", "b.b1", "c.c1", "d.d1"], "Wrong result order after initial add and process") expect(callbackResults).to.eql({ - a: "result-a", - b: "result-b", - c: "result-c", - d: "result-d", - }) + "a.a1": "result-a.a1", + "b.b1": "result-b.b1", + "c.c1": "result-c.c1", + "d.d1": "result-d.d1", + }, "Wrong callbackResults after initial add and process") + + expect(repeatResultOrder).to.eql(["a.a2f", "b.b2f", "c.c2"], "Wrong result order after repeat add & process") + + expect(repeatCallbackResults).to.eql({ + "a.a2f": "result-a.a2f", + "b.b2f": "result-b.b2f", + "c.c2": "result-c.c2", + }, "Wrong callbackResults after repeat add & process") + }) it("should recursively cancel a task's dependants when it throws an error", async () => { @@ -266,10 +306,10 @@ describe("task-graph", () => { const opts = { callback } - const taskA = new TestTask(garden, "a", { ...opts }) - const taskB = new TestTask(garden, "b", { callback, throwError: true, dependencies: [taskA] }) - const taskC = new TestTask(garden, "c", { ...opts, dependencies: [taskB] }) - const taskD = new TestTask(garden, "d", { ...opts, dependencies: [taskB, taskC] }) + const taskA = new TestTask(garden, "a", false, { ...opts }) + const taskB = new TestTask(garden, "b", false, { callback, throwError: true, dependencies: [taskA] }) + const taskC = new TestTask(garden, "c", false, { ...opts, dependencies: [taskB] }) + const taskD = new TestTask(garden, "d", false, { ...opts, dependencies: [taskB, taskC] }) await graph.addTask(taskA) await graph.addTask(taskB) @@ -293,118 +333,5 @@ describe("task-graph", () => { expect(resultOrder).to.eql(["a", "b"]) }) - it.skip( - "should process a task as an inheritor of an existing, in-progress task when they have the same base key", - async () => { - const garden = await getGarden() - const graph = new TaskGraph(garden, garden.log) - - let callbackResults = {} - let resultOrder: string[] = [] - - let parentTaskStarted = false - let inheritorAdded = false - - const intervalMs = 10 - - const inheritorAddedPromise = new Promise(resolve => { - setInterval(() => { - if (inheritorAdded) { - resolve() - } - }, intervalMs) - }) - - const parentTaskStartedPromise = new Promise(resolve => { - setInterval(() => { - if (parentTaskStarted) { - resolve() - } - }, intervalMs) - }) - - const defaultCallback = async (key: string, result: any) => { - resultOrder.push(key) - callbackResults[key] = result - } - - const parentCallback = async (key: string, result: any) => { - parentTaskStarted = true - await inheritorAddedPromise - resultOrder.push(key) - callbackResults[key] = result - } - - const dependencyA = new TestTask(garden, "dependencyA", { callback: defaultCallback }) - const dependencyB = new TestTask(garden, "dependencyB", { callback: defaultCallback }) - const parentTask = new TestTask( - garden, - "sharedName", - { callback: parentCallback, id: "1", dependencies: [dependencyA, dependencyB] }, - ) - const dependantA = new TestTask(garden, "dependantA", { callback: defaultCallback, dependencies: [parentTask] }) - const dependantB = new TestTask(garden, "dependantB", { callback: defaultCallback, dependencies: [parentTask] }) - - const inheritorTask = new TestTask( - garden, - "sharedName", - { callback: defaultCallback, id: "2", dependencies: [dependencyA, dependencyB] }, - ) - - await graph.addTask(dependencyA) - await graph.addTask(dependencyB) - await graph.addTask(parentTask) - await graph.addTask(dependantA) - await graph.addTask(dependantB) - - const resultsPromise = graph.processTasks() - await parentTaskStartedPromise - await graph.addTask(inheritorTask) - inheritorAdded = true - const results = await resultsPromise - - expect(resultOrder).to.eql([ - "dependencyA", - "dependencyB", - "sharedName.1", - "sharedName.2", - "dependantA", - "dependantB", - ]) - - const resultDependencyA = { - output: "result-dependencyA", - dependencyResults: {}, - } - - const resultDependencyB = { - output: "result-dependencyB", - dependencyResults: {}, - } - - const resultSharedName = { - output: "result-sharedName.2", - dependencyResults: { dependencyA: resultDependencyA, dependencyB: resultDependencyB }, - } - - expect(results).to.eql({ - dependencyA: { output: "result-dependencyA", dependencyResults: {} }, - dependencyB: { output: "result-dependencyB", dependencyResults: {} }, - sharedName: { - output: "result-sharedName.2", - dependencyResults: { dependencyA: resultDependencyA, dependencyB: resultDependencyB }, - }, - dependantA: - { - result: "result-dependantA", - dependencyResults: { sharedName: resultSharedName }, - }, - dependantB: - { - result: "result-dependantB", - dependencyResults: { sharedName: resultSharedName }, - }, - }) - }) }) }) diff --git a/garden-service/test/src/tasks/helpers.ts b/garden-service/test/src/tasks/helpers.ts index 50f1192599..c683164d6e 100644 --- a/garden-service/test/src/tasks/helpers.ts +++ b/garden-service/test/src/tasks/helpers.ts @@ -141,7 +141,11 @@ describe("TaskHelpers", () => { }, { moduleName: "service-dependant", - withoutDependencies: ["build.service-dependant", "deploy.service-dependant", "task.dependant-task"], + withoutDependencies: [ + "build.service-dependant", + "deploy.service-dependant", + "task.dependant-task", + ], withDependencies: [ "deploy.good-morning",