diff --git a/cmd/agent.go b/cmd/agent.go index 45bebf370ad..2c93f9174ed 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -2,8 +2,11 @@ package cmd import ( "bytes" + "context" "encoding/json" + "time" + "github.com/sirupsen/logrus" "github.com/spf13/afero" "github.com/spf13/cobra" "go.k6.io/k6/cmd/state" @@ -13,11 +16,76 @@ import ( "go.k6.io/k6/lib" "go.k6.io/k6/loader" "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "gopkg.in/guregu/null.v3" ) +// TODO: something cleaner +func getMetricsHook( + ctx context.Context, instanceID uint32, + client distributed.DistributedTestClient, logger logrus.FieldLogger, +) func(*engine.MetricsEngine) func() { + logger = logger.WithField("component", "metric-engine-hook") + return func(me *engine.MetricsEngine) func() { + stop := make(chan struct{}) + done := make(chan struct{}) + + dumpMetrics := func() { + logger.Debug("Starting metric dump...") + me.MetricsLock.Lock() + defer me.MetricsLock.Unlock() + + metrics := make([]*distributed.MetricDump, 0, len(me.ObservedMetrics)) + for _, om := range me.ObservedMetrics { + data, err := om.Sink.Drain() + if err != nil { + logger.Errorf("There was a problem draining the sink for metric %s: %s", om.Name, err) + } + metrics = append(metrics, &distributed.MetricDump{ + Name: om.Name, + Data: data, + }) + } + + data := &distributed.MetricsDump{ + InstanceID: instanceID, + Metrics: metrics, + } + _, err := client.SendMetrics(ctx, data) + if err != nil { + logger.Errorf("There was a problem dumping metrics: %s", err) + } + } + + go func() { + defer close(done) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dumpMetrics() + case <-stop: + dumpMetrics() + return + } + } + }() + + finalize := func() { + logger.Debug("Final metric dump...") + close(stop) + <-done + logger.Debug("Done!") + } + + return finalize + } +} + // TODO: a whole lot of cleanup, refactoring, error handling and hardening func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen c := &cmdsRunAndAgent{gs: gs} @@ -42,6 +110,8 @@ func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen return nil, nil, err } + c.metricsEngineHook = getMetricsHook(gs.Ctx, resp.InstanceID, client, gs.Logger) + controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger) if err != nil { return nil, nil, err diff --git a/cmd/coordinator.go b/cmd/coordinator.go index 7c561131aaf..1037951dea1 100644 --- a/cmd/coordinator.go +++ b/cmd/coordinator.go @@ -1,12 +1,19 @@ package cmd import ( + "fmt" "net" + "strings" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.k6.io/k6/cmd/state" + "go.k6.io/k6/errext" + "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/execution/distributed" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics/engine" "google.golang.org/grpc" ) @@ -17,19 +24,80 @@ type cmdCoordinator struct { instanceCount int } -func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { +// TODO: split apart +func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { //nolint: funlen + ctx, runAbort := execution.NewTestRunContext(c.gs.Ctx, c.gs.Logger) + test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig) if err != nil { return err } + // Only consolidated options, not derived + testRunState, err := test.buildTestRunState(test.consolidatedConfig.Options) + if err != nil { + return err + } + + 
metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, c.gs.Logger) + if err != nil { + return err + } + coordinator, err := distributed.NewCoordinatorServer( - c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger, + c.instanceCount, test.initRunner.MakeArchive(), metricsEngine, c.gs.Logger, ) if err != nil { return err } + if !testRunState.RuntimeOptions.NoSummary.Bool { + defer func() { + c.gs.Logger.Debug("Generating the end-of-test summary...") + summaryResult, serr := test.initRunner.HandleSummary(ctx, &lib.Summary{ + Metrics: metricsEngine.ObservedMetrics, + RootGroup: test.initRunner.GetDefaultGroup(), + TestRunDuration: coordinator.GetCurrentTestRunDuration(), + NoColor: c.gs.Flags.NoColor, + UIState: lib.UIState{ + IsStdOutTTY: c.gs.Stdout.IsTTY, + IsStdErrTTY: c.gs.Stderr.IsTTY, + }, + }) + if serr == nil { + serr = handleSummaryResult(c.gs.FS, c.gs.Stdout, c.gs.Stderr, summaryResult) + } + if serr != nil { + c.gs.Logger.WithError(serr).Error("Failed to handle the end-of-test summary") + } + }() + } + + if !testRunState.RuntimeOptions.NoThresholds.Bool { + getCurrentTestDuration := coordinator.GetCurrentTestRunDuration + finalizeThresholds := metricsEngine.StartThresholdCalculations(nil, runAbort, getCurrentTestDuration) + + defer func() { + // This gets called after all of the outputs have stopped, so we are + // sure there won't be any more metrics being sent. + c.gs.Logger.Debug("Finalizing thresholds...") + breachedThresholds := finalizeThresholds() + if len(breachedThresholds) > 0 { + tErr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone( + fmt.Errorf("thresholds on metrics '%s' have been breached", strings.Join(breachedThresholds, ", ")), + exitcodes.ThresholdsHaveFailed, + ), errext.AbortedByThresholdsAfterTestEnd) + + if err == nil { + err = tErr + } else { + c.gs.Logger.WithError(tErr).Debug("Breached thresholds, but test already exited with another error") + } + } + }() + } + c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress) listener, err := net.Listen("tcp", c.gRPCAddress) if err != nil { diff --git a/cmd/run.go b/cmd/run.go index ed1ba22edbb..e67487aff19 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -43,6 +43,7 @@ type cmdsRunAndAgent struct { // TODO: figure out something more elegant? 
 	loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
+	metricsEngineHook  func(*engine.MetricsEngine) func()
 	testEndHook        func(err error)
 }
 
@@ -179,9 +180,9 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
 	}
 
 	// We'll need to pipe metrics to the MetricsEngine and process them if any
-	// of these are enabled: thresholds, end-of-test summary
+	// of these are enabled: thresholds, end-of-test summary, engine hook
 	shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
-		!testRunState.RuntimeOptions.NoThresholds.Bool)
+		!testRunState.RuntimeOptions.NoThresholds.Bool || c.metricsEngineHook != nil)
 	var metricsIngester *engine.OutputIngester
 	if shouldProcessMetrics {
 		err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
@@ -244,6 +245,11 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
 		stopOutputs(err)
 	}()
 
+	if c.metricsEngineHook != nil {
+		hookFinalize := c.metricsEngineHook(metricsEngine)
+		defer hookFinalize()
+	}
+
 	if !testRunState.RuntimeOptions.NoThresholds.Bool {
 		finalizeThresholds := metricsEngine.StartThresholdCalculations(
 			metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
diff --git a/execution/distributed/coordinator.go b/execution/distributed/coordinator.go
index 42a22a067af..9e40b3a8005 100644
--- a/execution/distributed/coordinator.go
+++ b/execution/distributed/coordinator.go
@@ -11,6 +11,7 @@ import (
 	"github.com/sirupsen/logrus"
 
 	"go.k6.io/k6/lib"
+	"go.k6.io/k6/metrics/engine"
 )
 
 // CoordinatorServer coordinates multiple k6 agents.
@@ -21,6 +22,7 @@ type CoordinatorServer struct {
 	instanceCount int
 	test          *lib.Archive
 	logger        logrus.FieldLogger
+	metricsEngine *engine.MetricsEngine
 
 	testStartTimeLock sync.Mutex
 	testStartTime     *time.Time
@@ -34,7 +36,7 @@ type CoordinatorServer struct {
 
 // NewCoordinatorServer initializes and returns a new CoordinatorServer.
 func NewCoordinatorServer(
-	instanceCount int, test *lib.Archive, logger logrus.FieldLogger,
+	instanceCount int, test *lib.Archive, metricsEngine *engine.MetricsEngine, logger logrus.FieldLogger,
 ) (*CoordinatorServer, error) {
 	segments, err := test.Options.ExecutionSegment.Split(int64(instanceCount))
 	if err != nil {
@@ -58,6 +60,7 @@ func NewCoordinatorServer(
 	cs := &CoordinatorServer{
 		instanceCount: instanceCount,
 		test:          test,
+		metricsEngine: metricsEngine,
 		logger:        logger,
 		ess:           ess,
 		cc:            newCoordinatorController(instanceCount, logger),
@@ -144,6 +147,18 @@ func (cs *CoordinatorServer) CommandAndControl(stream DistributedTest_CommandAnd
 	return cs.cc.handleInstanceStream(initInstMsg.InitInstanceID, stream)
 }
 
+// SendMetrics accepts and imports the given metrics into the coordinator's MetricsEngine.
+func (cs *CoordinatorServer) SendMetrics(_ context.Context, dumpMsg *MetricsDump) (*MetricsDumpResponse, error) {
+	// TODO: something nicer?
+	for _, md := range dumpMsg.Metrics {
+		if err := cs.metricsEngine.ImportMetric(md.Name, md.Data); err != nil {
+			cs.logger.Errorf("Error merging sink for metric %s: %s", md.Name, err)
+			// return nil, err
+		}
+	}
+	return &MetricsDumpResponse{}, nil
+}
+
 // Wait blocks until all instances have disconnected.
func (cs *CoordinatorServer) Wait() { cs.wg.Wait() diff --git a/execution/distributed/distributed.pb.go b/execution/distributed/distributed.pb.go index 92221687a88..d87689487b1 100644 --- a/execution/distributed/distributed.pb.go +++ b/execution/distributed/distributed.pb.go @@ -410,6 +410,154 @@ func (x *DataPacket) GetError() string { return "" } +type MetricsDump struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InstanceID uint32 `protobuf:"varint,1,opt,name=instanceID,proto3" json:"instanceID,omitempty"` + Metrics []*MetricDump `protobuf:"bytes,2,rep,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *MetricsDump) Reset() { + *x = MetricsDump{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsDump) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsDump) ProtoMessage() {} + +func (x *MetricsDump) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsDump.ProtoReflect.Descriptor instead. +func (*MetricsDump) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{5} +} + +func (x *MetricsDump) GetInstanceID() uint32 { + if x != nil { + return x.InstanceID + } + return 0 +} + +func (x *MetricsDump) GetMetrics() []*MetricDump { + if x != nil { + return x.Metrics + } + return nil +} + +type MetricDump struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *MetricDump) Reset() { + *x = MetricDump{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricDump) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricDump) ProtoMessage() {} + +func (x *MetricDump) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricDump.ProtoReflect.Descriptor instead. 
+func (*MetricDump) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{6} +} + +func (x *MetricDump) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *MetricDump) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type MetricsDumpResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *MetricsDumpResponse) Reset() { + *x = MetricsDumpResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsDumpResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsDumpResponse) ProtoMessage() {} + +func (x *MetricsDumpResponse) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsDumpResponse.ProtoReflect.Descriptor instead. +func (*MetricsDumpResponse) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{7} +} + var File_distributed_proto protoreflect.FileDescriptor var file_distributed_proto_rawDesc = []byte{ @@ -456,21 +604,37 @@ var file_distributed_proto_rawDesc = []byte{ 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0xb2, 0x01, 0x0a, 0x0f, 0x44, - 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x54, 0x65, 0x73, 0x74, 0x12, 0x49, - 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, - 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, - 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x11, 0x43, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x19, - 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x41, 0x67, 0x65, - 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x74, - 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, - 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, - 0x23, 0x5a, 0x21, 0x67, 0x6f, 0x2e, 0x6b, 0x36, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x36, 0x2f, 0x65, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, - 0x75, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x60, 0x0a, 0x0b, 0x4d, 0x65, + 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, + 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, + 0x6e, 0x73, 
0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, + 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, + 0x75, 0x6d, 0x70, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x34, 0x0a, 0x0a, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x75, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, + 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x22, 0x15, 0x0a, 0x13, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, + 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xff, 0x01, 0x0a, 0x0f, 0x44, 0x69, + 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x54, 0x65, 0x73, 0x74, 0x12, 0x49, 0x0a, + 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x74, + 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, + 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x41, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x19, 0x2e, + 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x41, 0x67, 0x65, 0x6e, + 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, + 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x18, 0x2e, + 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x1a, 0x20, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, + 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, + 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, + 0x6f, 0x2e, 0x6b, 0x36, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x36, 0x2f, 0x65, 0x78, 0x65, 0x63, 0x75, + 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -485,26 +649,32 @@ func file_distributed_proto_rawDescGZIP() []byte { return file_distributed_proto_rawDescData } -var file_distributed_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_distributed_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_distributed_proto_goTypes = []interface{}{ - (*RegisterRequest)(nil), // 0: distributed.RegisterRequest - (*RegisterResponse)(nil), // 1: distributed.RegisterResponse - (*AgentMessage)(nil), // 2: distributed.AgentMessage - (*ControllerMessage)(nil), // 3: distributed.ControllerMessage - (*DataPacket)(nil), // 4: distributed.DataPacket + (*RegisterRequest)(nil), // 0: distributed.RegisterRequest + (*RegisterResponse)(nil), // 1: distributed.RegisterResponse + (*AgentMessage)(nil), // 2: distributed.AgentMessage + (*ControllerMessage)(nil), // 3: distributed.ControllerMessage + 
(*DataPacket)(nil), // 4: distributed.DataPacket + (*MetricsDump)(nil), // 5: distributed.MetricsDump + (*MetricDump)(nil), // 6: distributed.MetricDump + (*MetricsDumpResponse)(nil), // 7: distributed.MetricsDumpResponse } var file_distributed_proto_depIdxs = []int32{ 4, // 0: distributed.AgentMessage.createdData:type_name -> distributed.DataPacket 4, // 1: distributed.ControllerMessage.dataWithID:type_name -> distributed.DataPacket - 0, // 2: distributed.DistributedTest.Register:input_type -> distributed.RegisterRequest - 2, // 3: distributed.DistributedTest.CommandAndControl:input_type -> distributed.AgentMessage - 1, // 4: distributed.DistributedTest.Register:output_type -> distributed.RegisterResponse - 3, // 5: distributed.DistributedTest.CommandAndControl:output_type -> distributed.ControllerMessage - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 6, // 2: distributed.MetricsDump.metrics:type_name -> distributed.MetricDump + 0, // 3: distributed.DistributedTest.Register:input_type -> distributed.RegisterRequest + 2, // 4: distributed.DistributedTest.CommandAndControl:input_type -> distributed.AgentMessage + 5, // 5: distributed.DistributedTest.SendMetrics:input_type -> distributed.MetricsDump + 1, // 6: distributed.DistributedTest.Register:output_type -> distributed.RegisterResponse + 3, // 7: distributed.DistributedTest.CommandAndControl:output_type -> distributed.ControllerMessage + 7, // 8: distributed.DistributedTest.SendMetrics:output_type -> distributed.MetricsDumpResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_distributed_proto_init() } @@ -573,6 +743,42 @@ func file_distributed_proto_init() { return nil } } + file_distributed_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsDump); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricDump); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsDumpResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_distributed_proto_msgTypes[2].OneofWrappers = []interface{}{ (*AgentMessage_Error)(nil), @@ -592,7 +798,7 @@ func file_distributed_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_distributed_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/execution/distributed/distributed.proto b/execution/distributed/distributed.proto index 8d9da559b37..1cfdc913c50 100644 --- a/execution/distributed/distributed.proto +++ b/execution/distributed/distributed.proto @@ -9,6 +9,8 @@ service DistributedTest { rpc CommandAndControl(stream AgentMessage) returns (stream ControllerMessage) 
{}; + + rpc SendMetrics(MetricsDump) returns (MetricsDumpResponse) {}; } message RegisterRequest {} @@ -42,4 +44,16 @@ message DataPacket { string id = 1; bytes data = 2; string error = 3; -} \ No newline at end of file +} + +message MetricsDump { + uint32 instanceID = 1; + repeated MetricDump metrics = 2; +} + +message MetricDump { + string name = 1; + bytes data = 2; +} + +message MetricsDumpResponse {}; diff --git a/execution/distributed/distributed_grpc.pb.go b/execution/distributed/distributed_grpc.pb.go index 41f6c9afc21..5acb1e7c4b8 100644 --- a/execution/distributed/distributed_grpc.pb.go +++ b/execution/distributed/distributed_grpc.pb.go @@ -24,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 type DistributedTestClient interface { Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) CommandAndControl(ctx context.Context, opts ...grpc.CallOption) (DistributedTest_CommandAndControlClient, error) + SendMetrics(ctx context.Context, in *MetricsDump, opts ...grpc.CallOption) (*MetricsDumpResponse, error) } type distributedTestClient struct { @@ -74,12 +75,22 @@ func (x *distributedTestCommandAndControlClient) Recv() (*ControllerMessage, err return m, nil } +func (c *distributedTestClient) SendMetrics(ctx context.Context, in *MetricsDump, opts ...grpc.CallOption) (*MetricsDumpResponse, error) { + out := new(MetricsDumpResponse) + err := c.cc.Invoke(ctx, "/distributed.DistributedTest/SendMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DistributedTestServer is the server API for DistributedTest service. // All implementations must embed UnimplementedDistributedTestServer // for forward compatibility type DistributedTestServer interface { Register(context.Context, *RegisterRequest) (*RegisterResponse, error) CommandAndControl(DistributedTest_CommandAndControlServer) error + SendMetrics(context.Context, *MetricsDump) (*MetricsDumpResponse, error) mustEmbedUnimplementedDistributedTestServer() } @@ -93,6 +104,9 @@ func (UnimplementedDistributedTestServer) Register(context.Context, *RegisterReq func (UnimplementedDistributedTestServer) CommandAndControl(DistributedTest_CommandAndControlServer) error { return status.Errorf(codes.Unimplemented, "method CommandAndControl not implemented") } +func (UnimplementedDistributedTestServer) SendMetrics(context.Context, *MetricsDump) (*MetricsDumpResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SendMetrics not implemented") +} func (UnimplementedDistributedTestServer) mustEmbedUnimplementedDistributedTestServer() {} // UnsafeDistributedTestServer may be embedded to opt out of forward compatibility for this service. 
@@ -150,6 +164,24 @@ func (x *distributedTestCommandAndControlServer) Recv() (*AgentMessage, error) {
 	return m, nil
 }
 
+func _DistributedTest_SendMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(MetricsDump)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DistributedTestServer).SendMetrics(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/distributed.DistributedTest/SendMetrics",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DistributedTestServer).SendMetrics(ctx, req.(*MetricsDump))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
 // DistributedTest_ServiceDesc is the grpc.ServiceDesc for DistributedTest service.
 // It's only intended for direct use with grpc.RegisterService,
 // and not to be introspected or modified (even as a copy)
@@ -161,6 +193,10 @@ var DistributedTest_ServiceDesc = grpc.ServiceDesc{
 			MethodName: "Register",
 			Handler:    _DistributedTest_Register_Handler,
 		},
+		{
+			MethodName: "SendMetrics",
+			Handler:    _DistributedTest_SendMetrics_Handler,
+		},
 	},
 	Streams: []grpc.StreamDesc{
 		{
diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go
index 55f0b87fdb7..dd60edb7dc9 100644
--- a/metrics/engine/engine.go
+++ b/metrics/engine/engine.go
@@ -61,6 +61,39 @@ func (me *MetricsEngine) CreateIngester() *OutputIngester {
 	}
 }
 
+// ImportMetric parses the name and data of the given metric and merges it into
+// the MetricsEngine's Sink for that metric.
+//
+// TODO: something better? deduplicate code with getThresholdMetricOrSubmetric
+func (me *MetricsEngine) ImportMetric(name string, data []byte) error {
+	me.MetricsLock.Lock()
+	defer me.MetricsLock.Unlock()
+
+	// TODO: replace with strings.Cut after Go 1.18
+	nameParts := strings.SplitN(name, "{", 2)
+
+	metric := me.registry.Get(nameParts[0])
+	if metric == nil {
+		return fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
+	}
+	if len(nameParts) == 1 { // no sub-metric
+		me.markObserved(metric)
+		return metric.Sink.Merge(data)
+	}
+
+	if nameParts[1][len(nameParts[1])-1] != '}' {
+		return fmt.Errorf("missing ending bracket, sub-metric format needs to be 'metric{key:value}'")
+	}
+
+	sm, err := metric.AddSubmetric(nameParts[1][:len(nameParts[1])-1])
+	if err != nil {
+		return err
+	}
+
+	me.markObserved(sm.Metric)
+	return sm.Metric.Sink.Merge(data)
+}
+
 func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) {
 	// TODO: replace with strings.Cut after Go 1.18
 	nameParts := strings.SplitN(name, "{", 2)
diff --git a/metrics/sink.go b/metrics/sink.go
index f74804d92cd..b62baaec5e4 100644
--- a/metrics/sink.go
+++ b/metrics/sink.go
@@ -1,6 +1,7 @@
 package metrics
 
 import (
+	"encoding/json"
 	"fmt"
 	"math"
 	"sort"
@@ -18,6 +19,9 @@ type Sink interface {
 	Add(s Sample)                              // Add a sample to the sink.
 	Format(t time.Duration) map[string]float64 // Data for thresholds.
 	IsEmpty() bool                             // Check if the Sink is empty.
+
+	Drain() ([]byte, error) // Drain encodes the current sink values and clears them.
+	Merge([]byte) error     // Merge decodes the given values and merges them with the values in the current sink.
 }
 
 // NewSink creates the related Sink for
@@ -64,7 +68,33 @@ func (c *CounterSink) Format(t time.Duration) map[string]float64 {
 	}
 }
 
+// Drain encodes the current sink values and clears them.
+// TODO: something more robust and efficient
+func (c *CounterSink) Drain() ([]byte, error) {
+	res := []byte(fmt.Sprintf("%d %b", c.First.UnixMilli(), c.Value))
+	c.Value = 0
+	return res, nil
+}
+
+// Merge decodes the given values and merges them with the values in the current sink.
+func (c *CounterSink) Merge(from []byte) error {
+	var firstMs int64
+	var val float64
+	_, err := fmt.Sscanf(string(from), "%d %b", &firstMs, &val)
+	if err != nil {
+		return err
+	}
+
+	c.Value += val
+	if first := time.UnixMilli(firstMs); c.First.After(first) {
+		c.First = first
+	}
+
+	return nil
+}
+
 type GaugeSink struct {
+	Last     time.Time
 	Value    float64
 	Max, Min float64
 	minSet   bool
@@ -74,6 +104,7 @@ type GaugeSink struct {
 func (g *GaugeSink) IsEmpty() bool { return !g.minSet }
 
 func (g *GaugeSink) Add(s Sample) {
+	g.Last = s.Time
 	g.Value = s.Value
 	if s.Value > g.Max {
 		g.Max = s.Value
@@ -88,6 +119,44 @@ func (g *GaugeSink) Format(t time.Duration) map[string]float64 {
 	return map[string]float64{"value": g.Value}
 }
 
+// Drain encodes the current sink values and clears them.
+//
+// TODO: something more robust and efficient
+func (g *GaugeSink) Drain() ([]byte, error) {
+	res := []byte(fmt.Sprintf("%d %b %b %b", g.Last.UnixMilli(), g.Value, g.Min, g.Max))
+
+	g.Last = time.Time{}
+	g.Value = 0
+
+	return res, nil
+}
+
+// Merge decodes the given values and merges them with the values in the current sink.
+func (g *GaugeSink) Merge(from []byte) error {
+	var lastMms int64
+	var val, min, max float64
+	_, err := fmt.Sscanf(string(from), "%d %b %b %b", &lastMms, &val, &min, &max)
+	if err != nil {
+		return err
+	}
+
+	last := time.UnixMilli(lastMms)
+	if last.After(g.Last) {
+		g.Last = last
+		g.Value = val
+	}
+
+	if max > g.Max {
+		g.Max = max
+	}
+	if min < g.Min || !g.minSet {
+		g.Min = min
+		g.minSet = true
+	}
+
+	return nil
+}
+
 // NewTrendSink makes a Trend sink with the OpenHistogram circllhist histogram.
 func NewTrendSink() *TrendSink {
 	return &TrendSink{}
@@ -187,6 +256,29 @@ func (t *TrendSink) Format(tt time.Duration) map[string]float64 {
 	}
 }
 
+// Drain encodes the current sink values and clears them.
+//
+// TODO: obviously use something more efficient (e.g. protobuf)
+func (t *TrendSink) Drain() ([]byte, error) {
+	res, err := json.Marshal(t.values)
+	*t = TrendSink{}
+	return res, err
+}
+
+// Merge decodes the given values and merges them with the values in the current sink.
+func (t *TrendSink) Merge(from []byte) error {
+	// TODO: obviously use something more efficient (e.g. protobuf), this is
+	// just for demo purposes
+	var values []float64
+	if err := json.Unmarshal(from, &values); err != nil {
+		return err
+	}
+	for _, v := range values {
+		t.Add(Sample{Value: v})
+	}
+	return nil
+}
+
 type RateSink struct {
 	Trues int64
 	Total int64
@@ -210,3 +302,26 @@ func (r RateSink) Format(t time.Duration) map[string]float64 {
 
 	return map[string]float64{"rate": rate}
 }
+
+// Drain encodes the current sink values and clears them.
+//
+// TODO: something more robust and efficient
+func (r *RateSink) Drain() ([]byte, error) {
+	res := []byte(fmt.Sprintf("%d %d", r.Trues, r.Total))
+	r.Trues = 0
+	r.Total = 0
+	return res, nil
+}
+
+// Merge decodes the given values and merges them with the values in the current sink.
+func (r *RateSink) Merge(from []byte) error {
+	var trues, total int64
+	_, err := fmt.Sscanf(string(from), "%d %d", &trues, &total)
+	if err != nil {
+		return err
+	}
+
+	r.Trues += trues
+	r.Total += total
+	return nil
+}
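A minimal sketch of the Drain/Merge round-trip that the new Sink methods and the SendMetrics RPC are built around, using the CounterSink encoding from this patch. The standalone program below is illustrative only, not part of the change, and assumes a k6 build that includes these Sink additions:

package main

import (
	"fmt"
	"time"

	"go.k6.io/k6/metrics"
)

func main() {
	// "Agent" side: observe a couple of samples, then drain the sink into a
	// compact payload. Drain also zeroes the counter, which is what lets the
	// agent's metrics hook ship incremental dumps every second.
	agent := &metrics.CounterSink{}
	agent.Add(metrics.Sample{Time: time.Now(), Value: 3})
	agent.Add(metrics.Sample{Time: time.Now(), Value: 2})

	payload, err := agent.Drain()
	if err != nil {
		panic(err)
	}

	// "Coordinator" side: merge the payload into its own sink for the same
	// metric, the way MetricsEngine.ImportMetric does for each received
	// MetricDump.
	coordinator := &metrics.CounterSink{}
	if err := coordinator.Merge(payload); err != nil {
		panic(err)
	}

	fmt.Println(coordinator.Value) // 5
}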