Skip to content

Commit

Permalink
Make Buildkite endpoints configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Sep 30, 2024
1 parent 9077705 commit aa8f1ed
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 85 deletions.
75 changes: 47 additions & 28 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ import (
"github.com/Khan/genqlient/graphql"
)

func NewClient(token string) graphql.Client {
func NewClient(token, endpoint string) graphql.Client {
if endpoint == "" {
endpoint = "https://graphql.buildkite.com/v1"
}
httpClient := http.Client{
Timeout: 60 * time.Second,
Transport: NewLogger(&authedTransport{
key: token,
wrapped: http.DefaultTransport,
}),
}
return graphql.NewClient("https://graphql.buildkite.com/v1", &httpClient)
return graphql.NewClient(endpoint, &httpClient)
}

type authedTransport struct {
Expand All @@ -30,8 +33,24 @@ type authedTransport struct {
}

func (t *authedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", "Bearer "+t.key)
return t.wrapped.RoundTrip(req)
// RoundTripper should not mutate the request except to close the body, and
// should always close the request body whether or not there was an error.
// See https://pkg.go.dev/net/http#RoundTripper.
// This implementation based on https://github.com/golang/oauth2/blob/master/transport.go
reqBodyClosed := false
if req.Body != nil {
defer func() {
if !reqBodyClosed {
req.Body.Close()
}
}()
}

reqCopy := req.Clone(req.Context())
reqCopy.Header.Set("Authorization", "Bearer "+t.key)

reqBodyClosed = true
return t.wrapped.RoundTrip(reqCopy)
}

type logTransport struct {
Expand All @@ -51,42 +70,42 @@ func (t *logTransport) RoundTrip(in *http.Request) (out *http.Response, err erro
log.Printf("--> %s %s", in.Method, in.URL)

// Save these headers so we can redact Authorization.
savedHeaders := in.Header.Clone()
inCopy := in
if in.Header != nil && in.Header.Get("authorization") != "" {
in.Header.Set("authorization", "<redacted>")
inCopy = in.Clone(in.Context())
inCopy.Header.Set("authorization", "<redacted>")
}

b, err := httputil.DumpRequestOut(in, true)
if err == nil {
log.Println(string(b))
} else {
b, err := httputil.DumpRequestOut(inCopy, true)
if err != nil {
log.Printf("Failed to dump request %s %s: %v", in.Method, in.URL, err)
}

// Restore the non-redacted headers.
in.Header = savedHeaders
if b := string(b); b != "" {
log.Println(b)
}

start := time.Now()
out, err = t.inner.RoundTrip(in)
duration := time.Since(start)
if err != nil {
log.Printf("<-- %v %s %s (%s)", err, in.Method, in.URL, duration)
}
if out != nil {
msg := fmt.Sprintf("<-- %d", out.StatusCode)
if out.Request != nil {
msg = fmt.Sprintf("%s %s", msg, out.Request.URL)
}
msg = fmt.Sprintf("%s (%s)", msg, duration)

log.Print(msg)

b, err := httputil.DumpResponse(out, true)
if err == nil {
log.Println(string(b))
} else {
log.Printf("Failed to dump response %s %s: %v", in.Method, in.URL, err)
}

if out == nil {
return
}
msg := fmt.Sprintf("<-- %d", out.StatusCode)
if out.Request != nil {
msg = fmt.Sprintf("%s %s", msg, out.Request.URL)
}
log.Printf("%s (%s)", msg, duration)

b, err = httputil.DumpResponse(out, true)
if err != nil {
log.Printf("Failed to dump response %s %s: %v", in.Method, in.URL, err)
}
if b := string(b); b != "" {
log.Println(b)
}
return
}
12 changes: 12 additions & 0 deletions charts/agent-stack-k8s/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@
"title": "The config Schema",
"required": ["org"],
"properties": {
"agent-endpoint": {
"type": "string",
"default": "",
"title": "The Agent REST API endpoint URL",
"examples": [""]
},
"graphql-endpoint": {
"type": "string",
"default": "",
"title": "The GraphQL endpoint URL",
"examples": [""]
},
"image": {
"type": "string",
"default": "",
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func AddConfigFlags(cmd *cobra.Command) {
"",
"Bind address to expose the pprof profiler (e.g. localhost:6060)",
)
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")
cmd.Flags().String("agent-endpoint", "", "Buildkite Agent REST API endpoint URL")

cmd.Flags().Duration(
"image-pull-backoff-grace-period",
config.DefaultImagePullBackOffGracePeriod,
Expand Down
8 changes: 7 additions & 1 deletion cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/buildkite/agent-stack-k8s/v2/cmd/controller"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/google/go-cmp/cmp"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -31,6 +32,8 @@ func TestReadAndParseConfig(t *testing.T) {
Tags: []string{"queue=my-queue", "priority=high"},
ClusterUUID: "beefcafe-abbe-baba-abba-deedcedecade",
ProhibitKubernetesPlugin: true,
GraphQLEndpoint: "http://graphql.buildkite.localhost/v1",
AgentEndpoint: "http://agent.buildkite.localhost/v3",
DefaultCommandParams: &config.CommandParams{
Interposer: config.InterposerVector,
EnvFrom: []corev1.EnvFromSource{{
Expand Down Expand Up @@ -115,5 +118,8 @@ func TestReadAndParseConfig(t *testing.T) {

actual, err := controller.ParseAndValidateConfig(v)
require.NoError(t, err)
require.Equal(t, expected, *actual)

if diff := cmp.Diff(*actual, expected); diff != "" {
t.Errorf("parsed config diff (-got +want):\n%s", diff)
}
}
5 changes: 5 additions & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ max-in-flight: 100
namespace: my-buildkite-ns
org: my-buildkite-org

# Setting a custom GraphQL endpoint and Agent REST API endpoint is usually only
# useful if you have a different instance of Buildkite itself available to run.
graphql-endpoint: http://graphql.buildkite.localhost/v1
agent-endpoint: http://agent.buildkite.localhost/v3

# only set cluster-uuid if the pipelines are in a cluster
# the UUID may be found in the cluster settings
cluster-uuid: beefcafe-abbe-baba-abba-deedcedecade
Expand Down
18 changes: 10 additions & 8 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ type Config struct {
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
AgentEndpoint string `json:"agent-endpoint" validate:"omitempty"`
// This field is mandatory for most new orgs.
// Some old orgs allows unclustered setup.
ClusterUUID string `json:"cluster-uuid" validate:"omitempty"`
Expand Down
16 changes: 9 additions & 7 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ func Run(
}

m, err := monitor.New(logger.Named("monitor"), k8sClient, monitor.Config{
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
})
if err != nil {
logger.Fatal("failed to create monitor", zap.Error(err))
Expand All @@ -54,6 +55,7 @@ func Run(
Namespace: cfg.Namespace,
Image: cfg.Image,
AgentTokenSecretName: cfg.AgentTokenSecret,
AgentEndpoint: cfg.AgentEndpoint,
JobTTL: cfg.JobTTL,
AdditionalRedactedVars: cfg.AdditionalRedactedVars,
DefaultCheckoutParams: cfg.DefaultCheckoutParams,
Expand Down
17 changes: 9 additions & 8 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@ type Monitor struct {
}

type Config struct {
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
Org string
Tags []string
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
Org string
Tags []string
}

type JobHandler interface {
Create(context.Context, *api.CommandJob) error
}

func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
graphqlClient := api.NewClient(cfg.Token)
graphqlClient := api.NewClient(cfg.Token, cfg.GraphQLEndpoint)

if cfg.PollInterval < time.Second {
cfg.PollInterval = time.Second
Expand Down
14 changes: 7 additions & 7 deletions internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ func failJob(
jobUUID string,
tags []string,
message string,
options ...agentcore.ControllerOption,
) error {
ctr, err := agentcore.NewController(
ctx,
agentToken,
kjobName(jobUUID),
tags, // queue is required for acquire! maybe more
agentcore.WithUserAgent("agent-stack-k8s/"+version.Version()),
opts := append([]agentcore.ControllerOption{
agentcore.WithUserAgent("agent-stack-k8s/" + version.Version()),
agentcore.WithLogger(logger.NewConsoleLogger(logger.NewTextPrinter(os.Stderr), func(int) {})),
)
}, options...)

// queue is required for acquire! maybe more
ctr, err := agentcore.NewController(ctx, agentToken, kjobName(jobUUID), tags, opts...)
if err != nil {
zapLogger.Error("registering or connecting ephemeral agent", zap.Error(err))
return fmt.Errorf("registering or connecting ephemeral agent: %w", err)
Expand Down
9 changes: 7 additions & 2 deletions internal/controller/scheduler/imagePullBackOffWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewImagePullBackOffWatcher(
return &imagePullBackOffWatcher{
logger: logger,
k8s: k8s,
gql: api.NewClient(cfg.BuildkiteToken),
gql: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
cfg: cfg,
gracePeriod: cfg.ImagePullBackOffGradePeriod,
ignoreJobs: make(map[uuid.UUID]struct{}),
Expand Down Expand Up @@ -228,7 +228,12 @@ func (w *imagePullBackOffWatcher) failJob(ctx context.Context, log *zap.Logger,
tags = append(tags, fmt.Sprintf("%s=%s", k, value))
}

if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String()); err != nil {
var opts []agentcore.ControllerOption
if w.cfg.AgentEndpoint != "" {
opts = append(opts, agentcore.WithEndpoint(w.cfg.AgentEndpoint))
}

if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String(), opts...); err != nil {
log.Error("Couldn't fail the job", zap.Error(err))
// If the error was because BK rejected the acquisition, then its moved
// on to a state where we need to cancel instead.
Expand Down
15 changes: 14 additions & 1 deletion internal/controller/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/internal/version"

"github.com/buildkite/agent/v3/clicommand"
agentcore "github.com/buildkite/agent/v3/core"

"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -43,6 +44,7 @@ type Config struct {
Namespace string
Image string
AgentTokenSecretName string
AgentEndpoint string
JobTTL time.Duration
AdditionalRedactedVars []string
DefaultCheckoutParams *config.CheckoutParams
Expand Down Expand Up @@ -479,6 +481,12 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI
},
},
}
if w.cfg.AgentEndpoint != "" {
agentContainer.Env = append(agentContainer.Env, corev1.EnvVar{
Name: "BUILDKITE_AGENT_ENDPOINT",
Value: w.cfg.AgentEndpoint,
})
}
agentContainer.Env = append(agentContainer.Env, env...)
podSpec.Containers = append(podSpec.Containers, agentContainer)

Expand Down Expand Up @@ -827,7 +835,12 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string
w.logger.Error("fetching agent token from secret", zap.Error(err))
return err
}
return failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message)

var opts []agentcore.ControllerOption
if w.cfg.AgentEndpoint != "" {
opts = append(opts, agentcore.WithEndpoint(w.cfg.AgentEndpoint))
}
return failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...)
}

func (w *worker) labelWithAgentTags(dstLabels map[string]string, agentQueryRules []string) {
Expand Down
4 changes: 2 additions & 2 deletions internal/integration/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestCleanupOrphanedPipelines(t *testing.T) {
}

ctx := context.Background()
graphqlClient := api.NewClient(cfg.BuildkiteToken)
graphqlClient := api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint)

pipelines, err := api.SearchPipelines(ctx, graphqlClient, cfg.Org, "test-", 100)
require.NoError(t, err)
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestCleanupOrphanedPipelines(t *testing.T) {

tc := testcase{
T: t,
GraphQL: api.NewClient(cfg.BuildkiteToken),
GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
PipelineName: pipeline.Node.Name,
}.Init()
tc.deletePipeline(ctx)
Expand Down
Loading

0 comments on commit aa8f1ed

Please sign in to comment.