Skip to content

Make Buildkite endpoints configurable #385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 47 additions & 28 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ import (
"github.com/Khan/genqlient/graphql"
)

func NewClient(token string) graphql.Client {
func NewClient(token, endpoint string) graphql.Client {
if endpoint == "" {
endpoint = "https://graphql.buildkite.com/v1"
}
httpClient := http.Client{
Timeout: 60 * time.Second,
Transport: NewLogger(&authedTransport{
key: token,
wrapped: http.DefaultTransport,
}),
}
return graphql.NewClient("https://graphql.buildkite.com/v1", &httpClient)
return graphql.NewClient(endpoint, &httpClient)
}

type authedTransport struct {
Expand All @@ -30,8 +33,24 @@ type authedTransport struct {
}

func (t *authedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Set("Authorization", "Bearer "+t.key)
return t.wrapped.RoundTrip(req)
// RoundTripper should not mutate the request except to close the body, and
// should always close the request body whether or not there was an error.
// See https://pkg.go.dev/net/http#RoundTripper.
// This implementation based on https://github.com/golang/oauth2/blob/master/transport.go
reqBodyClosed := false
if req.Body != nil {
defer func() {
if !reqBodyClosed {
req.Body.Close()
}
}()
}

reqCopy := req.Clone(req.Context())
reqCopy.Header.Set("Authorization", "Bearer "+t.key)

reqBodyClosed = true
return t.wrapped.RoundTrip(reqCopy)
}

type logTransport struct {
Expand All @@ -51,42 +70,42 @@ func (t *logTransport) RoundTrip(in *http.Request) (out *http.Response, err erro
log.Printf("--> %s %s", in.Method, in.URL)

// Save these headers so we can redact Authorization.
savedHeaders := in.Header.Clone()
inCopy := in
if in.Header != nil && in.Header.Get("authorization") != "" {
in.Header.Set("authorization", "<redacted>")
inCopy = in.Clone(in.Context())
inCopy.Header.Set("authorization", "<redacted>")
}

b, err := httputil.DumpRequestOut(in, true)
if err == nil {
log.Println(string(b))
} else {
b, err := httputil.DumpRequestOut(inCopy, true)
if err != nil {
log.Printf("Failed to dump request %s %s: %v", in.Method, in.URL, err)
}

// Restore the non-redacted headers.
in.Header = savedHeaders
if b := string(b); b != "" {
log.Println(b)
}

start := time.Now()
out, err = t.inner.RoundTrip(in)
duration := time.Since(start)
if err != nil {
log.Printf("<-- %v %s %s (%s)", err, in.Method, in.URL, duration)
}
if out != nil {
msg := fmt.Sprintf("<-- %d", out.StatusCode)
if out.Request != nil {
msg = fmt.Sprintf("%s %s", msg, out.Request.URL)
}
msg = fmt.Sprintf("%s (%s)", msg, duration)

log.Print(msg)

b, err := httputil.DumpResponse(out, true)
if err == nil {
log.Println(string(b))
} else {
log.Printf("Failed to dump response %s %s: %v", in.Method, in.URL, err)
}

if out == nil {
return
}
msg := fmt.Sprintf("<-- %d", out.StatusCode)
if out.Request != nil {
msg = fmt.Sprintf("%s %s", msg, out.Request.URL)
}
log.Printf("%s (%s)", msg, duration)

b, err = httputil.DumpResponse(out, true)
if err != nil {
log.Printf("Failed to dump response %s %s: %v", in.Method, in.URL, err)
}
if b := string(b); b != "" {
log.Println(b)
}
return
}
12 changes: 12 additions & 0 deletions charts/agent-stack-k8s/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@
"title": "The config Schema",
"required": ["org"],
"properties": {
"agent-endpoint": {
"type": "string",
"default": "",
"title": "The Agent REST API endpoint URL",
"examples": [""]
},
"graphql-endpoint": {
"type": "string",
"default": "",
"title": "The GraphQL endpoint URL",
"examples": [""]
},
"image": {
"type": "string",
"default": "",
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func AddConfigFlags(cmd *cobra.Command) {
"",
"Bind address to expose the pprof profiler (e.g. localhost:6060)",
)
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")
cmd.Flags().String("agent-endpoint", "", "Buildkite Agent REST API endpoint URL")

cmd.Flags().Duration(
"image-pull-backoff-grace-period",
config.DefaultImagePullBackOffGracePeriod,
Expand Down
8 changes: 7 additions & 1 deletion cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/buildkite/agent-stack-k8s/v2/cmd/controller"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/google/go-cmp/cmp"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -31,6 +32,8 @@ func TestReadAndParseConfig(t *testing.T) {
Tags: []string{"queue=my-queue", "priority=high"},
ClusterUUID: "beefcafe-abbe-baba-abba-deedcedecade",
ProhibitKubernetesPlugin: true,
GraphQLEndpoint: "http://graphql.buildkite.localhost/v1",
AgentEndpoint: "http://agent.buildkite.localhost/v3",
DefaultCommandParams: &config.CommandParams{
Interposer: config.InterposerVector,
EnvFrom: []corev1.EnvFromSource{{
Expand Down Expand Up @@ -115,5 +118,8 @@ func TestReadAndParseConfig(t *testing.T) {

actual, err := controller.ParseAndValidateConfig(v)
require.NoError(t, err)
require.Equal(t, expected, *actual)

if diff := cmp.Diff(*actual, expected); diff != "" {
t.Errorf("parsed config diff (-got +want):\n%s", diff)
}
}
5 changes: 5 additions & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ max-in-flight: 100
namespace: my-buildkite-ns
org: my-buildkite-org

# Setting a custom GraphQL endpoint and Agent REST API endpoint is usually only
# useful if you have a different instance of Buildkite itself available to run.
graphql-endpoint: http://graphql.buildkite.localhost/v1
agent-endpoint: http://agent.buildkite.localhost/v3

# only set cluster-uuid if the pipelines are in a cluster
# the UUID may be found in the cluster settings
cluster-uuid: beefcafe-abbe-baba-abba-deedcedecade
Expand Down
18 changes: 10 additions & 8 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ type Config struct {
Debug bool `json:"debug"`
JobTTL time.Duration `json:"job-ttl"`
PollInterval time.Duration `json:"poll-interval"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
AgentTokenSecret string `json:"agent-token-secret" validate:"required"`
BuildkiteToken string `json:"buildkite-token" validate:"required"`
Image string `json:"image" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
AgentEndpoint string `json:"agent-endpoint" validate:"omitempty"`
// This field is mandatory for most new orgs.
// Some old orgs allows unclustered setup.
ClusterUUID string `json:"cluster-uuid" validate:"omitempty"`
Expand Down
16 changes: 9 additions & 7 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ func Run(
}

m, err := monitor.New(logger.Named("monitor"), k8sClient, monitor.Config{
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
GraphQLEndpoint: cfg.GraphQLEndpoint,
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
MaxInFlight: cfg.MaxInFlight,
PollInterval: cfg.PollInterval,
Tags: cfg.Tags,
Token: cfg.BuildkiteToken,
})
if err != nil {
logger.Fatal("failed to create monitor", zap.Error(err))
Expand All @@ -54,6 +55,7 @@ func Run(
Namespace: cfg.Namespace,
Image: cfg.Image,
AgentTokenSecretName: cfg.AgentTokenSecret,
AgentEndpoint: cfg.AgentEndpoint,
JobTTL: cfg.JobTTL,
AdditionalRedactedVars: cfg.AdditionalRedactedVars,
DefaultCheckoutParams: cfg.DefaultCheckoutParams,
Expand Down
17 changes: 9 additions & 8 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,22 @@ type Monitor struct {
}

type Config struct {
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
Org string
Tags []string
GraphQLEndpoint string
Namespace string
Token string
ClusterUUID string
MaxInFlight int
PollInterval time.Duration
Org string
Tags []string
}

type JobHandler interface {
Create(context.Context, *api.CommandJob) error
}

func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, error) {
graphqlClient := api.NewClient(cfg.Token)
graphqlClient := api.NewClient(cfg.Token, cfg.GraphQLEndpoint)

if cfg.PollInterval < time.Second {
cfg.PollInterval = time.Second
Expand Down
14 changes: 7 additions & 7 deletions internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ func failJob(
jobUUID string,
tags []string,
message string,
options ...agentcore.ControllerOption,
) error {
ctr, err := agentcore.NewController(
ctx,
agentToken,
kjobName(jobUUID),
tags, // queue is required for acquire! maybe more
agentcore.WithUserAgent("agent-stack-k8s/"+version.Version()),
opts := append([]agentcore.ControllerOption{
agentcore.WithUserAgent("agent-stack-k8s/" + version.Version()),
agentcore.WithLogger(logger.NewConsoleLogger(logger.NewTextPrinter(os.Stderr), func(int) {})),
)
}, options...)

// queue is required for acquire! maybe more
ctr, err := agentcore.NewController(ctx, agentToken, kjobName(jobUUID), tags, opts...)
if err != nil {
zapLogger.Error("registering or connecting ephemeral agent", zap.Error(err))
return fmt.Errorf("registering or connecting ephemeral agent: %w", err)
Expand Down
9 changes: 7 additions & 2 deletions internal/controller/scheduler/imagePullBackOffWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewImagePullBackOffWatcher(
return &imagePullBackOffWatcher{
logger: logger,
k8s: k8s,
gql: api.NewClient(cfg.BuildkiteToken),
gql: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
cfg: cfg,
gracePeriod: cfg.ImagePullBackOffGradePeriod,
ignoreJobs: make(map[uuid.UUID]struct{}),
Expand Down Expand Up @@ -228,7 +228,12 @@ func (w *imagePullBackOffWatcher) failJob(ctx context.Context, log *zap.Logger,
tags = append(tags, fmt.Sprintf("%s=%s", k, value))
}

if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String()); err != nil {
var opts []agentcore.ControllerOption
if w.cfg.AgentEndpoint != "" {
opts = append(opts, agentcore.WithEndpoint(w.cfg.AgentEndpoint))
}

if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String(), opts...); err != nil {
log.Error("Couldn't fail the job", zap.Error(err))
// If the error was because BK rejected the acquisition, then its moved
// on to a state where we need to cancel instead.
Expand Down
15 changes: 14 additions & 1 deletion internal/controller/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/internal/version"

"github.com/buildkite/agent/v3/clicommand"
agentcore "github.com/buildkite/agent/v3/core"

"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -43,6 +44,7 @@ type Config struct {
Namespace string
Image string
AgentTokenSecretName string
AgentEndpoint string
JobTTL time.Duration
AdditionalRedactedVars []string
DefaultCheckoutParams *config.CheckoutParams
Expand Down Expand Up @@ -479,6 +481,12 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI
},
},
}
if w.cfg.AgentEndpoint != "" {
agentContainer.Env = append(agentContainer.Env, corev1.EnvVar{
Name: "BUILDKITE_AGENT_ENDPOINT",
Value: w.cfg.AgentEndpoint,
})
}
agentContainer.Env = append(agentContainer.Env, env...)
podSpec.Containers = append(podSpec.Containers, agentContainer)

Expand Down Expand Up @@ -827,7 +835,12 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string
w.logger.Error("fetching agent token from secret", zap.Error(err))
return err
}
return failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message)

var opts []agentcore.ControllerOption
if w.cfg.AgentEndpoint != "" {
opts = append(opts, agentcore.WithEndpoint(w.cfg.AgentEndpoint))
}
return failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...)
}

func (w *worker) labelWithAgentTags(dstLabels map[string]string, agentQueryRules []string) {
Expand Down
4 changes: 2 additions & 2 deletions internal/integration/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestCleanupOrphanedPipelines(t *testing.T) {
}

ctx := context.Background()
graphqlClient := api.NewClient(cfg.BuildkiteToken)
graphqlClient := api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint)

pipelines, err := api.SearchPipelines(ctx, graphqlClient, cfg.Org, "test-", 100)
require.NoError(t, err)
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestCleanupOrphanedPipelines(t *testing.T) {

tc := testcase{
T: t,
GraphQL: api.NewClient(cfg.BuildkiteToken),
GraphQL: api.NewClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
PipelineName: pipeline.Node.Name,
}.Init()
tc.deletePipeline(ctx)
Expand Down
Loading