Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Commit

Permalink
Support redeployability
Browse files Browse the repository at this point in the history
We create a configuration called rootURL that specificies the root URL
of the taskcluster service.
  • Loading branch information
Wander Lairson Costa committed Jun 15, 2018
1 parent d3a0d81 commit 365246e
Show file tree
Hide file tree
Showing 19 changed files with 100 additions and 44 deletions.
2 changes: 1 addition & 1 deletion engines/docker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p engineProvider) NewEngine(options engines.EngineOptions) (engines.Engine
Environment: env,
monitor: monitor,
networks: network.NewPool(client, monitor.WithPrefix("network-pool")),
imageCache: imagecache.New(client, env.GarbageCollector, monitor.WithPrefix("image-cache")),
imageCache: imagecache.New(client, env, monitor.WithPrefix("image-cache")),
}, nil
}

Expand Down
15 changes: 13 additions & 2 deletions engines/docker/imagecache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package imagecache

import (
"fmt"
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/caching"
Expand All @@ -14,20 +15,30 @@ import (
// to match the interface of fetcher.Context
type cachingContextWithQueue struct {
caching.Context
queue func() client.Queue
queue func() client.Queue
rootURL *url.URL
}

func (c cachingContextWithQueue) Queue() client.Queue {
return c.queue()
}

func (c cachingContextWithQueue) RootURL() *url.URL {
return c.rootURL
}

// taskContextWithProgress wraps TaskContext to satisfy the caching.Context
// interface, by adding a Progress() function
type taskContextWithProgress struct {
*runtime.TaskContext
Prefix string
Prefix string
rootURL *url.URL
}

func (c taskContextWithProgress) Progress(description string, percent float64) {
c.Log(fmt.Sprintf("%s: %s %.0f %%", c.Prefix, description, percent*100))
}

func (c taskContextWithProgress) RootURL() *url.URL {
return c.rootURL
}
13 changes: 7 additions & 6 deletions engines/docker/imagecache/imagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/taskcluster/taskcluster-worker/runtime/caching"
"github.com/taskcluster/taskcluster-worker/runtime/client"
"github.com/taskcluster/taskcluster-worker/runtime/fetcher"
"github.com/taskcluster/taskcluster-worker/runtime/gc"
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

Expand All @@ -28,15 +27,17 @@ type ImageCache struct {
cache *caching.Cache
docker *docker.Client
monitor runtime.Monitor
env *runtime.Environment
}

// New creates a new ImageCache object
func New(d *docker.Client, tracker gc.ResourceTracker, monitor runtime.Monitor) *ImageCache {
func New(d *docker.Client, env *runtime.Environment, monitor runtime.Monitor) *ImageCache {
ic := &ImageCache{
docker: d,
monitor: monitor,
env: env,
}
ic.cache = caching.New(ic.constructor, true, tracker, monitor)
ic.cache = caching.New(ic.constructor, true, env.GarbageCollector, monitor)
return ic
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func (ic *ImageCache) constructor(ctx caching.Context, opts interface{}) (cachin
}

// Load from reference
return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue}, options.reference)
return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue, ic.env.RootURL}, options.reference)
}

// ImageHandle wraps caching.Handle such that we don't need to do any casting
Expand All @@ -109,7 +110,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{}
options.Image = s
} else {
prefix = "Fetching image"
ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference"}, imagePayload)
ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference", ic.env.RootURL}, imagePayload)
if err != nil {
return nil, runtime.NewMalformedPayloadError(fmt.Sprintf(
"unable to resolve docker image reference error: %s", err.Error(),
Expand All @@ -132,7 +133,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{}
options.queue = ctx.Queue
}

handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix}, options)
handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix, ic.env.RootURL}, options)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions engines/qemu/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package qemuengine

