Skip to content

Commit

Permalink
feat: use /api/v1/self_hosted_agents/jobs API to look at job queue (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaspin authored Nov 29, 2024
1 parent 3c892b4 commit 547824f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 48 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ A Kubernetes controller that runs Semaphore jobs in Kubernetes.

| Environment variable | Description |
|----------------------------------------|-------------|
| SEMAPHORE_API_TOKEN | The Semaphore API token used to inspect the job queues. |
| SEMAPHORE_ENDPOINT | The Semaphore control plane endpoint, e.g. `<your-organization>.semaphoreci.com`. |
| KUBERNETES_NAMESPACE | The Kubernetes namespace where the resources for Semaphore jobs will be created. By default, the default namespace is used. |
| SEMAPHORE_AGENT_IMAGE | The [Semaphore agent](https://github.com/semaphoreci/agent) image to use when creating agents. By default, `semaphoreci/agent:latest`. |
Expand Down
8 changes: 1 addition & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ func main() {

ctx := signals.SetupSignalHandler()

apiToken := os.Getenv("SEMAPHORE_API_TOKEN")
if apiToken == "" {
klog.Error("invalid configuration: no SEMAPHORE_API_TOKEN specified")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

endpoint := os.Getenv("SEMAPHORE_ENDPOINT")
if endpoint == "" {
klog.Error("invalid configuration: no SEMAPHORE_ENDPOINT specified")
Expand All @@ -53,7 +47,7 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

semaphoreClient := semaphore.NewClient(endpoint, apiToken)
semaphoreClient := semaphore.NewClient(endpoint)
informerFactory, err := NewInformerFactory(clientset, cfg)
if err != nil {
klog.Errorf("error creating informer factory: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agenttypes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func (r *Registry) OnDelete(obj interface{}) {
delete(r.agentTypes, agentTypeName)
}

func (r *Registry) All() []string {
types := []string{}
for k := range r.agentTypes {
types = append(types, k)
func (r *Registry) All() []*AgentType {
types := []*AgentType{}
for _, v := range r.agentTypes {
types = append(types, v)
}

return types
Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Controller) runWorker(ctx context.Context) {
func (c *Controller) tick(ctx context.Context) bool {
agentTypes := c.agentTypeRegistry.All()
if len(agentTypes) == 0 {
klog.Info("No agent types found")
klog.Info("Not polling Semaphore API - no agent types found")
return true
}

Expand All @@ -86,8 +86,8 @@ func (c *Controller) tick(ctx context.Context) bool {
return true
}

klog.InfoS("Polling Semaphore API", "types", agentTypes)
jobs, err := c.semaphoreClient.JobsFor(agentTypes)
klog.InfoS("Polling Semaphore API", "types", agentTypeNames(agentTypes))
jobs, err := c.semaphoreClient.ListJobs(agentTypes)
if err != nil {
klog.Error(err, "error polling job queue")
return true
Expand All @@ -111,3 +111,11 @@ func (c *Controller) tick(ctx context.Context) bool {

return true
}

func agentTypeNames(agentTypes []*agenttypes.AgentType) []string {
names := []string{}
for _, agentType := range agentTypes {
names = append(names, agentType.AgentTypeName)
}
return names
}
73 changes: 40 additions & 33 deletions pkg/semaphore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,80 +5,87 @@ import (
"fmt"
"io"
"net/http"

"github.com/renderedtext/agent-k8s-stack/pkg/agenttypes"
"k8s.io/klog/v2"
)

type Client struct {
Endpoint string
Token string
}

func NewClient(endpoint, token string) *Client {
func NewClient(endpoint string) *Client {
return &Client{
Endpoint: endpoint,
Token: token,
}
}

type APIResponse struct {
Jobs []APIJob `json:"jobs" yaml:"jobs"`
}

type APIJob struct {
Metadata APIJobMetadata `json:"metadata" yaml:"metadata"`
Spec APIJobSpec `json:"spec" yaml:"spec"`
type Response struct {
Jobs []Job `json:"jobs" yaml:"jobs"`
}

// There are more fields here, but I only care about the ID for now.
type APIJobMetadata struct {
type Job struct {
ID string `json:"id" yaml:"id"`
}

// There are more fields here, but I only care about the machine type for now.
type APIJobSpec struct {
Agent struct {
Machine struct {
Type string
}
}
}

type JobRequest struct {
JobID string
MachineType string
}

func (a *Client) JobsFor(machineTypes []string) ([]JobRequest, error) {
URL := fmt.Sprintf("https://%s/api/v1alpha/jobs?states=PENDING&states=QUEUED", a.Endpoint)
func (a *Client) ListJobs(agentTypes []*agenttypes.AgentType) ([]JobRequest, error) {
jobRequests := []JobRequest{}

for _, agentType := range agentTypes {
jobs, err := a.listJobsForAgentType(agentType)
if err != nil {
klog.ErrorS(err, "error listing jobs for agent type", "agentType", agentType.AgentTypeName)
continue
}

for _, j := range jobs {
jobRequest := JobRequest{
JobID: j,
MachineType: agentType.AgentTypeName,
}
jobRequests = append(jobRequests, jobRequest)
}
}

return jobRequests, nil
}

func (a *Client) listJobsForAgentType(agentType *agenttypes.AgentType) ([]string, error) {
URL := fmt.Sprintf("https://%s/api/v1/self_hosted_agents/jobs", a.Endpoint)
req, err := http.NewRequest(http.MethodGet, URL, nil)
if err != nil {
return []JobRequest{}, err
return []string{}, err
}

req.Header.Add("Authorization", fmt.Sprintf("Token %s", a.Token))
req.Header.Add("Authorization", fmt.Sprintf("Token %s", agentType.RegistrationToken))
response, err := http.DefaultClient.Do(req)
if err != nil {
return []JobRequest{}, err
return []string{}, err
}

body, err := io.ReadAll(response.Body)
if err != nil {
return []JobRequest{}, err
return []string{}, err
}

apiResponse := APIResponse{}
apiResponse := Response{}
err = json.Unmarshal(body, &apiResponse)
if err != nil {
return []JobRequest{}, err
return []string{}, err
}

js := []JobRequest{}
jobs := []string{}
for _, j := range apiResponse.Jobs {
if in(machineTypes, j.Spec.Agent.Machine.Type) {
js = append(js, JobRequest{JobID: j.Metadata.ID, MachineType: j.Spec.Agent.Machine.Type})
}
jobs = append(jobs, j.ID)
}

return js, nil
return jobs, nil
}

func in(list []string, item string) bool {
Expand Down

0 comments on commit 547824f

Please sign in to comment.