Skip to content

Commit

Permalink
restructure: orchestrator package
Browse files Browse the repository at this point in the history
  • Loading branch information
kristofferlind committed Nov 6, 2022
1 parent 291d974 commit 8b11d73
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 566 deletions.
103 changes: 0 additions & 103 deletions all.go

This file was deleted.

70 changes: 0 additions & 70 deletions changed.go

This file was deleted.

9 changes: 9 additions & 0 deletions internal/orchestrator/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package orchestrator

type job struct {
componentName string
action string
commands []string
workingDirectory string
variables map[string]string
}
94 changes: 94 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package orchestrator

import (
"log"
"os"
"sync"
"time"

"github.com/kristofferlind/knega/internal/configuration"
)

func Execute(action string, skipExisting bool) error {
startTime := time.Now()
repository := configuration.GetRepository()
components := repository.FindComponentsWithAction(action)

var componentsToExecute []configuration.Component
if skipExisting {
componentsToExecute = getComponentsWithChanges(components)
} else {
componentsToExecute = components
}

var jobs []job
for _, component := range componentsToExecute {
job := job{
componentName: component.Name,
action: action,
commands: component.Actions[action].Commands,
workingDirectory: component.Path,
variables: map[string]string{
"ROOT": repository.RootPath,
"COMPONENT_NAME": component.Name,
"INPUTS_HASH": component.InputsHash,
},
}

jobs = append(jobs, job)
}

if len(jobs) > 0 {
createPipeline(jobs)
}

endTime := time.Now()
executionTime := endTime.Sub(startTime)
log.Printf("Total time taken: %s", executionTime)

return nil
}

func getComponentsWithChanges(components []configuration.Component) []configuration.Component {
var asyncChangedChecks sync.WaitGroup
var checkedComponents []configuration.Component
var changedComponents []configuration.Component

checkedComponentsChannel := make(chan configuration.Component, len(components))

for _, component := range components {
asyncChangedChecks.Add(1)

go func(changedComponentsChannel chan<- configuration.Component, component configuration.Component) {
component.HasChanges()
changedComponentsChannel <- component
asyncChangedChecks.Done()
}(checkedComponentsChannel, component)
}

asyncChangedChecks.Wait()
close(checkedComponentsChannel)

for checkedComponent := range checkedComponentsChannel {
checkedComponents = append(checkedComponents, checkedComponent)
if checkedComponent.ChangeStatus != configuration.Pristine {
changedComponents = append(changedComponents, checkedComponent)
}
}

printStatus(checkedComponents)

return changedComponents
}

func getMin(a int, b int) int {
if a <= b {
return a
}
return b
}

func IsTrace() bool {
traceValue := os.Getenv("KNEGA_TRACE")
return traceValue == "true"
}
52 changes: 52 additions & 0 deletions internal/orchestrator/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package orchestrator

import (
"log"
"sync"
"time"

"github.com/kristofferlind/knega/internal/command"
"github.com/kristofferlind/knega/internal/configuration"
)

func createPipeline(jobs []job) {
jobsCount := len(jobs)
repository := configuration.GetRepository()
threads := getMin(repository.Concurrency, jobsCount)

log.Printf("Created %v jobs and %v workers", jobsCount, threads)

queue := make(chan job, jobsCount)
for _, job := range jobs {
queue <- job
}

close(queue)

var asyncWorkers sync.WaitGroup

for workerId := 1; workerId <= threads; workerId++ {
asyncWorkers.Add(1)
go createWorker(&asyncWorkers, queue)
}

asyncWorkers.Wait()
}

func createWorker(asyncWorkers *sync.WaitGroup, jobs <-chan job) {
for job := range jobs {
jobStartTime := time.Now()

for _, jobCommand := range job.commands {
output := command.ExecuteStopOnError(jobCommand, job.workingDirectory, job.variables)
if IsTrace() {
log.Printf("%s", output)
}
}

jobEndTime := time.Now()
jobTime := jobEndTime.Sub(jobStartTime)
log.Printf("%s completed in %s", job.componentName, jobTime)
}
asyncWorkers.Done()
}
30 changes: 30 additions & 0 deletions internal/orchestrator/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package orchestrator

import (
"fmt"
"os"
"sort"
"text/tabwriter"

"github.com/kristofferlind/knega/internal/command"
"github.com/kristofferlind/knega/internal/configuration"
)

func printStatus(components []configuration.Component) {
sort.Slice(components, func(i, j int) bool {
return components[i].Name < components[j].Name
})
writer := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.StripEscape)
for _, component := range components {
if component.ChangeStatus == configuration.Unknown {
fmt.Fprintf(writer, "%s: \t "+command.Warning("Unknown")+"\t %s\n", component.Name, component.InputsHash[0:19])
}
if component.ChangeStatus == configuration.Dirty {
fmt.Fprintf(writer, "%s: \t "+command.Fatal("Rebuild required")+"\t %s\n", component.Name, component.InputsHash[0:19])
}
if component.ChangeStatus == configuration.Pristine {
fmt.Fprintf(writer, "%s: \t "+command.Success("Artifacts found")+"\t %s\n", component.Name, component.InputsHash[0:19])
}
}
writer.Flush()
}
Loading

0 comments on commit 8b11d73

Please sign in to comment.