diff --git a/cloudwatch.go b/cloudwatch.go new file mode 100644 index 00000000..6adbd3c7 --- /dev/null +++ b/cloudwatch.go @@ -0,0 +1,70 @@ +package main + +import ( + "log" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" +) + +func cloudwatchSend(r *result) error { + svc := cloudwatch.New(session.New()) + + metrics := []*cloudwatch.MetricDatum{} + metrics = append(metrics, cloudwatchMetrics(r.totals, nil)...) + + for name, c := range r.queues { + metrics = append(metrics, cloudwatchMetrics(c, []*cloudwatch.Dimension{ + {Name: aws.String("Queue"), Value: aws.String(name)}, + })...) + } + + for name, c := range r.pipelines { + metrics = append(metrics, cloudwatchMetrics(c, []*cloudwatch.Dimension{ + {Name: aws.String("Pipeline"), Value: aws.String(name)}, + })...) + } + + log.Printf("Extracted %d cloudwatch metrics from results", len(metrics)) + + for _, chunk := range chunkCloudwatchMetrics(10, metrics) { + log.Printf("Submitting chunk of %d metrics to Cloudwatch", len(chunk)) + _, err := svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + MetricData: chunk, + Namespace: aws.String("Buildkite"), + }) + if err != nil { + return err + } + } + + return nil +} + +func cloudwatchMetrics(c counts, dimensions []*cloudwatch.Dimension) []*cloudwatch.MetricDatum { + m := []*cloudwatch.MetricDatum{} + + for k, v := range c { + m = append(m, &cloudwatch.MetricDatum{ + MetricName: aws.String(k), + Dimensions: dimensions, + Value: aws.Float64(float64(v)), + Unit: aws.String("Count"), + }) + } + + return m +} + +func chunkCloudwatchMetrics(size int, data []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum { + var chunks = [][]*cloudwatch.MetricDatum{} + for i := 0; i < len(data); i += size { + end := i + size + if end > len(data) { + end = len(data) + } + chunks = append(chunks, data[i:end]) + } + return chunks +} diff --git a/main.go b/main.go index 26d7c0d5..09d05fab 100644 --- a/main.go +++ b/main.go @@ -6,12 +6,8 @@ import ( "log" "os" "regexp" - "strings" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/cloudwatch" "gopkg.in/buildkite/go-buildkite.v2/buildkite" ) @@ -29,8 +25,12 @@ func main() { accessToken = flag.String("token", "", "A Buildkite API Access Token") orgSlug = flag.String("org", "", "A Buildkite Organization Slug") interval = flag.Duration("interval", 0, "Update metrics every interval, rather than once") + history = flag.Duration("history", time.Hour*24, "Historical data to use for finished builds") debug = flag.Bool("debug", false, "Show API debugging output") version = flag.Bool("version", false, "Show the version") + + // filters + queue = flag.String("queue", "", "Only include a specific queue") ) flag.Parse() @@ -56,65 +56,102 @@ func main() { client := buildkite.NewClient(config.Client()) buildkite.SetHttpDebug(*debug) - if err := runCollector(client, *orgSlug, time.Hour*24); err != nil { + f := func() error { + t := time.Now() + + res, err := collectResults(client, collectOpts{ + OrgSlug: *orgSlug, + Historical: *history, + Queue: *queue, + }) + if err != nil { + return err + } + + dumpResults(res) + + err = cloudwatchSend(res) + if err != nil { + return err + } + + log.Printf("Finished in %s", time.Now().Sub(t)) + return nil + } + + if err := f(); err != nil { log.Fatal(err) } if *interval > 0 { for _ = range time.NewTicker(*interval).C { - if err := runCollector(client, *orgSlug, time.Hour); err != nil { + if err := f(); err != nil { log.Println(err) } } } } -func runCollector(client *buildkite.Client, orgSlug string, historical time.Duration) error { - svc := cloudwatch.New(session.New()) +type collectOpts struct { + OrgSlug string + Historical time.Duration + Queue string +} +func collectResults(client *buildkite.Client, opts collectOpts) (*result, error) { res := &result{ totals: newCounts(), queues: map[string]counts{}, pipelines: map[string]counts{}, } - log.Printf("Collecting buildkite metrics from org %s", orgSlug) - if err := res.addBuildAndJobMetrics(client, orgSlug, historical); err != nil { - return err + if opts.Queue == "" { + log.Println("Collecting historical metrics") + if err := res.addHistoricalMetrics(client, opts); err != nil { + return nil, err + } } - if err := res.addAgentMetrics(client, orgSlug, historical); err != nil { - return err + log.Println("Collecting running and scheduled build and job metrics") + if err := res.addBuildAndJobMetrics(client, opts); err != nil { + return nil, err } - metrics := res.toMetrics() - log.Printf("Extracted %d cloudwatch metrics from results", len(metrics)) + log.Println("Collecting agent metrics") + if err := res.addAgentMetrics(client, opts); err != nil { + return nil, err + } - for _, metric := range metrics { - ds := []string{} - for _, d := range metric.Dimensions { - ds = append(ds, *d.Name+"="+*d.Value) + if opts.Queue != "" { + if c, ok := res.queues[opts.Queue]; ok { + return &result{ + queues: map[string]counts{ + opts.Queue: c, + }, + }, nil } + return &result{}, nil + } - path := []string{"Buildkite"} - if len(ds) > 0 { - path = append(path, strings.Join(ds, ",")) - } + return res, nil +} - log.Printf("%s > %s = %.0f", - strings.Join(path, " > "), *metric.MetricName, *metric.Value) +func dumpResults(res *result) { + for name, c := range res.totals { + log.Printf("Buildkite > %s = %d", name, c) } - for _, chunk := range chunkMetricData(10, metrics) { - log.Printf("Submitting chunk of %d metrics to Cloudwatch", len(chunk)) - if err := putMetricData(svc, chunk); err != nil { - if err != nil { - return err - } + for name, c := range res.queues { + for k, v := range c { + log.Printf("Buildkite > [queue = %s] > %s = %d", name, k, v) } } - return nil + for name, c := range res.pipelines { + for k, v := range c { + log.Printf("Buildkite > [pipeline = %s] > %s = %d", name, k, v) + } + } } const ( @@ -138,21 +175,6 @@ func newCounts() counts { } } -func (c counts) toMetrics(dimensions []*cloudwatch.Dimension) []*cloudwatch.MetricDatum { - m := []*cloudwatch.MetricDatum{} - - for k, v := range c { - m = append(m, &cloudwatch.MetricDatum{ - MetricName: aws.String(k), - Dimensions: dimensions, - Value: aws.Float64(float64(v)), - Unit: aws.String("Count"), - }) - } - - return m -} - func queue(j *buildkite.Job) string { for _, m := range j.AgentQueryRules { if match := queuePattern.FindStringSubmatch(m); match != nil { @@ -183,36 +205,12 @@ type result struct { queues, pipelines map[string]counts } -func (r *result) toMetrics() []*cloudwatch.MetricDatum { - data := []*cloudwatch.MetricDatum{} - data = append(data, r.totals.toMetrics(nil)...) - - for name, c := range r.queues { - data = append(data, c.toMetrics([]*cloudwatch.Dimension{ - {Name: aws.String("Queue"), Value: aws.String(name)}, - })...) - } - - for name, c := range r.pipelines { - data = append(data, c.toMetrics([]*cloudwatch.Dimension{ - {Name: aws.String("Pipeline"), Value: aws.String(name)}, - })...) - } - - return data -} - -func (r *result) addBuildAndJobMetrics(client *buildkite.Client, orgSlug string, historical time.Duration) error { - // Algorithm: - // Get Builds with finished_from = 24 hours ago - // Build results with zero values for pipelines/queues - // Get all running and scheduled builds, add to results - - finishedBuilds := listBuildsByOrg(client.Builds, orgSlug, buildkite.BuildsListOptions{ - FinishedFrom: time.Now().UTC().Add(historical * -1), +func (r *result) addHistoricalMetrics(client *buildkite.Client, opts collectOpts) error { + finishedBuilds := listBuildsByOrg(client.Builds, opts.OrgSlug, buildkite.BuildsListOptions{ + FinishedFrom: time.Now().UTC().Add(opts.Historical * -1), }) - err := finishedBuilds.Pages(func(v interface{}, lastPage bool) bool { + return finishedBuilds.Pages(func(v interface{}, lastPage bool) bool { for _, queue := range uniqueQueues(v.([]buildkite.Build)) { if _, ok := r.queues[queue]; !ok { r.queues[queue] = newCounts() @@ -223,18 +221,17 @@ func (r *result) addBuildAndJobMetrics(client *buildkite.Client, orgSlug string, } return true }) - if err != nil { - return err - } +} - currentBuilds := listBuildsByOrg(client.Builds, orgSlug, buildkite.BuildsListOptions{ +func (r *result) addBuildAndJobMetrics(client *buildkite.Client, opts collectOpts) error { + currentBuilds := listBuildsByOrg(client.Builds, opts.OrgSlug, buildkite.BuildsListOptions{ State: []string{"scheduled", "running"}, }) return currentBuilds.Pages(func(v interface{}, lastPage bool) bool { for _, build := range v.([]buildkite.Build) { - log.Printf("Adding build to stats (id=%q, pipeline=%q, branch=%q, state=%q)", - *build.ID, *build.Pipeline.Name, *build.Branch, *build.State) + // log.Printf("Adding build to stats (id=%q, pipeline=%q, branch=%q, state=%q)", + // *build.ID, *build.Pipeline.Name, *build.Branch, *build.State) if _, ok := r.pipelines[*build.Pipeline.Name]; !ok { r.pipelines[*build.Pipeline.Name] = newCounts() @@ -262,8 +259,8 @@ func (r *result) addBuildAndJobMetrics(client *buildkite.Client, orgSlug string, state = *job.State } - log.Printf("Adding job to stats (id=%q, pipeline=%q, queue=%q, type=%q, state=%q)", - *job.ID, *build.Pipeline.Name, queue(job), *job.Type, state) + // log.Printf("Adding job to stats (id=%q, pipeline=%q, queue=%q, type=%q, state=%q)", + // *job.ID, *build.Pipeline.Name, queue(job), *job.Type, state) if _, ok := r.queues[queue(job)]; !ok { r.queues[queue(job)] = newCounts() @@ -299,10 +296,10 @@ func (r *result) addBuildAndJobMetrics(client *buildkite.Client, orgSlug string, }) } -func (r *result) addAgentMetrics(client *buildkite.Client, orgSlug string, historical time.Duration) error { +func (r *result) addAgentMetrics(client *buildkite.Client, opts collectOpts) error { p := &pager{ lister: func(page int) (interface{}, int, error) { - agents, resp, err := client.Agents.List(orgSlug, &buildkite.AgentListOptions{ + agents, resp, err := client.Agents.List(opts.OrgSlug, &buildkite.AgentListOptions{ ListOptions: buildkite.ListOptions{ Page: page, }, @@ -345,8 +342,8 @@ func (r *result) addAgentMetrics(client *buildkite.Client, orgSlug string, histo r.queues[queue][totalAgentCount] = 0 } - log.Printf("Adding agent to stats (name=%q, queue=%q, job=%#v)", - *agent.Name, queue, agent.Job != nil) + // log.Printf("Adding agent to stats (name=%q, queue=%q, job=%#v)", + // *agent.Name, queue, agent.Job != nil) if agent.Job != nil { r.totals[busyAgentCount]++ @@ -400,27 +397,3 @@ func listBuildsByOrg(builds *buildkite.BuildsService, orgSlug string, opts build }, } } - -func chunkMetricData(size int, data []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum { - var chunks = [][]*cloudwatch.MetricDatum{} - for i := 0; i < len(data); i += size { - end := i + size - if end > len(data) { - end = len(data) - } - chunks = append(chunks, data[i:end]) - } - return chunks -} - -func putMetricData(svc *cloudwatch.CloudWatch, data []*cloudwatch.MetricDatum) error { - _, err := svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - MetricData: data, - Namespace: aws.String("Buildkite"), - }) - if err != nil { - return err - } - - return nil -}