Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Support redeployability #394

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engines/docker/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p engineProvider) NewEngine(options engines.EngineOptions) (engines.Engine
Environment: env,
monitor: monitor,
networks: network.NewPool(client, monitor.WithPrefix("network-pool")),
imageCache: imagecache.New(client, env.GarbageCollector, monitor.WithPrefix("image-cache")),
imageCache: imagecache.New(client, env, monitor.WithPrefix("image-cache")),
}, nil
}

Expand Down
15 changes: 13 additions & 2 deletions engines/docker/imagecache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package imagecache

import (
"fmt"
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/caching"
Expand All @@ -14,20 +15,30 @@ import (
// to match the interface of fetcher.Context
type cachingContextWithQueue struct {
caching.Context
queue func() client.Queue
queue func() client.Queue
rootURL *url.URL
}

func (c cachingContextWithQueue) Queue() client.Queue {
return c.queue()
}

func (c cachingContextWithQueue) RootURL() *url.URL {
return c.rootURL
}

// taskContextWithProgress wraps TaskContext to satisfy the caching.Context
// interface, by adding a Progress() function
type taskContextWithProgress struct {
*runtime.TaskContext
Prefix string
Prefix string
rootURL *url.URL
}

func (c taskContextWithProgress) Progress(description string, percent float64) {
c.Log(fmt.Sprintf("%s: %s %.0f %%", c.Prefix, description, percent*100))
}

func (c taskContextWithProgress) RootURL() *url.URL {
return c.rootURL
}
13 changes: 7 additions & 6 deletions engines/docker/imagecache/imagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/taskcluster/taskcluster-worker/runtime/caching"
"github.com/taskcluster/taskcluster-worker/runtime/client"
"github.com/taskcluster/taskcluster-worker/runtime/fetcher"
"github.com/taskcluster/taskcluster-worker/runtime/gc"
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

Expand All @@ -28,15 +27,17 @@ type ImageCache struct {
cache *caching.Cache
docker *docker.Client
monitor runtime.Monitor
env *runtime.Environment
}

// New creates a new ImageCache object
func New(d *docker.Client, tracker gc.ResourceTracker, monitor runtime.Monitor) *ImageCache {
func New(d *docker.Client, env *runtime.Environment, monitor runtime.Monitor) *ImageCache {
ic := &ImageCache{
docker: d,
monitor: monitor,
env: env,
}
ic.cache = caching.New(ic.constructor, true, tracker, monitor)
ic.cache = caching.New(ic.constructor, true, env.GarbageCollector, monitor)
return ic
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func (ic *ImageCache) constructor(ctx caching.Context, opts interface{}) (cachin
}

// Load from reference
return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue}, options.reference)
return ic.dockerLoadFromReference(cachingContextWithQueue{ctx, options.queue, ic.env.RootURL}, options.reference)
}

// ImageHandle wraps caching.Handle such that we don't need to do any casting
Expand All @@ -109,7 +110,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{}
options.Image = s
} else {
prefix = "Fetching image"
ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference"}, imagePayload)
ref, err := imageFetcher.NewReference(taskContextWithProgress{ctx, "Resolving image reference", ic.env.RootURL}, imagePayload)
if err != nil {
return nil, runtime.NewMalformedPayloadError(fmt.Sprintf(
"unable to resolve docker image reference error: %s", err.Error(),
Expand All @@ -132,7 +133,7 @@ func (ic *ImageCache) Require(ctx *runtime.TaskContext, imagePayload interface{}
options.queue = ctx.Queue
}

handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix}, options)
handle, err := ic.cache.Require(taskContextWithProgress{ctx, prefix, ic.env.RootURL}, options)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions engines/qemu/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package qemuengine

import (
"fmt"
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/fetcher"
Expand All @@ -21,8 +22,13 @@ var imageFetcher = fetcher.Combine(

type fetchImageContext struct {
*runtime.TaskContext
rootURL *url.URL
}

func (c fetchImageContext) Progress(description string, percent float64) {
c.Log(fmt.Sprintf("Fetching image: %s - %.0f %%", description, percent*100))
}

func (c fetchImageContext) RootURL() *url.URL {
return c.rootURL
}
2 changes: 1 addition & 1 deletion engines/qemu/sandboxbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newSandboxBuilder(
var scopeSets [][]string
var inst *image.Instance

ctx := &fetchImageContext{c}
ctx := &fetchImageContext{c, e.Environment.RootURL}
ref, err := imageFetcher.NewReference(ctx, payload.Image)
if err != nil {
goto handleErr
Expand Down
1 change: 0 additions & 1 deletion examples/macosx-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ config:
TERM_PROGRAM: {$env: TERM_PROGRAM}
SHELL: /bin/bash
pollingInterval: 10
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
1 change: 0 additions & 1 deletion examples/mock-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ logLevel: debug
plugins:
disabled: []
pollingInterval: 1
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
1 change: 0 additions & 1 deletion examples/script-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ config:
- artifacts
- env
pollingInterval: 5
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /tmp/tc-worker/
serverIp: 127.0.0.1
Expand Down
2 changes: 1 addition & 1 deletion plugins/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *plugin) getVolume(ctx *runtime.TaskContext, options payloadEntry) (*cac
}

// Create a context with progress reporting capability
progressCtx := progressContext{ctx, options.Name}
progressCtx := progressContext{ctx, options.Name, p.environment.RootURL}

// First we resolve the reference, if there is any
var ref fetcher.Reference
Expand Down
13 changes: 12 additions & 1 deletion plugins/cache/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"fmt"
"io"
"net/url"
"time"

"github.com/pkg/errors"
Expand All @@ -26,15 +27,21 @@ type cacheOptions struct {
type preloadFetchContext struct {
caching.Context
InitialTaskContext *runtime.TaskContext
rootURL *url.URL
}

func (c *preloadFetchContext) Queue() client.Queue {
return c.InitialTaskContext.Queue()
}

func (c *preloadFetchContext) RootURL() *url.URL {
return c.rootURL
}

type progressContext struct {
*runtime.TaskContext
Name string
Name string
rootURL *url.URL
}

func (c *progressContext) Progress(description string, percent float64) {
Expand All @@ -45,6 +52,10 @@ func (c *progressContext) Progress(description string, percent float64) {
}
}

func (c *progressContext) RootURL() *url.URL {
return c.rootURL
}

func constructor(ctx caching.Context, opts interface{}) (caching.Resource, error) {
options := opts.(cacheOptions) // must of this type

Expand Down
13 changes: 3 additions & 10 deletions plugins/interactive/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ import (
// configured or given in the task definition
const defaultArtifactPrefix = "private/interactive/"

// defaultShellToolURL is the default URL for the tool that can connect to the
// shell socket and display an interactive shell session.
const defaultShellToolURL = "https://tools.taskcluster.net/shell/"

// defaultShellToolURL is the default URL for the tool that can list displays
// and connect to the display socket with interactive noVNC session.
const defaultDisplayToolURL = "https://tools.taskcluster.net/display/"

type provider struct {
plugins.PluginProviderBase
}
Expand All @@ -39,15 +31,16 @@ func (provider) ConfigSchema() schematypes.Schema {
func (provider) NewPlugin(options plugins.PluginOptions) (plugins.Plugin, error) {
var c config
schematypes.MustValidateAndMap(configSchema, options.Config, &c)
toolsURL := options.Environment.GetServiceURL("tools")

if c.ArtifactPrefix == "" {
c.ArtifactPrefix = defaultArtifactPrefix
}
if c.ShellToolURL == "" {
c.ShellToolURL = defaultShellToolURL
c.ShellToolURL = fmt.Sprintf("%s/shell", toolsURL)
}
if c.DisplayToolURL == "" {
c.DisplayToolURL = defaultDisplayToolURL
c.DisplayToolURL = fmt.Sprintf("%s/display", toolsURL)
}

// IF no WebHookServer is available we disabling the interactive plugin
Expand Down
7 changes: 6 additions & 1 deletion plugins/livelog/livelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,12 @@ func (tp *taskPlugin) uploadLog() error {
return err // Upload error isn't fatal
}

backingURL := fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log", tp.context.TaskInfo.TaskID, tp.context.TaskInfo.RunID)
backingURL := fmt.Sprintf(
"%s/v1/task/%s/runs/%d/artifacts/public/logs/live_backing.log",
tp.environment.GetServiceURL("queue"),
tp.context.TaskInfo.TaskID,
tp.context.TaskInfo.RunID,
)
err = tp.context.CreateRedirectArtifact(runtime.RedirectArtifact{
Name: "public/logs/live.log",
Mimetype: "text/plain; charset=utf-8",
Expand Down
19 changes: 19 additions & 0 deletions runtime/environment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package runtime

import (
"net/url"

"github.com/taskcluster/taskcluster-worker/runtime/gc"
"github.com/taskcluster/taskcluster-worker/runtime/webhookserver"
)
Expand All @@ -19,4 +21,21 @@ type Environment struct {
WorkerType string
WorkerGroup string
WorkerID string
RootURL *url.URL
}

// GetServiceURL takes a service name and returns the full taskcluster
// URL of it.
func GetServiceURL(rootURL *url.URL, serviceName string) string {
copyURL, err := url.Parse(rootURL.String())
if err != nil {
panic("WAT???")
}
copyURL.Host = serviceName + "." + copyURL.Host
return copyURL.String()
}

// GetServiceURL returns the URL of the given service
func (env *Environment) GetServiceURL(serviceName string) string {
return GetServiceURL(env.RootURL, serviceName)
}
4 changes: 2 additions & 2 deletions runtime/fetcher/artifactfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
schematypes "github.com/taskcluster/go-schematypes"
"github.com/taskcluster/httpbackoff"
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

Expand Down Expand Up @@ -95,8 +96,7 @@ func (r *artifactReference) Fetch(ctx Context, target WriteReseter) error {
// Construct URL
var u string
if r.isPublic() {
// TODO: Get queueBaseUrl from TaskContext somehow... and facilitate mocking in tests
u = fmt.Sprintf("https://queue.taskcluster.net/v1/task/%s/runs/%d/artifacts/%s", r.TaskID, r.RunID, r.Artifact)
u = fmt.Sprintf("%s/v1/task/%s/runs/%d/artifacts/%s", runtime.GetServiceURL(ctx.RootURL(), "queue"), r.TaskID, r.RunID, r.Artifact)
} else {
u2, err := ctx.Queue().GetArtifact_SignedURL(r.TaskID, strconv.Itoa(r.RunID), r.Artifact, 25*time.Minute)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions runtime/fetcher/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fetcher

import (
"context"
"net/url"
"time"

"github.com/taskcluster/taskcluster-worker/runtime/client"
Expand All @@ -21,11 +22,13 @@ type Context interface {
// consumers may wish to round to one decimal using "%.0f" formatting.
// Progress reports won't be sent more than once every 5 seconds.
Progress(description string, percent float64)
RootURL() *url.URL
}

type contextWithCancel struct {
context.Context
parent Context
parent Context
rootURL *url.URL
}

func (c *contextWithCancel) Queue() client.Queue {
Expand All @@ -36,8 +39,12 @@ func (c *contextWithCancel) Progress(description string, percent float64) {
c.parent.Progress(description, percent)
}

func (c *contextWithCancel) RootURL() *url.URL {
return c.rootURL
}

// WithCancel returns a Context and a cancel function similar to context.WithCancel
func WithCancel(ctx Context) (Context, func()) {
func WithCancel(ctx Context, rootURL *url.URL) (Context, func()) {
child, cancel := context.WithCancel(ctx)
return &contextWithCancel{child, ctx}, cancel
return &contextWithCancel{child, ctx, rootURL}, cancel
}
1 change: 0 additions & 1 deletion test/testdata/worker-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ config:
groups: []
logLevel: debug
pollingInterval: 1
queueBaseUrl: https://queue.taskcluster.net/v1
reclaimOffset: 120
temporaryFolder: /var/tmp/tc-worker-tmp
serverIp: 127.0.0.1
Expand Down
12 changes: 5 additions & 7 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type configType struct {
MinimumMemory int64 `json:"minimumMemory"`
Monitor interface{} `json:"monitor"`
Credentials tcclient.Credentials `json:"credentials"`
QueueBaseURL string `json:"queueBaseUrl"`
AuthBaseURL string `json:"authBaseUrl"`
RootURL string `json:"rootURL"`
WorkerOptions options `json:"worker"`
}

Expand Down Expand Up @@ -224,11 +223,10 @@ func ConfigSchema() schematypes.Object {
Minimum: 0,
Maximum: math.MaxInt64,
},
"monitor": monitoring.ConfigSchema,
"credentials": credentialsSchema,
"queueBaseUrl": schematypes.String{},
"authBaseUrl": schematypes.String{},
"worker": optionsSchema,
"rootURL": schematypes.String{},
"monitor": monitoring.ConfigSchema,
"credentials": credentialsSchema,
"worker": optionsSchema,
},
Required: []string{
"engine",
Expand Down
Loading