import (
"fmt"
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/fetcher"
Expand All @@ -21,8 +22,13 @@ var imageFetcher = fetcher.Combine(

type fetchImageContext struct {
*runtime.TaskContext
rootURL *url.URL
}

func (c fetchImageContext) Progress(description string, percent float64) {
c.Log(fmt.Sprintf("Fetching image: %s - %.0f %%", description, percent*100))
}

func (c fetchImageContext) RootURL() *url.URL {
return c.rootURL
}
2 changes: 1 addition & 1 deletion engines/qemu/sandboxbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newSandboxBuilder(
var scopeSets [][]string
var inst *image.Instance

ctx := &fetchImageContext{c}
ctx := &fetchImageContext{c, e.Environment.RootURL}
ref, err := imageFetcher.NewReference(ctx, payload.Image)
if err != nil {
goto handleErr
Expand Down
1 change: 0 additions & 1 deletion examples/macosx-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ config:
TERM_PROGRAM: {$env: TERM_PROGRAM}
SHELL: /bin/bash
pollingInterval: 10
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
1 change: 0 additions & 1 deletion examples/mock-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ logLevel: debug
plugins:
disabled: []
pollingInterval: 1
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
1 change: 0 additions & 1 deletion examples/script-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ config:
- artifacts
- env
pollingInterval: 5
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker/
serverIp: 127.0.0.1
Expand Down
2 changes: 1 addition & 1 deletion plugins/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *plugin) getVolume(ctx *runtime.TaskContext, options payloadEntry) (*cac
}

// Create a context with progress reporting capability
progressCtx := progressContext{ctx, options.Name}
progressCtx := progressContext{ctx, options.Name, p.environment.RootURL}

// First we resolve the reference, if there is any
var ref fetcher.Reference
Expand Down
13 changes: 12 additions & 1 deletion plugins/cache/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"fmt"
"io"
"net/url"
"time"

"github.com/pkg/errors"
Expand All @@ -26,15 +27,21 @@ type cacheOptions struct {
type preloadFetchContext struct {
caching.Context
InitialTaskContext *runtime.TaskContext
rootURL *url.URL
}

func (c *preloadFetchContext) Queue() client.Queue {
return c.InitialTaskContext.Queue()
}

func (c *preloadFetchContext) RootURL() *url.URL {
return c.rootURL
}

type progressContext struct {
*runtime.TaskContext
Name string
Name string
rootURL *url.URL
}

func (c *progressContext) Progress(description string, percent float64) {
Expand All @@ -45,6 +52,10 @@ func (c *progressContext) Progress(description string, percent float64) {
}
}

func (c *progressContext) RootURL() *url.URL {
return c.rootURL
}

func constructor(ctx caching.Context, opts interface{}) (caching.Resource, error) {
options := opts.(cacheOptions) // must of this type

Expand Down
13 changes: 3 additions & 10 deletions plugins/interactive/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ import (
// configured or given in the task definition
const defaultArtifactPrefix = "private/interactive/"

// defaultShellToolURL is the default URL for the tool that can connect to the
// shell socket and display an interactive shell session.
const defaultShellToolURL = "https://tools.taskcluster.net/shell/"

// defaultShellToolURL is the default URL for the tool that can list displays
// and connect to the display socket with interactive noVNC session.
const defaultDisplayToolURL = "https://tools.taskcluster.net/display/"

type provider struct {
plugins.PluginProviderBase
}
Expand All @@ -39,15 +31,16 @@ func (provider) ConfigSchema() schematypes.Schema {
func (provider) NewPlugin(options plugins.PluginOptions) (plugins.Plugin, error) {
var c config
schematypes.MustValidateAndMap(configSchema, options.Config, &c)
toolsURL := options.Environment.GetServiceURL("tools")

if c.ArtifactPrefix == "" {
c.ArtifactPrefix = defaultArtifactPrefix
}
if c.ShellToolURL == "" {
c.ShellToolURL = defaultShellToolURL
c.ShellToolURL = fmt.Sprintf("%s/shell", toolsURL)
}
if c.DisplayToolURL == "" {
c.DisplayToolURL = defaultDisplayToolURL
c.DisplayToolURL = fmt.Sprintf("%s/display", toolsURL)
}

// IF no WebHookServer is available we disabling the interactive plugin
Expand Down
7 changes: 6 additions & 1 deletion plugins/livelog/livelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ func (tp *taskPlugin) uploadLog() error {
return err // Upload error isn't fatal
}

backingURL := fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log", tp.context.TaskInfo.TaskID, tp.context.TaskInfo.RunID)
backingURL := fmt.Sprintf(
"%s/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log",
tp.environment.GetServiceURL("queue"),
tp.context.TaskInfo.TaskID,
tp.context.TaskInfo.RunID,
)
err = tp.context.CreateRedirectArtifact(runtime.RedirectArtifact{
Name: "public/logs/live.log",
Mimetype: "text/plain; charset=utf-8",
Expand Down
19 changes: 19 additions & 0 deletions runtime/environment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package runtime

import (
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime/gc"
"github.com/taskcluster/taskcluster-worker/runtime/webhookserver"
)
Expand All @@ -19,4 +21,21 @@ type Environment struct {
WorkerType string
WorkerGroup string
WorkerID string
RootURL *url.URL
}

// GetServiceURL takes a service name and returns the full taskcluster
// URL of it.
func GetServiceURL(rootURL *url.URL, serviceName string) string {
copyURL, err := url.Parse(rootURL.String())
if err != nil {
panic("WAT???")
}
copyURL.Host = serviceName + "." + copyURL.Host
return copyURL.String()
}

// GetServiceURL returns the URL of the given service
func (env *Environment) GetServiceURL(serviceName string) string {
return GetServiceURL(env.RootURL, serviceName)
}
4 changes: 2 additions & 2 deletions runtime/fetcher/artifactfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
schematypes "github.com/taskcluster/go-schematypes"
"github.com/taskcluster/httpbackoff"
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

Expand Down Expand Up @@ -95,8 +96,7 @@ func (r *artifactReference) Fetch(ctx Context, target WriteReseter) error {
// Construct URL
var u string
if r.isPublic() {
// TODO: Get queueBaseUrl from TaskContext somehow... and facilitate mocking in tests
u = fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/%s", r.TaskID, r.RunID, r.Artifact)
u = fmt.Sprintf("%s/v1/task/%s/runs/%d/artifacts/%s", runtime.GetServiceURL(ctx.RootURL(), "queue"), r.TaskID, r.RunID, r.Artifact)
} else {
u2, err := ctx.Queue().GetArtifact_SignedURL(r.TaskID, strconv.Itoa(r.RunID), r.Artifact, 25*time.Minute)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions runtime/fetcher/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fetcher

import (
"context"
"net/url"
"time"

"github.com/taskcluster/taskcluster-worker/runtime/client"
Expand All @@ -21,11 +22,13 @@ type Context interface {
// consumers may wish to round to one decimal using "%.0f" formatting.
// Progress reports won't be sent more than once every 5 seconds.
Progress(description string, percent float64)
RootURL() *url.URL
}

type contextWithCancel struct {
context.Context
parent Context
parent Context
rootURL *url.URL
}

func (c *contextWithCancel) Queue() client.Queue {
Expand All @@ -36,8 +39,12 @@ func (c *contextWithCancel) Progress(description string, percent float64) {
c.parent.Progress(description, percent)
}

func (c *contextWithCancel) RootURL() *url.URL {
return c.rootURL
}

// WithCancel returns a Context and a cancel function similar to context.WithCancel
func WithCancel(ctx Context) (Context, func()) {
func WithCancel(ctx Context, rootURL *url.URL) (Context, func()) {
child, cancel := context.WithCancel(ctx)
return &contextWithCancel{child, ctx}, cancel
return &contextWithCancel{child, ctx, rootURL}, cancel
}
1 change: 0 additions & 1 deletion test/testdata/worker-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ config:
groups: []
logLevel: debug
pollingInterval: 1
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /var/tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
12 changes: 5 additions & 7 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type configType struct {
MinimumMemory int64 `json:"minimumMemory"`
Monitor interface{} `json:"monitor"`
Credentials tcclient.Credentials `json:"credentials"`
QueueBaseURL string `json:"queueBaseUrl"`
AuthBaseURL string `json:"authBaseUrl"`
RootURL string `json:"rootURL"`
WorkerOptions options `json:"worker"`
}

Expand Down Expand Up @@ -224,11 +223,10 @@ func ConfigSchema() schematypes.Object {
Minimum: 0,
Maximum: math.MaxInt64,
},
"monitor": monitoring.ConfigSchema,
"credentials": credentialsSchema,
"queueBaseUrl": schematypes.String{},
"authBaseUrl": schematypes.String{},
"worker": optionsSchema,
"rootURL": schematypes.String{},
"monitor": monitoring.ConfigSchema,
"credentials": credentialsSchema,
"worker": optionsSchema,
},
Required: []string{
"engine",
Expand Down
Loading

0 comments on commit 365246e

Please sign in to comment.