Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Concurrent Download Support for artifacts #11531

Merged
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 56 additions & 17 deletions client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package taskrunner
import (
"context"
"fmt"
"sync"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
Expand All @@ -25,24 +26,18 @@ func newArtifactHook(e ti.EventEmitter, logger log.Logger) *artifactHook {
return h
}

// Name reports the identifier of the artifact hook.
//
// The same string is copied in client/state when upgrading from <0.9
// schemas, so changing it here requires changing it there as well.
func (*artifactHook) Name() string {
	const hookName = "artifacts"
	return hookName
}

func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
if len(req.Task.Artifacts) == 0 {
resp.Done = true
return nil
// createWorkers launches noOfWorkers goroutines, each running doWork against
// jobsChannel, blocks until all of them have signalled the shared WaitGroup,
// and then closes errorChannel so that a receiver ranging over it terminates.
//
// NOTE(review): this method does not receive from errorChannel while it is
// waiting, so unless another goroutine drains the channel, workers that send
// more errors than it can buffer would block here — confirm the capacity
// chosen by the caller.
func (h *artifactHook) createWorkers(req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse, noOfWorkers int, jobsChannel chan *structs.TaskArtifact, errorChannel chan error) {
	// Deferred so the close happens only after Wait has returned, i.e. once
	// no worker started below can send on the channel anymore.
	defer close(errorChannel)

	var workers sync.WaitGroup
	for remaining := noOfWorkers; remaining > 0; remaining-- {
		workers.Add(1)
		go h.doWork(req, resp, jobsChannel, errorChannel, &workers)
	}
	workers.Wait()
}

// Initialize hook state to store download progress
resp.State = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

for _, artifact := range req.Task.Artifacts {
func (h *artifactHook) doWork(req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse, jobs chan *structs.TaskArtifact, errorChannel chan error, wg *sync.WaitGroup) {
for artifact := range jobs {
gowthamgts marked this conversation as resolved.
Show resolved Hide resolved
aid := artifact.Hash()
if req.PreviousState[aid] != "" {
h.logger.Trace("skipping already downloaded artifact", "artifact", artifact.GetterSource)
Expand All @@ -60,14 +55,58 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
)
herr := NewHookError(wrapped, structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))

return herr
errorChannel <- herr
continue
}

// Mark artifact as downloaded to avoid re-downloading due to
// retries caused by subsequent artifacts failing. Any
// non-empty value works.
resp.State[aid] = "1"
}
wg.Done()
}

// Name reports the identifier of the artifact hook.
//
// The same string is copied in client/state when upgrading from <0.9
// schemas, so changing it here requires changing it there as well.
func (*artifactHook) Name() string {
	const hookName = "artifacts"
	return hookName
}

func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
if len(req.Task.Artifacts) == 0 {
resp.Done = true
return nil
}

// Initialize hook state to store download progress
resp.State = make(map[string]string, len(req.Task.Artifacts))

h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskDownloadingArtifacts))

// maxConcurrency denotes the number of workers that will download artifacts in parallel
maxConcurrency := 3

// jobsChannel is a buffered channel which will have all the artifacts that need to be processed
jobsChannel := make(chan *structs.TaskArtifact, maxConcurrency)
// Push all artifact requests to job channel
go func() {
for _, artifact := range req.Task.Artifacts {
jobsChannel <- artifact
gowthamgts marked this conversation as resolved.
Show resolved Hide resolved
}
close(jobsChannel)
}()

errorChannel := make(chan error, maxConcurrency)
// create workers and process artifacts
h.createWorkers(req, resp, maxConcurrency, jobsChannel, errorChannel)

// Iterate over the errorChannel and if there is an error, return it
for err := range errorChannel {
if err != nil {
return err
}
}

resp.Done = true
return nil
Expand Down