From f87e6bbaa75c8a2cfe85aa7cab8857e63e081967 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 15:51:56 +0530 Subject: [PATCH 01/27] fix: initial commit --- metricbeat/mb/mb.go | 24 ++++++++++++++++++--- metricbeat/mb/module/runner.go | 5 +++++ metricbeat/mb/module/runner_group.go | 9 ++++++++ metricbeat/mb/module/wrapper.go | 5 +++++ metricbeat/module/system/process/process.go | 6 ++++++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 7e18dc9029d..c361a28c6df 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -27,6 +27,7 @@ import ( "net/url" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -65,6 +66,8 @@ type Module interface { Name() string // Name returns the name of the Module. Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + UpdateStatus(status status.Status, msg string) + SetStatusReporter(statusReporter status.StatusReporter) } // BaseModule implements the Module interface. @@ -73,9 +76,10 @@ type Module interface { // MetricSets, it can embed this type into another struct to satisfy the // Module interface requirements. type BaseModule struct { - name string - config ModuleConfig - rawConfig *conf.C + name string + config ModuleConfig + rawConfig *conf.C + statusReporter status.StatusReporter } func (m *BaseModule) String() string { @@ -95,6 +99,16 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } +// UnpackConfig unpacks the raw module config to the given object. +func (m *BaseModule) UpdateStatus(status status.Status, msg string) { + m.statusReporter.UpdateStatus(status, msg) +} + +// UnpackConfig unpacks the raw module config to the given object. +func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter +} + // WithConfig re-configures the module with the given raw configuration and returns a // copy of the module. // Intended to be called from module factories. Note that if metricsets are specified @@ -354,6 +368,10 @@ func (b *BaseMetricSet) Registration() MetricSetRegistration { return b.registration } +func (b *BaseMetricSet) UpdateStatus(status status.Status, msg string) { + b.Module().UpdateStatus(status, msg) +} + // Configuration types // ModuleConfig is the base configuration data for all Modules. diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 1b0a621d705..aedb443e9a8 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup { func (mr *runner) String() string { return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets)) } + +func (mr *runner) SetStatusReporter(reporter status.StatusReporter) { + mr.mod.SetStatusReporter(reporter) +} diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index e020cd87d55..b4d92d29f56 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" ) type runnerGroup struct { @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner { } } +func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) { + for _, runner := range rg.runners { + if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { + runnerWithStatus.SetStatusReporter(reporter) + } + } +} + func (rg *runnerGroup) Start() { rg.startOnce.Do(func() { for _, runner := range rg.runners { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index d41bdf01497..dc2ef6a9938 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -94,6 +95,10 @@ func NewWrapperForMetricSet(module mb.Module, metricSet mb.MetricSet, options .. return createWrapper(module, []mb.MetricSet{metricSet}, options...) } +func (r *Wrapper) UpdateStatus(status status.Status, msg string) { + r.Module.UpdateStatus(status, msg) +} + func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Option) (*Wrapper, error) { wrapper := &Wrapper{ Module: module, diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index ad9fa8d5ac0..676541df2ff 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -24,6 +24,7 @@ import ( "os" "runtime" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/logp" @@ -53,6 +54,7 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { + base.UpdateStatus(status.Failed, err.Error()) return nil, err } @@ -99,6 +101,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { err := m.stats.Init() if err != nil { + base.UpdateStatus(status.Failed, err.Error()) return nil, err } return m, nil @@ -127,7 +130,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) if err != nil { + m.UpdateStatus(status.Degraded, err.Error()) return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) + } else { + m.UpdateStatus(status.Running, "") } r.Event(mb.Event{ MetricSetFields: proc, From 98108a1acd1ab734324f71362dff72fe9d43abf3 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 18:20:35 +0530 Subject: [PATCH 02/27] tests: add integration test cases --- .../module/system/tests/integration_test.go | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 metricbeat/module/system/tests/integration_test.go diff --git a/metricbeat/module/system/tests/integration_test.go b/metricbeat/module/system/tests/integration_test.go new file mode 100644 index 00000000000..e27ee2fca03 --- /dev/null +++ b/metricbeat/module/system/tests/integration_test.go @@ -0,0 +1,200 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tests + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestSystem(t *testing.T) { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + token := mock.NewID() + + tests.InitBeatsForTest(t, cmd.RootCmd) + + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(os.TempDir(), filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + var logOutputStream = []*proto.UnitExpected{ + { + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + }, + }, + { + Id: unitOneID, + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Streams: []*proto.Stream{{ + Id: "system/metrics-system.process-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.process", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"process"}, + // pid -1 doesn't exist. It should report an error AND update state as DEGRADED + "process.pid": -1, + }), + }}, + Type: "system/metrics", + Id: "system/metrics-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + }, + }, + } + + // expectedMBStreams.Streams = systemInputStreams + // elastic-agent management V2 mock server + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + server.Start() + defer server.Stop() + + // start the client + client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + c := management.DefaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) + }) + + go func() { + t.Logf("Running beats...") + err := cmd.RootCmd.Execute() + require.NoError(t, err) + }() + expectedStatus := []proto.State{ + proto.State_HEALTHY, + proto.State_DEGRADED, + } + + timer := time.NewTimer(2 * time.Minute) + id := 0 + for id < len(expectedStatus) { + time.Sleep(1 * time.Second) + select { + case observed := <-observedStates: + state := extracState(observed.GetUnits(), unitOneID) + fmt.Println("state", state, expectedStatus[id]) + expectedUnits <- logOutputStream + if state != expectedStatus[id] { + continue + } + timer.Reset(2 * time.Minute) + id++ + case <-timer.C: + t.Fatal("timeout waiting for checkin") + default: + } + } +} + +func extracState(units []*proto.UnitObserved, idx string) proto.State { + for _, unit := range units { + if unit.Id == idx { + return unit.GetState() + } + } + return -1 +} From e37ef8608af9db41fcfc0111160aed61d7147f89 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 19:10:21 +0530 Subject: [PATCH 03/27] fix: expand testing scenarios --- .../module/system/tests/integration_test.go | 160 +++++++++++------- 1 file changed, 96 insertions(+), 64 deletions(-) diff --git a/metricbeat/module/system/tests/integration_test.go b/metricbeat/module/system/tests/integration_test.go index e27ee2fca03..efa4372f584 100644 --- a/metricbeat/module/system/tests/integration_test.go +++ b/metricbeat/module/system/tests/integration_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build integration + package tests import ( @@ -58,65 +60,35 @@ func TestSystem(t *testing.T) { require.NoError(t, err) }() - var logOutputStream = []*proto.UnitExpected{ - { - Id: unitOutID, - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - DataStream: &proto.DataStream{ - Namespace: "default", - }, - Type: "file", - Revision: 1, - Meta: &proto.Meta{ - Package: &proto.Package{ - Name: "system", - Version: "1.17.0", - }, - }, - Source: tests.RequireNewStruct(map[string]interface{}{ - "type": "file", - "enabled": true, - "path": outPath, - "filename": "beat-out", - "number_of_files": 7, - }), + /* + process with pid=-1 doesn't exist. This should degrade the input for a while + */ + inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1) + inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) // Correct pid. This should turn the input back to healthy + outputExpectedStream := proto.UnitExpected{ + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", }, - }, - { - Id: unitOneID, - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - DataStream: &proto.DataStream{ - Namespace: "default", - }, - Streams: []*proto.Stream{{ - Id: "system/metrics-system.process-default-system", - DataStream: &proto.DataStream{ - Dataset: "system.process", - Type: "metrics", - }, - Source: tests.RequireNewStruct(map[string]interface{}{ - "metricsets": []interface{}{"process"}, - // pid -1 doesn't exist. It should report an error AND update state as DEGRADED - "process.pid": -1, - }), - }}, - Type: "system/metrics", - Id: "system/metrics-system-default-system", - Name: "system-1", - Revision: 1, - Meta: &proto.Meta{ - Package: &proto.Package{ - Name: "system", - Version: "1.17.0", - }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", }, }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), }, } @@ -164,21 +136,46 @@ func TestSystem(t *testing.T) { err := cmd.RootCmd.Execute() require.NoError(t, err) }() - expectedStatus := []proto.State{ - proto.State_HEALTHY, - proto.State_DEGRADED, + // expectedStatus := []proto.State{ + // proto.State_HEALTHY, + // proto.State_DEGRADED, + // } + + scenarios := []struct { + expectedStatus proto.State + nextInputunit *proto.UnitExpected + }{ + { + proto.State_HEALTHY, + &inputStreamIncorrectPid, + }, + { + proto.State_DEGRADED, + &inputStreamCorrectPid, + }, + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, + // wait for one more checkin, just to be sure it's healthy + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, } timer := time.NewTimer(2 * time.Minute) id := 0 - for id < len(expectedStatus) { - time.Sleep(1 * time.Second) + for id < len(scenarios) { select { case observed := <-observedStates: state := extracState(observed.GetUnits(), unitOneID) - fmt.Println("state", state, expectedStatus[id]) - expectedUnits <- logOutputStream - if state != expectedStatus[id] { + fmt.Println(id, state, scenarios[id].expectedStatus) + expectedUnits <- []*proto.UnitExpected{ + scenarios[id].nextInputunit, + &outputExpectedStream, + } + if state != scenarios[id].expectedStatus { continue } timer.Reset(2 * time.Minute) @@ -198,3 +195,38 @@ func extracState(units []*proto.UnitObserved, idx string) proto.State { } return -1 } + +func getInputStream(id string, pid int, stateIdx int) proto.UnitExpected { + return proto.UnitExpected{ + Id: id, + Type: proto.UnitType_INPUT, + ConfigStateIdx: uint64(stateIdx), + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Streams: []*proto.Stream{{ + Id: "system/metrics-system.process-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.process", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"process"}, + "process.pid": pid, + }), + }}, + Type: "system/metrics", + Id: "system/metrics-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + }, + } +} From 258c786dd5056a91295bf725e6b84d4876a7c4eb Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 19:13:47 +0530 Subject: [PATCH 04/27] fix: add comments --- metricbeat/module/system/tests/integration_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/system/tests/integration_test.go b/metricbeat/module/system/tests/integration_test.go index efa4372f584..0a097edabe8 100644 --- a/metricbeat/module/system/tests/integration_test.go +++ b/metricbeat/module/system/tests/integration_test.go @@ -61,10 +61,11 @@ func TestSystem(t *testing.T) { }() /* - process with pid=-1 doesn't exist. This should degrade the input for a while - */ + * process with pid=-1 doesn't exist. This should degrade the input for a while */ inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1) - inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) // Correct pid. This should turn the input back to healthy + /* + * process with valid pid. This should change state to healthy */ + inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) outputExpectedStream := proto.UnitExpected{ Id: unitOutID, Type: proto.UnitType_OUTPUT, @@ -136,10 +137,6 @@ func TestSystem(t *testing.T) { err := cmd.RootCmd.Execute() require.NoError(t, err) }() - // expectedStatus := []proto.State{ - // proto.State_HEALTHY, - // proto.State_DEGRADED, - // } scenarios := []struct { expectedStatus proto.State From d7cecfe05e30983af0535179fc0a79ad979d3fd4 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 19:20:56 +0530 Subject: [PATCH 05/27] fix: move integration tests to system/process --- .../tests/process_integration_test.go} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename metricbeat/module/system/{tests/integration_test.go => process/tests/process_integration_test.go} (99%) diff --git a/metricbeat/module/system/tests/integration_test.go b/metricbeat/module/system/process/tests/process_integration_test.go similarity index 99% rename from metricbeat/module/system/tests/integration_test.go rename to metricbeat/module/system/process/tests/process_integration_test.go index 0a097edabe8..d039b114baf 100644 --- a/metricbeat/module/system/tests/integration_test.go +++ b/metricbeat/module/system/process/tests/process_integration_test.go @@ -43,7 +43,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" ) -func TestSystem(t *testing.T) { +func TestProcessStatusReporter(t *testing.T) { unitOneID := mock.NewID() unitOutID := mock.NewID() token := mock.NewID() From feaa0fa11db95a9768f3e14af3a8066243d43e22 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 19:21:57 +0530 Subject: [PATCH 06/27] cleanup --- .../module/system/process/tests/process_integration_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metricbeat/module/system/process/tests/process_integration_test.go b/metricbeat/module/system/process/tests/process_integration_test.go index d039b114baf..7d57658cfff 100644 --- a/metricbeat/module/system/process/tests/process_integration_test.go +++ b/metricbeat/module/system/process/tests/process_integration_test.go @@ -93,11 +93,10 @@ func TestProcessStatusReporter(t *testing.T) { }, } - // expectedMBStreams.Streams = systemInputStreams - // elastic-agent management V2 mock server observedStates := make(chan *proto.CheckinObserved) expectedUnits := make(chan []*proto.UnitExpected) done := make(chan struct{}) + // V2 mock server server := &mock.StubServerV2{ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { select { @@ -167,7 +166,6 @@ func TestProcessStatusReporter(t *testing.T) { select { case observed := <-observedStates: state := extracState(observed.GetUnits(), unitOneID) - fmt.Println(id, state, scenarios[id].expectedStatus) expectedUnits <- []*proto.UnitExpected{ scenarios[id].nextInputunit, &outputExpectedStream, From 5f35ae5b2feb4f21f749adae5250da1b49aee60f Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 20:02:36 +0530 Subject: [PATCH 07/27] fix: ci --- metricbeat/helper/http_test.go | 3 +++ metricbeat/mb/testing/modules.go | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 2fbfea0d1ad..af15b7f35aa 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig { func (*dummyModule) UnpackConfig(interface{}) error { return nil } +func (dummyModule) UpdateStatus(_ status.Status, _ string) {} +func (dummyModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 1dcc9b075b8..1dbf99bb5d2 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -61,6 +61,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -72,9 +73,11 @@ type TestModule struct { RawConfig *conf.C } -func (m *TestModule) Name() string { return m.ModName } -func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } -func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) Name() string { return m.ModName } +func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } +func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} +func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) From ecdfa1e42919876694180d90af2334c6ceeabb37 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 20:48:53 +0530 Subject: [PATCH 08/27] fix: ci and typos --- metricbeat/helper/http_test.go | 9 +++++---- metricbeat/mb/mb.go | 14 +++++++------- .../process/tests/process_integration_test.go | 8 ++++++-- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index af15b7f35aa..1db9ab1cc1e 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,7 +19,7 @@ package helper import ( "fmt" - "io/ioutil" + "io" "net" "net/http" "net/http/httptest" @@ -56,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { content := []byte(test.Content) - tmpfile, err := ioutil.TempFile("", "token") + tmpfile, err := os.CreateTemp("", "token") if err != nil { t.Fatal(err) } @@ -237,6 +237,7 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) + // nolint: errcheck // Ignore the error go http.Serve(l, mux) return l @@ -244,7 +245,7 @@ func TestOverUnixSocket(t *testing.T) { for title, c := range cases { t.Run(title, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -263,7 +264,7 @@ func TestOverUnixSocket(t *testing.T) { r, err := h.FetchResponse() require.NoError(t, err) defer r.Body.Close() - content, err := ioutil.ReadAll(r.Body) + content, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, []byte("ehlo!"), content) }) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index c361a28c6df..6fc01834fc9 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -63,11 +63,11 @@ const ( // Module is the common interface for all Module implementations. type Module interface { - Name() string // Name returns the name of the Module. - Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. - UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. - UpdateStatus(status status.Status, msg string) - SetStatusReporter(statusReporter status.StatusReporter) + Name() string // Name returns the name of the Module. + Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. + UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. + SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status repoter for the given module. } // BaseModule implements the Module interface. @@ -99,12 +99,12 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } -// UnpackConfig unpacks the raw module config to the given object. +// UpdateStatus updates the status of the module. Reflected on elastic-agent. func (m *BaseModule) UpdateStatus(status status.Status, msg string) { m.statusReporter.UpdateStatus(status, msg) } -// UnpackConfig unpacks the raw module config to the given object. +// SetStatusReporter updates the status of the module. Reflected on elastic-agent. func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { m.statusReporter = statusReporter } diff --git a/metricbeat/module/system/process/tests/process_integration_test.go b/metricbeat/module/system/process/tests/process_integration_test.go index 7d57658cfff..51e236ccc2b 100644 --- a/metricbeat/module/system/process/tests/process_integration_test.go +++ b/metricbeat/module/system/process/tests/process_integration_test.go @@ -165,7 +165,7 @@ func TestProcessStatusReporter(t *testing.T) { for id < len(scenarios) { select { case observed := <-observedStates: - state := extracState(observed.GetUnits(), unitOneID) + state := extractState(observed.GetUnits(), unitOneID) expectedUnits <- []*proto.UnitExpected{ scenarios[id].nextInputunit, &outputExpectedStream, @@ -173,6 +173,10 @@ func TestProcessStatusReporter(t *testing.T) { if state != scenarios[id].expectedStatus { continue } + // always ensure that output is healthy + outputState := extractState(observed.GetUnits(), unitOutID) + require.Equal(t, outputState, proto.State_HEALTHY) + timer.Reset(2 * time.Minute) id++ case <-timer.C: @@ -182,7 +186,7 @@ func TestProcessStatusReporter(t *testing.T) { } } -func extracState(units []*proto.UnitObserved, idx string) proto.State { +func extractState(units []*proto.UnitObserved, idx string) proto.State { for _, unit := range units { if unit.Id == idx { return unit.GetState() From 6c877bb46c1ddf1a1bf7805ae42f3e0923d1110b Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 20:49:53 +0530 Subject: [PATCH 09/27] chore: update changelog --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 93d33bee5c4..b2794170f0a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -52,6 +52,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] +- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] + *Osquerybeat* From 19b82c7a60d31c111ec6838966c68b2a538c4114 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 21:26:16 +0530 Subject: [PATCH 10/27] fix: add helper --- metricbeat/mb/mb.go | 11 +++++++++++ metricbeat/module/system/process/process.go | 5 ++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 6fc01834fc9..db5dee38dfe 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -372,6 +372,17 @@ func (b *BaseMetricSet) UpdateStatus(status status.Status, msg string) { b.Module().UpdateStatus(status, msg) } +// UpdateStatusOnErr is a helper which will: +// - set status to DEGRADED if error is encountered +// - set status to RUNNING otherwise +func (b *BaseMetricSet) UpdateStatusOnErr(err error) { + if err != nil { + b.UpdateStatus(status.Degraded, err.Error()) + } else { + b.UpdateStatus(status.Running, "") + } +} + // Configuration types // ModuleConfig is the base configuration data for all Modules. diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index 676541df2ff..efe16c33f84 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -114,6 +114,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // monitor either a single PID, or the configured set of processes. if m.setpid == 0 { procs, roots, err := m.stats.Get() + m.UpdateStatusOnErr(err) if err != nil { return fmt.Errorf("process stats: %w", err) } @@ -129,11 +130,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { } } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) + m.UpdateStatusOnErr(err) if err != nil { - m.UpdateStatus(status.Degraded, err.Error()) return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) - } else { - m.UpdateStatus(status.Running, "") } r.Event(mb.Event{ MetricSetFields: proc, From 52cd0ed4f8dad44a1e5dfcf0a3d2cc5ee93d6cf3 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 21:27:04 +0530 Subject: [PATCH 11/27] fix: remove extra space --- CHANGELOG.next.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b2794170f0a..53d4146e77b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -54,7 +54,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] - Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] - *Osquerybeat* - Add action responses data stream, allowing osquerybeat to post action results directly to elasticsearch. {pull}39143[39143] From 2fa72c978c21247f786cf88f2eedf8c326cecea1 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 21:44:41 +0530 Subject: [PATCH 12/27] fix: ci --- metricbeat/module/elasticsearch/node_stats/data_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metricbeat/module/elasticsearch/node_stats/data_test.go b/metricbeat/module/elasticsearch/node_stats/data_test.go index e6151555701..2317418eeaf 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_test.go +++ b/metricbeat/module/elasticsearch/node_stats/data_test.go @@ -22,6 +22,7 @@ package node_stats import ( "testing" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig { func (m mockModule) UnpackConfig(to interface{}) error { return nil } + +func (m mockModule) UpdateStatus(_ status.Status, _ string) {} +func (m mockModule) SetStatusReporter(_ status.StatusReporter) {} From 06c98f410714ce1ff0edeb7ffc0de2c646ab6c66 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 26 Jun 2024 22:01:39 +0530 Subject: [PATCH 13/27] fix: move integration tests to x-pack --- .../system}/process_integration_test.go | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) rename {metricbeat/module/system/process/tests => x-pack/libbeat/management/tests/mbtest/system}/process_integration_test.go (87%) diff --git a/metricbeat/module/system/process/tests/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go similarity index 87% rename from metricbeat/module/system/process/tests/process_integration_test.go rename to x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go index 51e236ccc2b..d3c3c6019c6 100644 --- a/metricbeat/module/system/process/tests/process_integration_test.go +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -1,19 +1,6 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. //go:build integration From bdd1464be4a9ac34c40dcbf658ec92c930be3227 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 27 Jun 2024 16:20:44 +0530 Subject: [PATCH 14/27] fix: add null check --- metricbeat/mb/mb.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index db5dee38dfe..7577a703400 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -101,7 +101,9 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { // UpdateStatus updates the status of the module. Reflected on elastic-agent. func (m *BaseModule) UpdateStatus(status status.Status, msg string) { - m.statusReporter.UpdateStatus(status, msg) + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } } // SetStatusReporter updates the status of the module. Reflected on elastic-agent. From c7b97666e9188be1cc8431b18658734fd6b046f4 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 27 Jun 2024 16:24:27 +0530 Subject: [PATCH 15/27] fix: ci --- metricbeat/helper/http_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 1db9ab1cc1e..3b3d89820d5 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -237,7 +237,7 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - // nolint: errcheck // Ignore the error + // nolint:errcheck // Ignore the error go http.Serve(l, mux) return l From 6785fe7ec042ec06dec60639b9f147eabaaa6c69 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 2 Jul 2024 11:19:52 +0530 Subject: [PATCH 16/27] fix: remove unused code --- metricbeat/mb/module/wrapper.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index dc2ef6a9938..d41bdf01497 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -95,10 +94,6 @@ func NewWrapperForMetricSet(module mb.Module, metricSet mb.MetricSet, options .. return createWrapper(module, []mb.MetricSet{metricSet}, options...) } -func (r *Wrapper) UpdateStatus(status status.Status, msg string) { - r.Module.UpdateStatus(status, msg) -} - func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Option) (*Wrapper, error) { wrapper := &Wrapper{ Module: module, From a04995067cb46bdfa595e303f4435436c6346712 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 2 Jul 2024 11:38:04 +0530 Subject: [PATCH 17/27] fix: lint --- metricbeat/helper/http_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 3b3d89820d5..b874587ad74 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -237,7 +237,7 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - // nolint:errcheck // Ignore the error + //nolint:errcheck // Ignore the error go http.Serve(l, mux) return l From f3f137f200aa3da07b14b001fb4747cf97cf56fa Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 2 Jul 2024 20:19:11 +0530 Subject: [PATCH 18/27] fix: lint and imports --- metricbeat/mb/mb.go | 2 +- metricbeat/mb/testing/modules.go | 5 +++++ .../tests/mbtest/system/process_integration_test.go | 5 +++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 7577a703400..e7567747ce6 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -106,7 +106,7 @@ func (m *BaseModule) UpdateStatus(status status.Status, msg string) { } } -// SetStatusReporter updates the status of the module. Reflected on elastic-agent. +// SetStatusReporter sets the status repoter of the module. func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { m.statusReporter = statusReporter } diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 1dbf99bb5d2..475b0f607e3 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -135,6 +135,9 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re return metricsets } +/* eslint-disable */ +// Following methods returns deprecated metricsets for testing. Disable eslint as it complains + func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { metricSet := NewMetricSet(t, config) @@ -443,6 +446,8 @@ func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { } } +/* eslint-enable */ + // RunPushMetricSetV2 run the given push metricset for the specific amount of // time and returns all of the events and errors that occur during that period. func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event { diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go index d3c3c6019c6..8bdeffa2f25 100644 --- a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -20,13 +20,14 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" - "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" + conf "github.com/elastic/elastic-agent-libs/config" ) From 5ffaa6a6becb6c4070b582d0c1a5cd23dce67c5a Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 3 Jul 2024 18:41:57 +0530 Subject: [PATCH 19/27] fix: ci windows --- metricbeat/helper/http_test.go | 3 +-- metricbeat/mb/testing/modules.go | 6 +----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index b874587ad74..939200a5a4d 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -237,8 +237,7 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - //nolint:errcheck // Ignore the error - go http.Serve(l, mux) + go http.Serve(l, mux) //nolint:errcheck // Ignore the error return l } diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 475b0f607e3..e76c2e9028b 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,6 +53,7 @@ that Metricbeat does it and with the same validations. } } */ +//nolint:errcheck // It's a test file package testing import ( @@ -135,9 +136,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re return metricsets } -/* eslint-disable */ -// Following methods returns deprecated metricsets for testing. Disable eslint as it complains - func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { metricSet := NewMetricSet(t, config) @@ -446,8 +444,6 @@ func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { } } -/* eslint-enable */ - // RunPushMetricSetV2 run the given push metricset for the specific amount of // time and returns all of the events and errors that occur during that period. func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event { From 9d9bcda4a8a15328699b7d3fb26f5fc2c76862e5 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 19:02:10 +0530 Subject: [PATCH 20/27] inting for windows --- metricbeat/helper/http_test.go | 2 +- metricbeat/mb/testing/modules.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 939200a5a4d..2f5a43d2dc4 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -237,7 +237,7 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - go http.Serve(l, mux) //nolint:errcheck // Ignore the error + go http.Serve(l, mux) //nolint:all // Ignore the error return l } diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index e76c2e9028b..e198e9a29b5 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,7 +53,7 @@ that Metricbeat does it and with the same validations. } } */ -//nolint:errcheck // It's a test file +//nolint:all // It's a test file package testing import ( From 8dd335a25fe1dbeb20d2a19abc8f2af8b01caa1e Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 19:54:50 +0530 Subject: [PATCH 21/27] fix lint linux --- .../management/tests/mbtest/system/process_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go index 8bdeffa2f25..d773b3c9a49 100644 --- a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -100,7 +100,7 @@ func TestProcessStatusReporter(t *testing.T) { return nil }, } - server.Start() + require.NoError(t, server.Start()) defer server.Stop() // start the client From 8dc7b967ba553dcf7a617a57bca40b0865ec7221 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 22:32:38 +0530 Subject: [PATCH 22/27] fix: go imports --- .../management/tests/mbtest/system/process_integration_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go index d773b3c9a49..5e22332f2e4 100644 --- a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -20,10 +20,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" From 6e88ac5c0335822d0b2c1fc2eccc96213747b365 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 8 Jul 2024 17:35:10 +0530 Subject: [PATCH 23/27] fix: switch to the generic way --- metricbeat/mb/mb.go | 15 --------------- metricbeat/mb/module/wrapper.go | 3 +++ metricbeat/module/system/process/process.go | 5 ----- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index e7567747ce6..ed847eef6b4 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -370,21 +370,6 @@ func (b *BaseMetricSet) Registration() MetricSetRegistration { return b.registration } -func (b *BaseMetricSet) UpdateStatus(status status.Status, msg string) { - b.Module().UpdateStatus(status, msg) -} - -// UpdateStatusOnErr is a helper which will: -// - set status to DEGRADED if error is encountered -// - set status to RUNNING otherwise -func (b *BaseMetricSet) UpdateStatusOnErr(err error) { - if err != nil { - b.UpdateStatus(status.Degraded, err.Error()) - } else { - b.UpdateStatus(status.Running, "") - } -} - // Configuration types // ModuleConfig is the base configuration data for all Modules. diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index d41bdf01497..566acb280dc 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -376,8 +377,10 @@ func (r reporterV2) Event(event mb.Event) bool { if event.Error == nil { r.msw.stats.success.Add(1) + r.msw.Module().UpdateStatus(status.Running, "") } else { r.msw.stats.failures.Add(1) + r.msw.Module().UpdateStatus(status.Degraded, event.Error.Error()) } if event.Namespace == "" { diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index efe16c33f84..ad9fa8d5ac0 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -24,7 +24,6 @@ import ( "os" "runtime" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/logp" @@ -54,7 +53,6 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { - base.UpdateStatus(status.Failed, err.Error()) return nil, err } @@ -101,7 +99,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { err := m.stats.Init() if err != nil { - base.UpdateStatus(status.Failed, err.Error()) return nil, err } return m, nil @@ -114,7 +111,6 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // monitor either a single PID, or the configured set of processes. if m.setpid == 0 { procs, roots, err := m.stats.Get() - m.UpdateStatusOnErr(err) if err != nil { return fmt.Errorf("process stats: %w", err) } @@ -130,7 +126,6 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { } } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) - m.UpdateStatusOnErr(err) if err != nil { return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) } From eaf18faf82f103ecd5500f41d81fcc79af79bf02 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 10 Jul 2024 02:36:06 +0530 Subject: [PATCH 24/27] chore: make error descriptive --- metricbeat/mb/module/wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 566acb280dc..3fd40e6cec6 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -380,7 +380,7 @@ func (r reporterV2) Event(event mb.Event) bool { r.msw.Module().UpdateStatus(status.Running, "") } else { r.msw.stats.failures.Add(1) - r.msw.Module().UpdateStatus(status.Degraded, event.Error.Error()) + r.msw.Module().UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", r.msw.module.Name(), r.msw.MetricSet.Name(), event.Error)) } if event.Namespace == "" { From 959014d0441cc5cb734798f9090802766cba3ebd Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 12 Jul 2024 16:29:17 +0530 Subject: [PATCH 25/27] fix: move status report after fetch --- metricbeat/mb/module/wrapper.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 3fd40e6cec6..feaf11363fa 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -147,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { registry.Add(metricsPath, msw.Metrics(), monitoring.Full) monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String()) + msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name())) msw.run(done, out) }(msw) } @@ -254,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { err := fetcher.Fetch(reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) @@ -377,10 +384,8 @@ func (r reporterV2) Event(event mb.Event) bool { if event.Error == nil { r.msw.stats.success.Add(1) - r.msw.Module().UpdateStatus(status.Running, "") } else { r.msw.stats.failures.Add(1) - r.msw.Module().UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", r.msw.module.Name(), r.msw.MetricSet.Name(), event.Error)) } if event.Namespace == "" { From 2f06c149a3a72ef102957e5e60ec5c86ea154417 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 16 Jul 2024 01:14:29 +0530 Subject: [PATCH 26/27] fix: typo --- metricbeat/mb/mb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index ed847eef6b4..0be1db7cef3 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -67,7 +67,7 @@ type Module interface { Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. - SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status repoter for the given module. + SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module. } // BaseModule implements the Module interface. From 6d5f22fa3aad56f7f006863f8b49032fcee2431a Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Wed, 24 Jul 2024 13:07:03 +0530 Subject: [PATCH 27/27] fix: remove nolint --- metricbeat/mb/testing/modules.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 9c0b195a26a..736bb1f40e6 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,7 +53,7 @@ that Metricbeat does it and with the same validations. } } */ -//nolint:all // It's a test file + package testing import (