diff --git a/go.mod b/go.mod index ee9f88f..898a991 100644 --- a/go.mod +++ b/go.mod @@ -3,23 +3,20 @@ module github.com/renderedtext/agent-k8s-stack go 1.21 require ( - github.com/dgraph-io/ristretto v0.1.1 k8s.io/api v0.28.4 k8s.io/apimachinery v0.28.4 k8s.io/client-go v0.28.4 + k8s.io/klog/v2 v2.120.0 ) require ( - github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -32,7 +29,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect @@ -45,7 +41,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index e89a04e..8cd49fd 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,11 @@ -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= @@ -25,8 +16,6 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -71,8 +60,6 @@ github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -83,7 +70,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -111,7 +97,6 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= @@ -144,7 +129,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= @@ -157,8 +141,8 @@ k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4= -k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= -k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.120.0 h1:z+q5mfovBj1fKFxiRzsa2DsJLPIVMk/KFL81LMOfK+8= +k8s.io/klog/v2 v2.120.0/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= diff --git a/main.go b/main.go index 263ab9a..22137f4 100644 --- a/main.go +++ b/main.go @@ -1,48 +1,80 @@ package main import ( + "flag" "fmt" "log" "os" "path/filepath" "strconv" "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" "github.com/renderedtext/agent-k8s-stack/pkg/controller" "github.com/renderedtext/agent-k8s-stack/pkg/semaphore" + "github.com/renderedtext/agent-k8s-stack/pkg/signals" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) func main() { + klog.InitFlags(nil) + flag.Parse() + + // set up signals so we handle the shutdown signal gracefully + ctx := signals.SetupSignalHandler() + apiToken := os.Getenv("SEMAPHORE_API_TOKEN") if apiToken == "" { - log.Fatal("no SEMAPHORE_API_TOKEN specified") + klog.Error("invalid configuration: no SEMAPHORE_API_TOKEN specified") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } endpoint := os.Getenv("SEMAPHORE_ENDPOINT") if endpoint == "" { - log.Fatal("no SEMAPHORE_ENDPOINT specified") + klog.Error("invalid configuration: no SEMAPHORE_ENDPOINT specified") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } cfg, err := buildConfig(endpoint) if err != nil { - log.Fatalf("could not build config: %v", err) + klog.Errorf("error building config: %v", err) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } clientset, err := newK8sClientset() if err != nil { - log.Fatal("error creating k8s clientset") + klog.Errorf("error creating kube client: %v", err) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } semaphoreClient := semaphore.NewClient(endpoint, apiToken) - controller, err := controller.New(cfg, semaphoreClient, clientset) + informerFactory := informers.NewSharedInformerFactoryWithOptions( + clientset, + time.Second*30, + informers.WithNamespace(cfg.Namespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = "semaphoreci.com/resource-type=agent-type-configuration" + }), + ) + + controller, err := controller.New(ctx, informerFactory, cfg, semaphoreClient, clientset) if err != nil { panic(err) } - controller.Run() + // Start method is non-blocking and runs all registered informers in a dedicated goroutine. + informerFactory.Start(ctx.Done()) + + if err = controller.Run(ctx); err != nil { + klog.Errorf("Error running controller: %v", err) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } } func buildConfig(endpoint string) (*controller.Config, error) { diff --git a/pkg/agent_types/registry.go b/pkg/agent_types/registry.go new file mode 100644 index 0000000..f5c5c36 --- /dev/null +++ b/pkg/agent_types/registry.go @@ -0,0 +1,129 @@ +package agent_types + +import ( + "fmt" + "strings" + + "k8s.io/client-go/informers" + "k8s.io/klog/v2" + + v1 "k8s.io/api/core/v1" +) + +type AgentType struct { + SecretName string + AgentTypeName string + RegistrationToken string + AgentStartupParameters []string +} + +type Registry struct { + agentTypes map[string]*AgentType +} + +func NewRegistry() (*Registry, error) { + return &Registry{ + agentTypes: map[string]*AgentType{}, + }, nil +} + +func (r *Registry) RegisterInformer(informerFactory informers.SharedInformerFactory) error { + informer := informerFactory.Core().V1().Secrets() + informer.Informer().AddEventHandler(r) + return nil +} + +func (r *Registry) OnAdd(obj interface{}, _ bool) { + secret := obj.(*v1.Secret) + agentType, err := parseAgentType(secret) + if err != nil { + klog.Errorf("Error when adding agent type: %v", err) + return + } + + klog.Infof("Agent type added: %s", agentType.AgentTypeName) + r.agentTypes[agentType.AgentTypeName] = agentType +} + +func (r *Registry) OnUpdate(oldObj, newObj interface{}) { + newSecret := newObj.(*v1.Secret) + oldSecret := oldObj.(*v1.Secret) + if newSecret.ResourceVersion == oldSecret.ResourceVersion { + return + } + + agentType, err := parseAgentType(newSecret) + if err != nil { + klog.Errorf("Error when updating agent type: %v", err) + return + } + + klog.Infof("Agent type updated: %s", agentType.AgentTypeName) + r.agentTypes[agentType.AgentTypeName] = agentType +} + +func (r *Registry) OnDelete(obj interface{}) { + secret := obj.(*v1.Secret) + agentTypeName, err := findAgentTypeName(secret) + if err != nil { + fmt.Printf("Error when deleting agent type: %v\n", err) + return + } + + klog.Infof("Agent type deleted: %s", agentTypeName) + delete(r.agentTypes, agentTypeName) +} + +func (r *Registry) All() []string { + types := []string{} + for k := range r.agentTypes { + types = append(types, k) + } + + return types +} + +func (r *Registry) Get(name string) *AgentType { + v, ok := r.agentTypes[name] + if !ok { + return nil + } + + return v +} + +func findAgentTypeName(secret *v1.Secret) (string, error) { + agentTypeName, ok := secret.Data["agentTypeName"] + if !ok { + return "", fmt.Errorf("no 'agentTypeName' field in secret '%s'", secret.GetName()) + } + + return string(agentTypeName), nil +} + +func parseAgentType(secret *v1.Secret) (*AgentType, error) { + agentTypeName, ok := secret.Data["agentTypeName"] + if !ok { + return nil, fmt.Errorf("no agentTypeName field in secret '%s'", secret.GetName()) + } + + registrationToken, ok := secret.Data["registrationToken"] + if !ok { + return nil, fmt.Errorf("no registrationToken field in secret '%s'", secret.GetName()) + } + + agentStartupParameters := []string{} + if parameters, ok := secret.Data["agentStartupParameters"]; ok && string(parameters) != "" { + for _, v := range strings.Split(string(parameters), " ") { + parameter := strings.Trim(strings.Trim(v, "\n"), " ") + agentStartupParameters = append(agentStartupParameters, parameter) + } + } + + return &AgentType{ + SecretName: secret.GetName(), + AgentTypeName: string(agentTypeName), + RegistrationToken: string(registrationToken), + AgentStartupParameters: agentStartupParameters, + }, nil +} diff --git a/pkg/controller/agent_type_finder.go b/pkg/controller/agent_type_finder.go deleted file mode 100644 index 8cff4be..0000000 --- a/pkg/controller/agent_type_finder.go +++ /dev/null @@ -1,135 +0,0 @@ -package controller - -import ( - "context" - "fmt" - "log" - "strings" - "time" - - "github.com/dgraph-io/ristretto" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" -) - -type AgentType struct { - SecretName string - AgentTypeName string - RegistrationToken string - AgentStartupParameters []string -} - -// We cache the agent type secret information -// to avoid going to the Kubernetes on every iteration. -// But we should also reach to changes to the agent type secrets, -// so we put an expiration on them. -var SecretCacheTTL = 5 * time.Minute - -type AgentTypeFinder struct { - secretsInterface typedv1.SecretInterface - cache *ristretto.Cache -} - -func NewAgentTypeFinder(client kubernetes.Interface, namespace string) (*AgentTypeFinder, error) { - // The provider needs read access to secrets, - // so it can find all the secrets for each agent type, - // and use the agent type token in them to grab metrics from the Semaphore API. - secretsInterface := client.CoreV1().Secrets(namespace) - - /* - * We keep at most 50 agent types in our cache. - */ - cache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 500, - MaxCost: 50, - BufferItems: 64, - Metrics: false, - }) - - if err != nil { - return nil, err - } - - return &AgentTypeFinder{ - secretsInterface: secretsInterface, - cache: cache, - }, nil -} - -func (f *AgentTypeFinder) Find() ([]*AgentType, error) { - list, err := f.secretsInterface.List(context.Background(), metav1.ListOptions{ - LabelSelector: "semaphoreci.com/resource-type=agent-type-configuration", - }) - - if err != nil { - return []*AgentType{}, fmt.Errorf("error listing secrets: %v", err) - } - - agentTypes := []*AgentType{} - for _, secret := range list.Items { - agentType, err := f.findAgentType(secret.GetName()) - if err != nil { - log.Printf("Error decoding secret %s: %v", secret.GetName(), err) - continue - } - - agentTypes = append(agentTypes, agentType) - } - - return agentTypes, nil -} - -// Get the agent type information (endpoint and token) from the secret specified. -// We also cache this information to avoid going to the Kubernetes API on every iteration. -func (f *AgentTypeFinder) findAgentType(secretName string) (*AgentType, error) { - value, found := f.cache.Get(secretName) - if found { - if info, ok := value.(*AgentType); ok { - return info, nil - } - } - - // If the agent type info does not exist in the cache, - // we fetch the information from the Kubernetes API. - o, err := f.secretsInterface.Get(context.Background(), secretName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("error describing secret: %v", err) - } - - info, err := f.secretToAgentType(o) - if err != nil { - return nil, fmt.Errorf("error finding agent type information in secret: %v", err) - } - - f.cache.SetWithTTL(secretName, info, 1, SecretCacheTTL) - return info, nil -} - -func (f *AgentTypeFinder) secretToAgentType(secret *v1.Secret) (*AgentType, error) { - agentTypeName, ok := secret.Data["agentTypeName"] - if !ok { - return nil, fmt.Errorf("no agentTypeName field in secret '%s'", secret.GetName()) - } - - registrationToken, ok := secret.Data["registrationToken"] - if !ok { - return nil, fmt.Errorf("no registrationToken field in secret '%s'", secret.GetName()) - } - - agentStartupParameters := []string{} - if parameters, ok := secret.Data["agentStartupParameters"]; ok && string(parameters) != "" { - for _, v := range strings.Split(string(parameters), " ") { - parameter := strings.Trim(strings.Trim(v, "\n"), " ") - agentStartupParameters = append(agentStartupParameters, parameter) - } - } - - return &AgentType{ - SecretName: secret.GetName(), - AgentTypeName: string(agentTypeName), - RegistrationToken: string(registrationToken), - AgentStartupParameters: agentStartupParameters, - }, nil -} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f0cf085..006a87a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,15 +3,20 @@ package controller import ( "context" "fmt" - "log" "strings" "time" + "k8s.io/client-go/informers" + "k8s.io/klog/v2" + + "k8s.io/apimachinery/pkg/util/wait" + + agentTypes "github.com/renderedtext/agent-k8s-stack/pkg/agent_types" "github.com/renderedtext/agent-k8s-stack/pkg/semaphore" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -26,82 +31,88 @@ type Config struct { } type Controller struct { - cfg *Config - semaphoreClient *semaphore.Client - agentTypeFinder *AgentTypeFinder - clientset kubernetes.Interface + cfg *Config + semaphoreClient *semaphore.Client + agentTypeRegistry *agentTypes.Registry + clientset kubernetes.Interface currentJobs []semaphore.JobRequest } -func New(cfg *Config, semaphoreClient *semaphore.Client, clientset kubernetes.Interface) (*Controller, error) { - agentTypeFinder, err := NewAgentTypeFinder(clientset, cfg.Namespace) +func New( + ctx context.Context, + informerFactory informers.SharedInformerFactory, + cfg *Config, + semaphoreClient *semaphore.Client, + clientset kubernetes.Interface) (*Controller, error) { + + agentTypeRegistry, err := agentTypes.NewRegistry() if err != nil { return nil, err } + if err := agentTypeRegistry.RegisterInformer(informerFactory); err != nil { + return nil, err + } + return &Controller{ - cfg: cfg, - semaphoreClient: semaphoreClient, - clientset: clientset, - agentTypeFinder: agentTypeFinder, - currentJobs: []semaphore.JobRequest{}, + cfg: cfg, + semaphoreClient: semaphoreClient, + clientset: clientset, + agentTypeRegistry: agentTypeRegistry, + currentJobs: []semaphore.JobRequest{}, }, nil } -func (c *Controller) Run() { - for { - c.tick() - time.Sleep(10 * time.Second) - } +func (c *Controller) Run(ctx context.Context) error { + logger := klog.FromContext(ctx) + logger.Info("Starting controller") + + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + + <-ctx.Done() + logger.Info("Shutting down workers") + + return nil } -func (c *Controller) tick() { - agentTypes, err := c.agentTypeFinder.Find() - if err != nil { - log.Printf("Error finding agent types: %v", err) - return +func (c *Controller) runWorker(ctx context.Context) { + for c.tick(ctx) { + time.Sleep(10 * time.Second) } +} +func (c *Controller) tick(ctx context.Context) bool { + agentTypes := c.agentTypeRegistry.All() if len(agentTypes) == 0 { - log.Printf("No agent types found. Not even looking at the queue.") - return + klog.Info("No agent types found") + return true } - log.Printf("Looking for jobs...\n") - jobs, err := c.semaphoreClient.JobsFor(agentTypeNames(agentTypes)) + klog.Infof("Polling job queue for %v", agentTypes) + jobs, err := c.semaphoreClient.JobsFor(agentTypes) if err != nil { - log.Printf("Error: %v\n", err) - return + klog.Error(err, "error polling job queue") + return true } - log.Printf("Found %d jobs in the queue\n", len(jobs)) + klog.Infof("Found %d jobs in the queue", len(jobs)) for _, j := range jobs { if len(c.currentJobs) == c.cfg.MaxParallelJobs { - log.Printf("Reached max parallel number of jobs: %d\n", c.cfg.MaxParallelJobs) + klog.Infof("Reached %d max parallel number of jobs", c.cfg.MaxParallelJobs) break } c.addJob(j) } - c.reconcile(agentTypes) -} - -func agentTypeNames(agentTypes []*AgentType) []string { - names := []string{} - for _, agentType := range agentTypes { - names = append(names, agentType.AgentTypeName) - } - - return names + c.reconcile(ctx) + return true } -func (c *Controller) reconcile(agentTypes []*AgentType) { - log.Printf("Current jobs: %v\n", c.currentJobs) +func (c *Controller) reconcile(ctx context.Context) { for _, j := range c.currentJobs { - log.Printf("Reconciling job %s\n", j) - c.reconcileJob(j, agentTypes) + c.reconcileJob(ctx, j) } } @@ -109,48 +120,46 @@ func (c *Controller) jobName(jobID string) string { return fmt.Sprintf("semaphore-agent-%s", jobID) } -func (c *Controller) reconcileJob(job semaphore.JobRequest, agentTypes []*AgentType) { +func (c *Controller) reconcileJob(ctx context.Context, job semaphore.JobRequest) { + logger := klog.LoggerWithValues(klog.Background(), "jobID", job.JobID, "agentType", job.MachineType) j, err := c.clientset.BatchV1(). Jobs(c.cfg.Namespace). - Get(context.Background(), c.jobName(job.JobID), v1.GetOptions{}) + Get(ctx, c.jobName(job.JobID), metav1.GetOptions{}) // Job already exists, check if it's finished if err == nil { if j.Status.Succeeded > 0 { - log.Printf("[%s] Job finished successfully - deleting...\n", job.JobID) - if err := c.deleteJob(c.clientset, j.Name); err != nil { - log.Printf("[%s] Error deleting finished job - %v\n", job.JobID, err) - } else { - c.removeJob(job.JobID) + logger.Info("Job finished successfully - deleting") + if err := c.deleteJob(ctx, c.clientset, j.Name); err != nil { + logger.Error(err, "Error deleting finished job") } + c.removeJob(job.JobID) return } // NOTE: if the job finished, but failed, we leave it around for troubleshooting purposes. if j.Status.Failed > 0 { - log.Printf("[%s] Job finished and failed\n", job.JobID) + logger.Info("Job finished and failed") return } - log.Printf("[%s] Job not yet finished.\n", job.JobID) + logger.Info("Job not yet finished") } // Job does not exist, so we need to create it. if errors.IsNotFound(err) { - err := c.createJob(c.clientset, job, agentTypes) + err := c.createJob(ctx, c.clientset, job) if err != nil { - log.Printf("[%s] Error creating job: %v\n", err, job.JobID) - } else { - log.Printf("[%s] Job created successfully\n", job.JobID) + logger.Error(err, "Error creating job") } + logger.Info("Job created successfully") return } if err != nil { - log.Printf("[%s] Unknown error when trying to find job: %v\n", job.JobID, err) - return + logger.Error(err, "Unknown error when trying to find job") } } @@ -181,39 +190,29 @@ func (c *Controller) removeJob(ID string) { c.currentJobs = newJobs } -func (c *Controller) createJob(k8sClient kubernetes.Interface, job semaphore.JobRequest, agentTypes []*AgentType) error { - k8sJob, err := c.buildJob(job, agentTypes) +func (c *Controller) createJob(ctx context.Context, k8sClient kubernetes.Interface, job semaphore.JobRequest) error { + k8sJob, err := c.buildJob(job) if err != nil { return err } _, err = k8sClient.BatchV1(). Jobs(c.cfg.Namespace). - Create(context.Background(), k8sJob, v1.CreateOptions{}) + Create(ctx, k8sJob, metav1.CreateOptions{}) return err } -func (c *Controller) deleteJob(k8sClient kubernetes.Interface, name string) error { - propagationPolicy := v1.DeletePropagationBackground +func (c *Controller) deleteJob(ctx context.Context, k8sClient kubernetes.Interface, name string) error { + propagationPolicy := metav1.DeletePropagationBackground return k8sClient.BatchV1(). Jobs(c.cfg.Namespace). - Delete(context.Background(), name, v1.DeleteOptions{ + Delete(ctx, name, metav1.DeleteOptions{ PropagationPolicy: &propagationPolicy, }) } -func findAgentType(agentTypes []*AgentType, typeName string) *AgentType { - for _, agentType := range agentTypes { - if agentType.AgentTypeName == typeName { - return agentType - } - } - - return nil -} - -func (c *Controller) buildJob(job semaphore.JobRequest, agentTypes []*AgentType) (*batchv1.Job, error) { - agentType := findAgentType(agentTypes, job.MachineType) +func (c *Controller) buildJob(job semaphore.JobRequest) (*batchv1.Job, error) { + agentType := c.agentTypeRegistry.Get(job.MachineType) if agentType == nil { return nil, fmt.Errorf("nil agent type") } @@ -224,7 +223,7 @@ func (c *Controller) buildJob(job semaphore.JobRequest, agentTypes []*AgentType) terminationGracePeriod := int64(300) return &batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: c.jobName(job.JobID), Namespace: c.cfg.Namespace, Labels: c.buildLabels(job), @@ -235,7 +234,7 @@ func (c *Controller) buildJob(job semaphore.JobRequest, agentTypes []*AgentType) BackoffLimit: &retries, ActiveDeadlineSeconds: &activeDeadlineSeconds, Template: corev1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{Labels: c.buildLabels(job)}, + ObjectMeta: metav1.ObjectMeta{Labels: c.buildLabels(job)}, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, ServiceAccountName: c.cfg.ServiceAccountName, @@ -295,7 +294,7 @@ func (c *Controller) buildLabels(job semaphore.JobRequest) map[string]string { return labels } -func (c *Controller) buildAgentStartupParameters(agentType *AgentType, jobID string) []string { +func (c *Controller) buildAgentStartupParameters(agentType *agentTypes.AgentType, jobID string) []string { labels := []string{ fmt.Sprintf("semaphoreci.com/agent-type=%s", agentType.AgentTypeName), } diff --git a/pkg/signals/signals.go b/pkg/signals/signals.go new file mode 100644 index 0000000..91ba265 --- /dev/null +++ b/pkg/signals/signals.go @@ -0,0 +1,30 @@ +package signals + +import ( + "context" + "os" + "os/signal" + "syscall" +) + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} +var onlyOneSignalHandler = make(chan struct{}) + +// SetupSignalHandler registered for SIGTERM and SIGINT. A context is returned +// which is cancelled on one of these signals. If a second signal is caught, +// the program is terminated with exit code 1. +func SetupSignalHandler() context.Context { + close(onlyOneSignalHandler) // panics when called twice + + c := make(chan os.Signal, 2) + ctx, cancel := context.WithCancel(context.Background()) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + cancel() + <-c + os.Exit(1) // second signal. Exit directly. + }() + + return ctx +}