From 3d12a33c7304c3887a1d4f0b3ccdcef413deb4d5 Mon Sep 17 00:00:00 2001 From: catsby Date: Tue, 7 Jun 2022 17:39:54 -0500 Subject: [PATCH 1/6] add simple metrics --- internal/cli/server_run.go | 6 +- internal/telemetry/metrics/stats.go | 113 +++++++++++++++++++++ pkg/server/singleprocess/service_runner.go | 64 ++++++++++++ 3 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 internal/telemetry/metrics/stats.go diff --git a/internal/cli/server_run.go b/internal/cli/server_run.go index ba1dfd8d48d..a80ae1be828 100644 --- a/internal/cli/server_run.go +++ b/internal/cli/server_run.go @@ -115,7 +115,7 @@ func (c *ServerRunCommand) Run(args []string) int { } path := c.config.DBPath log.Info("opening DB", "path", path) - db, err := bolt.Open(path, 0600, nil) + db, err := bolt.Open(path, 0o600, nil) if err != nil { c.ui.Output( "Error opening database: %s", err.Error(), @@ -271,7 +271,8 @@ func (c *ServerRunCommand) Run(args []string) int { } if httpInsecureLn != nil { values = append(values, terminal.NamedValue{ - Name: "HTTP Address (Insecure)", Value: httpInsecureLn.Addr().String()}) + Name: "HTTP Address (Insecure)", Value: httpInsecureLn.Addr().String(), + }) } if auth { values = append(values, terminal.NamedValue{Name: "Auth Required", Value: "yes"}) @@ -374,6 +375,7 @@ This command will bootstrap the server and setup a CLI context. telemetryOptions = append(telemetryOptions, telemetry.WithDatadogExporter( datadog.Options{ TraceAddr: c.flagTelemetryDatadogTraceAddr, + StatsAddr: c.flagTelemetryDatadogTraceAddr, Service: "waypoint", }, )) diff --git a/internal/telemetry/metrics/stats.go b/internal/telemetry/metrics/stats.go new file mode 100644 index 00000000000..5616da3d69c --- /dev/null +++ b/internal/telemetry/metrics/stats.go @@ -0,0 +1,113 @@ +package metrics + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +func init() { + // Register OpenCensus views. + if err := view.Register(statsViews...); err != nil { + log.Printf("========================") + log.Printf("=== error registering view ===") + log.Printf("=== %v ===", err) + log.Printf("========================") + fmt.Fprintf(os.Stderr, "error registering OpenCensus views: %v", err) + } else { + log.Printf("========================") + log.Printf("=== no registering view ===") + log.Printf("========================") + } +} + +var ( + // TagMethod is a tag for capturing the method. + TagOperation = tag.MustNewKey("operation") + + operationDurationMeasure = stats.Float64( + "waypoint_operation", + "The number of seconds duration for this operation", + stats.UnitSeconds, + ) + + operationCountMeasure = stats.Int64( + "waypoint_operation_count", + "count of operations", + stats.UnitDimensionless, + ) + + waypointOperationCounts = &view.View{ + Name: operationCountMeasure.Name(), + Description: operationCountMeasure.Description(), + TagKeys: []tag.Key{TagOperation}, + Measure: operationCountMeasure, + Aggregation: view.Count(), + } + + waypointOperationDurations = &view.View{ + Name: operationDurationMeasure.Name(), + Description: operationDurationMeasure.Description(), + TagKeys: []tag.Key{TagOperation}, + Measure: operationDurationMeasure, + // add a custom distribution bucket of 10 second intervals between + // 0 and 240 seconds, then 10 minute intervals up to 60 minutes + Aggregation: view.Distribution( + (10 * time.Second).Seconds(), + (20 * time.Second).Seconds(), + (30 * time.Second).Seconds(), + (40 * time.Second).Seconds(), + (50 * time.Second).Seconds(), + (60 * time.Second).Seconds(), + (70 * time.Second).Seconds(), + (80 * time.Second).Seconds(), + (90 * time.Second).Seconds(), + (100 * time.Second).Seconds(), + (110 * time.Second).Seconds(), + (120 * time.Second).Seconds(), + (130 * time.Second).Seconds(), + (140 * time.Second).Seconds(), + (150 * time.Second).Seconds(), + (160 * time.Second).Seconds(), + (170 * time.Second).Seconds(), + (180 * time.Second).Seconds(), + (190 * time.Second).Seconds(), + (200 * time.Second).Seconds(), + (210 * time.Second).Seconds(), + (220 * time.Second).Seconds(), + (230 * time.Second).Seconds(), + (240 * time.Second).Seconds(), + (10 * time.Minute).Seconds(), + (20 * time.Minute).Seconds(), + (30 * time.Minute).Seconds(), + (40 * time.Minute).Seconds(), + (50 * time.Minute).Seconds(), + (60 * time.Minute).Seconds(), + ), + } + + // statsViews is a list of all stats views for + // measurements emitted by this package. + statsViews = []*view.View{ + waypointOperationDurations, + waypointOperationCounts, + } +) + +func MeasureOperation(ctx context.Context, lastWriteAt time.Time, operationName string) { + _ = stats.RecordWithTags(ctx, []tag.Mutator{ + tag.Upsert(TagOperation, operationName), + }, operationDurationMeasure.M(time.Since(lastWriteAt).Seconds())) +} + +func CountOperation(ctx context.Context, operationName string) { + _ = stats.RecordWithTags(ctx, []tag.Mutator{ + tag.Upsert(TagOperation, operationName), + }, operationCountMeasure.M(1)) +} diff --git a/pkg/server/singleprocess/service_runner.go b/pkg/server/singleprocess/service_runner.go index 252c66160cb..b4149214615 100644 --- a/pkg/server/singleprocess/service_runner.go +++ b/pkg/server/singleprocess/service_runner.go @@ -2,8 +2,10 @@ package singleprocess import ( "context" + "fmt" "io" "strings" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -14,6 +16,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" empty "google.golang.org/protobuf/types/known/emptypb" + "github.com/hashicorp/waypoint/internal/telemetry/metrics" pb "github.com/hashicorp/waypoint/pkg/server/gen" "github.com/hashicorp/waypoint/pkg/server/logstream" serverptypes "github.com/hashicorp/waypoint/pkg/server/ptypes" @@ -535,7 +538,18 @@ func (s *Service) RunnerJobStream( } log.Trace("loaded config sources for job", "total_sourcers", len(cfgSrcs)) + operation := opString(job.Job) log.Debug("sending job assignment to runner") + log.Debug("===> ") + log.Debug(fmt.Sprintf("===> defer func start (%s) <===", operation)) + log.Debug("===> ") + defer func(start time.Time) { + metrics.MeasureOperation(ctx, start, operation) + log.Debug("===> ") + log.Debug(fmt.Sprintf("===> defer func execute (%s) <===", operation)) + log.Debug("===> ") + }(time.Now()) + metrics.CountOperation(ctx, operation) // Send the job assignment. // // If this has an error, we continue to accumulate the error until @@ -874,3 +888,53 @@ func (s *Service) runnerVerifyToken( return nil } + +func opString(job *pb.Job) string { + switch job.Operation.(type) { + case *pb.Job_Noop_: + return "Noop" + case *pb.Job_Up: + return "up" + case *pb.Job_Build: + return "build" + + case *pb.Job_Push: + return "push" + + case *pb.Job_Deploy: + return "deploy" + case *pb.Job_Destroy: + return "destroy" + + case *pb.Job_Release: + return "release" + + case *pb.Job_Validate: + return "validate" + + case *pb.Job_Auth: + return "auth" + + case *pb.Job_Docs: + return "docs" + + case *pb.Job_ConfigSync: + return "config_sync" + + case *pb.Job_Exec: + return "exec" + + case *pb.Job_Logs: + return "logs" + + case *pb.Job_QueueProject: + return "queue_project" + + case *pb.Job_StatusReport: + return "status" + + case *pb.Job_Init: + return "init" + } + return "none" +} From c06d49bf2636211fefb32262970c39acafe15130 Mon Sep 17 00:00:00 2001 From: catsby Date: Wed, 8 Jun 2022 16:20:46 -0500 Subject: [PATCH 2/6] cleanups --- internal/cli/server_run.go | 3 +- internal/telemetry/metrics/stats.go | 22 +++++------- pkg/server/singleprocess/service_runner.go | 41 ++++++++-------------- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/internal/cli/server_run.go b/internal/cli/server_run.go index a80ae1be828..ff51bd63684 100644 --- a/internal/cli/server_run.go +++ b/internal/cli/server_run.go @@ -115,7 +115,7 @@ func (c *ServerRunCommand) Run(args []string) int { } path := c.config.DBPath log.Info("opening DB", "path", path) - db, err := bolt.Open(path, 0o600, nil) + db, err := bolt.Open(path, 0600, nil) if err != nil { c.ui.Output( "Error opening database: %s", err.Error(), @@ -377,6 +377,7 @@ This command will bootstrap the server and setup a CLI context. TraceAddr: c.flagTelemetryDatadogTraceAddr, StatsAddr: c.flagTelemetryDatadogTraceAddr, Service: "waypoint", + Namespace: "waypoint", }, )) } diff --git a/internal/telemetry/metrics/stats.go b/internal/telemetry/metrics/stats.go index 5616da3d69c..d267e6d8706 100644 --- a/internal/telemetry/metrics/stats.go +++ b/internal/telemetry/metrics/stats.go @@ -3,7 +3,6 @@ package metrics import ( "context" "fmt" - "log" "os" "time" @@ -15,34 +14,28 @@ import ( func init() { // Register OpenCensus views. if err := view.Register(statsViews...); err != nil { - log.Printf("========================") - log.Printf("=== error registering view ===") - log.Printf("=== %v ===", err) - log.Printf("========================") fmt.Fprintf(os.Stderr, "error registering OpenCensus views: %v", err) - } else { - log.Printf("========================") - log.Printf("=== no registering view ===") - log.Printf("========================") } } var ( - // TagMethod is a tag for capturing the method. + // TagOperation is a tag for capturing the operation type, e.g. build, + // deploy, release TagOperation = tag.MustNewKey("operation") operationDurationMeasure = stats.Float64( - "waypoint_operation", - "The number of seconds duration for this operation", + "operation_duration", + "The duration for this operation, measured in seconds", stats.UnitSeconds, ) operationCountMeasure = stats.Int64( - "waypoint_operation_count", + "operation_count", "count of operations", stats.UnitDimensionless, ) + // views aggregate measurements waypointOperationCounts = &view.View{ Name: operationCountMeasure.Name(), Description: operationCountMeasure.Description(), @@ -100,12 +93,15 @@ var ( } ) +// MeasureOperation records the duration of an operation and upserts the +// operation value into the TagOperation tag func MeasureOperation(ctx context.Context, lastWriteAt time.Time, operationName string) { _ = stats.RecordWithTags(ctx, []tag.Mutator{ tag.Upsert(TagOperation, operationName), }, operationDurationMeasure.M(time.Since(lastWriteAt).Seconds())) } +// CountOperation records a single incremental value for an operation func CountOperation(ctx context.Context, operationName string) { _ = stats.RecordWithTags(ctx, []tag.Mutator{ tag.Upsert(TagOperation, operationName), diff --git a/pkg/server/singleprocess/service_runner.go b/pkg/server/singleprocess/service_runner.go index b4149214615..bb0f914dd38 100644 --- a/pkg/server/singleprocess/service_runner.go +++ b/pkg/server/singleprocess/service_runner.go @@ -2,7 +2,6 @@ package singleprocess import ( "context" - "fmt" "io" "strings" "time" @@ -538,16 +537,9 @@ func (s *Service) RunnerJobStream( } log.Trace("loaded config sources for job", "total_sourcers", len(cfgSrcs)) - operation := opString(job.Job) - log.Debug("sending job assignment to runner") - log.Debug("===> ") - log.Debug(fmt.Sprintf("===> defer func start (%s) <===", operation)) - log.Debug("===> ") + operation := operationString(job.Job) defer func(start time.Time) { metrics.MeasureOperation(ctx, start, operation) - log.Debug("===> ") - log.Debug(fmt.Sprintf("===> defer func execute (%s) <===", operation)) - log.Debug("===> ") }(time.Now()) metrics.CountOperation(ctx, operation) // Send the job assignment. @@ -889,52 +881,49 @@ func (s *Service) runnerVerifyToken( return nil } -func opString(job *pb.Job) string { +func operationString(job *pb.Job) string { + // Types that are assignable to Operation: switch job.Operation.(type) { case *pb.Job_Noop_: return "Noop" - case *pb.Job_Up: - return "up" case *pb.Job_Build: return "build" - case *pb.Job_Push: return "push" - case *pb.Job_Deploy: return "deploy" case *pb.Job_Destroy: return "destroy" - case *pb.Job_Release: return "release" - case *pb.Job_Validate: return "validate" - case *pb.Job_Auth: return "auth" - case *pb.Job_Docs: return "docs" - case *pb.Job_ConfigSync: return "config_sync" - case *pb.Job_Exec: return "exec" - + case *pb.Job_Up: + return "up" case *pb.Job_Logs: return "logs" - case *pb.Job_QueueProject: return "queue_project" - + case *pb.Job_Poll: + return "poll" case *pb.Job_StatusReport: - return "status" - + return "status_report" + case *pb.Job_StartTask: + return "start_task" + case *pb.Job_StopTask: + return "stop_task" + case *pb.Job_WatchTask: + return "watch_task" case *pb.Job_Init: return "init" } - return "none" + return "unknown" } From e4dd5dfc91198778b7db1ea0cbecb6646b982981 Mon Sep 17 00:00:00 2001 From: catsby Date: Wed, 8 Jun 2022 17:26:13 -0500 Subject: [PATCH 3/6] changelog --- .changelog/3440.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/3440.txt diff --git a/.changelog/3440.txt b/.changelog/3440.txt new file mode 100644 index 00000000000..fe919838493 --- /dev/null +++ b/.changelog/3440.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: Introduce basic server-side metric collections around operations +``` From 4529402ea2a2af1674e00e38f8e03b7170029fbe Mon Sep 17 00:00:00 2001 From: Clint Date: Thu, 9 Jun 2022 09:18:22 -0500 Subject: [PATCH 4/6] Update pkg/server/singleprocess/service_runner.go Co-authored-by: Evan Phoenix --- pkg/server/singleprocess/service_runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/singleprocess/service_runner.go b/pkg/server/singleprocess/service_runner.go index bb0f914dd38..2e60716443e 100644 --- a/pkg/server/singleprocess/service_runner.go +++ b/pkg/server/singleprocess/service_runner.go @@ -885,7 +885,7 @@ func operationString(job *pb.Job) string { // Types that are assignable to Operation: switch job.Operation.(type) { case *pb.Job_Noop_: - return "Noop" + return "noop" case *pb.Job_Build: return "build" case *pb.Job_Push: From 8d73a87d4177efbdc85334a69ed5fac069442d8a Mon Sep 17 00:00:00 2001 From: catsby Date: Thu, 9 Jun 2022 16:25:09 -0500 Subject: [PATCH 5/6] update after reviewing naming and feedback --- internal/telemetry/metrics/stats.go | 8 ++++---- pkg/server/singleprocess/service_runner.go | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/telemetry/metrics/stats.go b/internal/telemetry/metrics/stats.go index d267e6d8706..78eb55faf72 100644 --- a/internal/telemetry/metrics/stats.go +++ b/internal/telemetry/metrics/stats.go @@ -24,13 +24,13 @@ var ( TagOperation = tag.MustNewKey("operation") operationDurationMeasure = stats.Float64( - "operation_duration", - "The duration for this operation, measured in seconds", - stats.UnitSeconds, + "operation-duration.milliseconds", + "The duration for this operation, measured in milliseconds", + stats.UnitMilliseconds, ) operationCountMeasure = stats.Int64( - "operation_count", + "operation-count", "count of operations", stats.UnitDimensionless, ) diff --git a/pkg/server/singleprocess/service_runner.go b/pkg/server/singleprocess/service_runner.go index 2e60716443e..58d4cc5d679 100644 --- a/pkg/server/singleprocess/service_runner.go +++ b/pkg/server/singleprocess/service_runner.go @@ -537,6 +537,8 @@ func (s *Service) RunnerJobStream( } log.Trace("loaded config sources for job", "total_sourcers", len(cfgSrcs)) + log.Debug("sending job assignment to runner") + operation := operationString(job.Job) defer func(start time.Time) { metrics.MeasureOperation(ctx, start, operation) From 3b7452b4ca284e30762061cfbe8ea203b54d5e53 Mon Sep 17 00:00:00 2001 From: catsby Date: Thu, 9 Jun 2022 19:06:19 -0500 Subject: [PATCH 6/6] move operation stats to milliseconds, update name to denote unit --- internal/telemetry/metrics/stats.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/telemetry/metrics/stats.go b/internal/telemetry/metrics/stats.go index 78eb55faf72..2e22776099f 100644 --- a/internal/telemetry/metrics/stats.go +++ b/internal/telemetry/metrics/stats.go @@ -23,14 +23,14 @@ var ( // deploy, release TagOperation = tag.MustNewKey("operation") - operationDurationMeasure = stats.Float64( - "operation-duration.milliseconds", + operationDurationMeasure = stats.Int64( + "operation_duration.milliseconds", "The duration for this operation, measured in milliseconds", stats.UnitMilliseconds, ) operationCountMeasure = stats.Int64( - "operation-count", + "operation_count", "count of operations", stats.UnitDimensionless, ) @@ -98,7 +98,7 @@ var ( func MeasureOperation(ctx context.Context, lastWriteAt time.Time, operationName string) { _ = stats.RecordWithTags(ctx, []tag.Mutator{ tag.Upsert(TagOperation, operationName), - }, operationDurationMeasure.M(time.Since(lastWriteAt).Seconds())) + }, operationDurationMeasure.M(time.Since(lastWriteAt).Milliseconds())) } // CountOperation records a single incremental value for an operation