From 365246e333a002f4f87677cc664bdc40a3498460 Mon Sep 17 00:00:00 2001 From: Wander Lairson Costa Date: Fri, 15 Jun 2018 18:41:29 +0000 Subject: [PATCH] Support redeployability We create a configuration called rootURL that specificies the root URL of the taskcluster service. --- engines/docker/engine.go | 2 +- engines/docker/imagecache/context.go | 15 +++++++++++++-- engines/docker/imagecache/imagecache.go | 13 +++++++------ engines/qemu/fetcher.go | 6 ++++++ engines/qemu/sandboxbuilder.go | 2 +- examples/macosx-config.yml | 1 - examples/mock-config.yml | 1 - examples/script-config.yml | 1 - plugins/cache/cache.go | 2 +- plugins/cache/constructor.go | 13 ++++++++++++- plugins/interactive/interactive.go | 13 +++---------- plugins/livelog/livelog.go | 7 ++++++- runtime/environment.go | 19 +++++++++++++++++++ runtime/fetcher/artifactfetcher.go | 4 ++-- runtime/fetcher/context.go | 13 ++++++++++--- test/testdata/worker-config.yml | 1 - worker/config.go | 12 +++++------- worker/worker.go | 17 +++++++++++++---- worker/worker_test.go | 2 +- 19 files changed, 100 insertions(+), 44 deletions(-) diff --git a/engines/docker/engine.go b/engines/docker/engine.go index 8d4d0cf5..a09719d8 100644 --- a/engines/docker/engine.go +++ b/engines/docker/engine.go @@ -60,7 +60,7 @@ func (p engineProvider) NewEngine(options engines.EngineOptions) (engines.Engine Environment: env, monitor: monitor, networks: network.NewPool(client, monitor.WithPrefix("network-pool")), - imageCache: imagecache.New(client, env.GarbageCollector, monitor.WithPrefix("image-cache")), + imageCache: imagecache.New(client, env, monitor.WithPrefix("image-cache")), }, nil } diff --git a/engines/docker/imagecache/context.go b/engines/docker/imagecache/context.go index 08ec1cb4..88d53896 100644 --- a/engines/docker/imagecache/context.go +++ b/engines/docker/imagecache/context.go @@ -4,6 +4,7 @@ package imagecache import ( "fmt" + "net/url" "github.com/taskcluster/taskcluster-worker/runtime" "github.com/taskcluster/taskcluster-worker/runtime/caching" @@ -14,20 +15,30 @@ import ( // to match the interface of fetcher.Context type cachingContextWithQueue struct { caching.Context - queue func() client.Queue + queue func() client.Queue + rootURL *url.URL } func (c cachingContextWithQueue) Queue() client.Queue { return c.queue() } +func (c cachingContextWithQueue) RootURL() *url.URL { + return c.rootURL +} + // taskContextWithProgress wraps TaskContext to satisfy the caching.Context // interface, by adding a Progress() function type taskContextWithProgress struct { *runtime.TaskContext - Prefix string + Prefix string + rootURL *url.URL } func (c taskContextWithProgress) Progress(description string, percent float64) { c.Log(fmt.Sprintf("%s: %s %.0f %%", c.Prefix, description, percent*100)) } + +func (c taskContextWithProgress) RootURL() *url.URL { + return c.rootURL +} diff --git a/engines/docker/imagecache/imagecache.go b/engines/docker/imagecache/imagecache.go index b71ca64e..62bbd757 100644 --- a/engines/docker/imagecache/imagecache.go +++ b/engines/docker/imagecache/imagecache.go @@ -18,7 +18,6 @@ import ( "github.com/taskcluster/taskcluster-worker/runtime/caching" "github.com/taskcluster/taskcluster-worker/runtime/client" "github.com/taskcluster/taskcluster-worker/runtime/fetcher" - "github.com/taskcluster/taskcluster-worker/runtime/gc" "github.com/taskcluster/taskcluster-worker/runtime/util" ) @@ -28,15 +27,17 @@ type ImageCache struct { cache *caching.Cache docker *docker.Client monitor runtime.Monitor + env *runtime.Environment } // New creates a new ImageCache object -func New(d *docker.Client, tracker gc.ResourceTracker, monitor runtime.Monitor) *ImageCache { +func New(d *docker.Client, env *runtime.Environment, monitor runtime.Monitor) *ImageCache { ic := &ImageCache{ docker: d, monitor: monitor, + env: env, } - ic.cache = caching.New(ic.constructor, true, tracker, monitor) + ic.cache = caching.New(ic.constructor, true, env.GarbageCollector, monitor) return ic } @@ -83,7 +84,7 @@ func (ic *ImageCache) constructor(ctx caching.Context, opts interface{}) (cachin } // Load from reference - return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue}, options.reference) + return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue, ic.env.RootURL}, options.reference) } // ImageHandle wraps caching.Handle such that we don't need to do any casting @@ -109,7 +110,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{} options.Image = s } else { prefix = "Fetching image" - ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference"}, imagePayload) + ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference", ic.env.RootURL}, imagePayload) if err != nil { return nil, runtime.NewMalformedPayloadError(fmt.Sprintf( "unable to resolve docker image reference error: %s", err.Error(), @@ -132,7 +133,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{} options.queue = ctx.Queue } - handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix}, options) + handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix, ic.env.RootURL}, options) if err != nil { return nil, err } diff --git a/engines/qemu/fetcher.go b/engines/qemu/fetcher.go index a46adff4..d86ac9bc 100644 --- a/engines/qemu/fetcher.go +++ b/engines/qemu/fetcher.go @@ -2,6 +2,7 @@ package qemuengine import ( "fmt" + "net/url" "github.com/taskcluster/taskcluster-worker/runtime" "github.com/taskcluster/taskcluster-worker/runtime/fetcher" @@ -21,8 +22,13 @@ var imageFetcher = fetcher.Combine( type fetchImageContext struct { *runtime.TaskContext + rootURL *url.URL } func (c fetchImageContext) Progress(description string, percent float64) { c.Log(fmt.Sprintf("Fetching image: %s - %.0f %%", description, percent*100)) } + +func (c fetchImageContext) RootURL() *url.URL { + return c.rootURL +} diff --git a/engines/qemu/sandboxbuilder.go b/engines/qemu/sandboxbuilder.go index 94fb7e5d..6919212a 100644 --- a/engines/qemu/sandboxbuilder.go +++ b/engines/qemu/sandboxbuilder.go @@ -58,7 +58,7 @@ func newSandboxBuilder( var scopeSets [][]string var inst *image.Instance - ctx := &fetchImageContext{c} + ctx := &fetchImageContext{c, e.Environment.RootURL} ref, err := imageFetcher.NewReference(ctx, payload.Image) if err != nil { goto handleErr diff --git a/examples/macosx-config.yml b/examples/macosx-config.yml index 64a047dc..3219766d 100644 --- a/examples/macosx-config.yml +++ b/examples/macosx-config.yml @@ -31,7 +31,6 @@ config: TERM_PROGRAM: {$env: TERM_PROGRAM} SHELL: /bin/bash pollingInterval: 10 - queueBaseUrl: https://queue.taskcluster.net/v1 reclaimOffset: 120 temporaryFolder: /tmp/tc-worker-tmp serverIp: 127.0.0.1 diff --git a/examples/mock-config.yml b/examples/mock-config.yml index 6fbb0849..101f9fa3 100644 --- a/examples/mock-config.yml +++ b/examples/mock-config.yml @@ -15,7 +15,6 @@ logLevel: debug plugins: disabled: [] pollingInterval: 1 -queueBaseUrl: https://queue.taskcluster.net/v1 reclaimOffset: 120 temporaryFolder: /tmp/tc-worker-tmp serverIp: 127.0.0.1 diff --git a/examples/script-config.yml b/examples/script-config.yml index 7c67dde1..3ab5517f 100644 --- a/examples/script-config.yml +++ b/examples/script-config.yml @@ -39,7 +39,6 @@ config: - artifacts - env pollingInterval: 5 - queueBaseUrl: https://queue.taskcluster.net/v1 reclaimOffset: 120 temporaryFolder: /tmp/tc-worker/ serverIp: 127.0.0.1 diff --git a/plugins/cache/cache.go b/plugins/cache/cache.go index 2fc5415f..df684ac0 100644 --- a/plugins/cache/cache.go +++ b/plugins/cache/cache.go @@ -120,7 +120,7 @@ func (p *plugin) getVolume(ctx *runtime.TaskContext, options payloadEntry) (*cac } // Create a context with progress reporting capability - progressCtx := progressContext{ctx, options.Name} + progressCtx := progressContext{ctx, options.Name, p.environment.RootURL} // First we resolve the reference, if there is any var ref fetcher.Reference diff --git a/plugins/cache/constructor.go b/plugins/cache/constructor.go index dc4e87be..01c80b22 100644 --- a/plugins/cache/constructor.go +++ b/plugins/cache/constructor.go @@ -3,6 +3,7 @@ package cache import ( "fmt" "io" + "net/url" "time" "github.com/pkg/errors" @@ -26,15 +27,21 @@ type cacheOptions struct { type preloadFetchContext struct { caching.Context InitialTaskContext *runtime.TaskContext + rootURL *url.URL } func (c *preloadFetchContext) Queue() client.Queue { return c.InitialTaskContext.Queue() } +func (c *preloadFetchContext) RootURL() *url.URL { + return c.rootURL +} + type progressContext struct { *runtime.TaskContext - Name string + Name string + rootURL *url.URL } func (c *progressContext) Progress(description string, percent float64) { @@ -45,6 +52,10 @@ func (c *progressContext) Progress(description string, percent float64) { } } +func (c *progressContext) RootURL() *url.URL { + return c.rootURL +} + func constructor(ctx caching.Context, opts interface{}) (caching.Resource, error) { options := opts.(cacheOptions) // must of this type diff --git a/plugins/interactive/interactive.go b/plugins/interactive/interactive.go index c59f6fe5..e581c5a3 100644 --- a/plugins/interactive/interactive.go +++ b/plugins/interactive/interactive.go @@ -21,14 +21,6 @@ import ( // configured or given in the task definition const defaultArtifactPrefix = "private/interactive/" -// defaultShellToolURL is the default URL for the tool that can connect to the -// shell socket and display an interactive shell session. -const defaultShellToolURL = "https://tools.taskcluster.net/shell/" - -// defaultShellToolURL is the default URL for the tool that can list displays -// and connect to the display socket with interactive noVNC session. -const defaultDisplayToolURL = "https://tools.taskcluster.net/display/" - type provider struct { plugins.PluginProviderBase } @@ -39,15 +31,16 @@ func (provider) ConfigSchema() schematypes.Schema { func (provider) NewPlugin(options plugins.PluginOptions) (plugins.Plugin, error) { var c config schematypes.MustValidateAndMap(configSchema, options.Config, &c) + toolsURL := options.Environment.GetServiceURL("tools") if c.ArtifactPrefix == "" { c.ArtifactPrefix = defaultArtifactPrefix } if c.ShellToolURL == "" { - c.ShellToolURL = defaultShellToolURL + c.ShellToolURL = fmt.Sprintf("%s/shell", toolsURL) } if c.DisplayToolURL == "" { - c.DisplayToolURL = defaultDisplayToolURL + c.DisplayToolURL = fmt.Sprintf("%s/display", toolsURL) } // IF no WebHookServer is available we disabling the interactive plugin diff --git a/plugins/livelog/livelog.go b/plugins/livelog/livelog.go index ce667a3b..85a7a38d 100644 --- a/plugins/livelog/livelog.go +++ b/plugins/livelog/livelog.go @@ -202,7 +202,12 @@ func (tp *taskPlugin) uploadLog() error { return err // Upload error isn't fatal } - backingURL := fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log", tp.context.TaskInfo.TaskID, tp.context.TaskInfo.RunID) + backingURL := fmt.Sprintf( + "%s/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log", + tp.environment.GetServiceURL("queue"), + tp.context.TaskInfo.TaskID, + tp.context.TaskInfo.RunID, + ) err = tp.context.CreateRedirectArtifact(runtime.RedirectArtifact{ Name: "public/logs/live.log", Mimetype: "text/plain; charset=utf-8", diff --git a/runtime/environment.go b/runtime/environment.go index 66a7ec6d..aca78582 100644 --- a/runtime/environment.go +++ b/runtime/environment.go @@ -1,6 +1,8 @@ package runtime import ( + "net/url" + "github.com/taskcluster/taskcluster-worker/runtime/gc" "github.com/taskcluster/taskcluster-worker/runtime/webhookserver" ) @@ -19,4 +21,21 @@ type Environment struct { WorkerType string WorkerGroup string WorkerID string + RootURL *url.URL +} + +// GetServiceURL takes a service name and returns the full taskcluster +// URL of it. +func GetServiceURL(rootURL *url.URL, serviceName string) string { + copyURL, err := url.Parse(rootURL.String()) + if err != nil { + panic("WAT???") + } + copyURL.Host = serviceName + "." + copyURL.Host + return copyURL.String() +} + +// GetServiceURL returns the URL of the given service +func (env *Environment) GetServiceURL(serviceName string) string { + return GetServiceURL(env.RootURL, serviceName) } diff --git a/runtime/fetcher/artifactfetcher.go b/runtime/fetcher/artifactfetcher.go index 435c67e1..e722b271 100644 --- a/runtime/fetcher/artifactfetcher.go +++ b/runtime/fetcher/artifactfetcher.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" schematypes "github.com/taskcluster/go-schematypes" "github.com/taskcluster/httpbackoff" + "github.com/taskcluster/taskcluster-worker/runtime" "github.com/taskcluster/taskcluster-worker/runtime/util" ) @@ -95,8 +96,7 @@ func (r *artifactReference) Fetch(ctx Context, target WriteReseter) error { // Construct URL var u string if r.isPublic() { - // TODO: Get queueBaseUrl from TaskContext somehow... and facilitate mocking in tests - u = fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/%s", r.TaskID, r.RunID, r.Artifact) + u = fmt.Sprintf("%s/v1/task/%s/runs/%d/artifacts/%s", runtime.GetServiceURL(ctx.RootURL(), "queue"), r.TaskID, r.RunID, r.Artifact) } else { u2, err := ctx.Queue().GetArtifact_SignedURL(r.TaskID, strconv.Itoa(r.RunID), r.Artifact, 25*time.Minute) if err != nil { diff --git a/runtime/fetcher/context.go b/runtime/fetcher/context.go index 4dc910f1..b6bef857 100644 --- a/runtime/fetcher/context.go +++ b/runtime/fetcher/context.go @@ -2,6 +2,7 @@ package fetcher import ( "context" + "net/url" "time" "github.com/taskcluster/taskcluster-worker/runtime/client" @@ -21,11 +22,13 @@ type Context interface { // consumers may wish to round to one decimal using "%.0f" formatting. // Progress reports won't be sent more than once every 5 seconds. Progress(description string, percent float64) + RootURL() *url.URL } type contextWithCancel struct { context.Context - parent Context + parent Context + rootURL *url.URL } func (c *contextWithCancel) Queue() client.Queue { @@ -36,8 +39,12 @@ func (c *contextWithCancel) Progress(description string, percent float64) { c.parent.Progress(description, percent) } +func (c *contextWithCancel) RootURL() *url.URL { + return c.rootURL +} + // WithCancel returns a Context and a cancel function similar to context.WithCancel -func WithCancel(ctx Context) (Context, func()) { +func WithCancel(ctx Context, rootURL *url.URL) (Context, func()) { child, cancel := context.WithCancel(ctx) - return &contextWithCancel{child, ctx}, cancel + return &contextWithCancel{child, ctx, rootURL}, cancel } diff --git a/test/testdata/worker-config.yml b/test/testdata/worker-config.yml index 5a57b645..845cd97b 100644 --- a/test/testdata/worker-config.yml +++ b/test/testdata/worker-config.yml @@ -21,7 +21,6 @@ config: groups: [] logLevel: debug pollingInterval: 1 - queueBaseUrl: https://queue.taskcluster.net/v1 reclaimOffset: 120 temporaryFolder: /var/tmp/tc-worker-tmp serverIp: 127.0.0.1 diff --git a/worker/config.go b/worker/config.go index 67ded398..8bae037b 100644 --- a/worker/config.go +++ b/worker/config.go @@ -34,8 +34,7 @@ type configType struct { MinimumMemory int64 `json:"minimumMemory"` Monitor interface{} `json:"monitor"` Credentials tcclient.Credentials `json:"credentials"` - QueueBaseURL string `json:"queueBaseUrl"` - AuthBaseURL string `json:"authBaseUrl"` + RootURL string `json:"rootURL"` WorkerOptions options `json:"worker"` } @@ -224,11 +223,10 @@ func ConfigSchema() schematypes.Object { Minimum: 0, Maximum: math.MaxInt64, }, - "monitor": monitoring.ConfigSchema, - "credentials": credentialsSchema, - "queueBaseUrl": schematypes.String{}, - "authBaseUrl": schematypes.String{}, - "worker": optionsSchema, + "rootURL": schematypes.String{}, + "monitor": monitoring.ConfigSchema, + "credentials": credentialsSchema, + "worker": optionsSchema, }, Required: []string{ "engine", diff --git a/worker/worker.go b/worker/worker.go index cd3fb5ca..d9f26293 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/url" "os" "strconv" "time" @@ -51,18 +52,25 @@ func New(config interface{}) (w *Worker, err error) { var c configType schematypes.MustValidateAndMap(ConfigSchema(), config, &c) + if c.RootURL == "" { + c.RootURL = "https://taskcluster.net" + } + + rootURL, err := url.Parse(c.RootURL) + if err != nil { + return nil, err + } + // Create monitor a := tcauth.New(&c.Credentials) - if c.AuthBaseURL != "" { - a.BaseURL = c.AuthBaseURL - } + a.BaseURL = runtime.GetServiceURL(rootURL, "auth") monitor := monitoring.New(c.Monitor, a) // Create worker w = &Worker{ monitor: monitor.WithPrefix("worker"), garbageCollector: gc.New(c.TemporaryFolder, c.MinimumDiskSpace, c.MinimumMemory), - queueBaseURL: c.QueueBaseURL, + queueBaseURL: runtime.GetServiceURL(rootURL, "queue"), options: c.WorkerOptions, } @@ -102,6 +110,7 @@ func New(config interface{}) (w *Worker, err error) { WorkerID: c.WorkerOptions.WorkerID, ProvisionerID: c.WorkerOptions.ProvisionerID, WorkerType: c.WorkerOptions.WorkerType, + RootURL: rootURL, } // Create engine diff --git a/worker/worker_test.go b/worker/worker_test.go index 12834846..16d51c39 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -40,7 +40,6 @@ func setupTestWorker(t *testing.T, queueBaseURL string, concurrency int) *Worker "clientId": "my-test-client-id", "accessToken": "my-super-secret-access-token" }, - "queueBaseUrl": "` + queueBaseURL + `", "worker": { "provisionerId": "test-provisioner-id", "workerType": "test-worker-type", @@ -55,6 +54,7 @@ func setupTestWorker(t *testing.T, queueBaseURL string, concurrency int) *Worker var config interface{} require.NoError(t, json.Unmarshal([]byte(raw), &config)) w, err := New(config) + w.queueBaseURL = queueBaseURL require.NoError(t, err) return w }