Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metricbeat] - Allow metricsets to report their status via v2 protocol #40025

Merged
merged 35 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f87e6bb
fix: initial commit
VihasMakwana Jun 26, 2024
98108a1
tests: add integration test cases
VihasMakwana Jun 26, 2024
e37ef86
fix: expand testing scenarios
VihasMakwana Jun 26, 2024
258c786
fix: add comments
VihasMakwana Jun 26, 2024
d7cecfe
fix: move integration tests to system/process
VihasMakwana Jun 26, 2024
feaa0fa
cleanup
VihasMakwana Jun 26, 2024
5f35ae5
fix: ci
VihasMakwana Jun 26, 2024
ecdfa1e
fix: ci and typos
VihasMakwana Jun 26, 2024
6c877bb
chore: update changelog
VihasMakwana Jun 26, 2024
19b82c7
fix: add helper
VihasMakwana Jun 26, 2024
52cd0ed
fix: remove extra space
VihasMakwana Jun 26, 2024
735c611
Merge branch 'main' into add-metricbeat-status-reporter
VihasMakwana Jun 26, 2024
2fa72c9
fix: ci
VihasMakwana Jun 26, 2024
06c98f4
fix: move integration tests to x-pack
VihasMakwana Jun 26, 2024
bdd1464
fix: add null check
VihasMakwana Jun 27, 2024
c7b9766
fix: ci
VihasMakwana Jun 27, 2024
d0fc62c
Merge branch 'main' into add-metricbeat-status-reporter
pierrehilbert Jul 1, 2024
b6814a4
Merge branch 'forked/main' into add-metricbeat-status-reporter
VihasMakwana Jul 2, 2024
6785fe7
fix: remove unused code
VihasMakwana Jul 2, 2024
a049950
fix: lint
VihasMakwana Jul 2, 2024
f3f137f
fix: lint and imports
VihasMakwana Jul 2, 2024
4cb011b
Merge branch 'main' into add-metricbeat-status-reporter
VihasMakwana Jul 2, 2024
5ffaa6a
fix: ci windows
VihasMakwana Jul 3, 2024
bb6800b
Merge branch 'main' into add-metricbeat-status-reporter
VihasMakwana Jul 3, 2024
9d9bcda
inting for windows
VihasMakwana Jul 4, 2024
8dd335a
fix lint linux
VihasMakwana Jul 4, 2024
8dc7b96
fix: go imports
VihasMakwana Jul 4, 2024
c7bea06
Merge branch 'main' into add-metricbeat-status-reporter
VihasMakwana Jul 5, 2024
071267c
Merge branch 'main' into add-metricbeat-status-reporter
pierrehilbert Jul 6, 2024
6e88ac5
fix: switch to the generic way
VihasMakwana Jul 8, 2024
eaf18fa
chore: make error descriptive
VihasMakwana Jul 9, 2024
959014d
fix: move status report after fetch
VihasMakwana Jul 12, 2024
2f06c14
fix: typo
VihasMakwana Jul 15, 2024
abcb26a
Merge branch 'main' into add-metricbeat-status-reporter
VihasMakwana Jul 17, 2024
6d5f22f
fix: remove nolint
VihasMakwana Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]
- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025]
- Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation

*Osquerybeat*
Expand Down
13 changes: 8 additions & 5 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package helper

import (
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
content := []byte(test.Content)
tmpfile, err := ioutil.TempFile("", "token")
tmpfile, err := os.CreateTemp("", "token")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)
go http.Serve(l, mux) //nolint:all // Ignore the error

return l
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand All @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) {
r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
content, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})
Expand Down Expand Up @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig {
func (*dummyModule) UnpackConfig(interface{}) error {
return nil
}
func (dummyModule) UpdateStatus(_ status.Status, _ string) {}
func (dummyModule) SetStatusReporter(_ status.StatusReporter) {}
28 changes: 22 additions & 6 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,9 +63,11 @@ const (

// Module is the common interface for all Module implementations.
type Module interface {
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent.
SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module.
}

// BaseModule implements the Module interface.
Expand All @@ -73,9 +76,10 @@ type Module interface {
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
name string
config ModuleConfig
rawConfig *conf.C
name string
config ModuleConfig
rawConfig *conf.C
statusReporter status.StatusReporter
}

func (m *BaseModule) String() string {
Expand All @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error {
return m.rawConfig.Unpack(to)
}

// UpdateStatus updates the status of the module. Reflected on elastic-agent.
func (m *BaseModule) UpdateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}

// SetStatusReporter sets the status repoter of the module.
func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) {
m.statusReporter = statusReporter
}

// WithConfig re-configures the module with the given raw configuration and returns a
// copy of the module.
// Intended to be called from module factories. Note that if metricsets are specified
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/module/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup {
func (mr *runner) String() string {
return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets))
}

func (mr *runner) SetStatusReporter(reporter status.StatusReporter) {
mr.mod.SetStatusReporter(reporter)
}
9 changes: 9 additions & 0 deletions metricbeat/mb/module/runner_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
)

type runnerGroup struct {
Expand All @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner {
}
}

func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) {
for _, runner := range rg.runners {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(reporter)
}
}
}

func (rg *runnerGroup) Start() {
rg.startOnce.Do(func() {
for _, runner := range rg.runners {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String())

msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name()))
msw.run(done, out)
}(msw)
}
Expand Down Expand Up @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use %v for the err

logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use %v for the err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkoutsovasilis I didn't see this.
I'll raise another PR for this.
Sorry from my end!

logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ that Metricbeat does it and with the same validations.
}
}
*/
//nolint:all // It's a test file
package testing

import (
Expand All @@ -61,6 +62,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -72,9 +74,11 @@ type TestModule struct {
RawConfig *conf.C
}

func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) UpdateStatus(_ status.Status, _ string) {}
func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {}

func NewTestModule(t testing.TB, config interface{}) *TestModule {
c, err := conf.NewConfigFrom(config)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package node_stats
import (
"testing"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig {
func (m mockModule) UnpackConfig(to interface{}) error {
return nil
}

func (m mockModule) UpdateStatus(_ status.Status, _ string) {}
func (m mockModule) SetStatusReporter(_ status.StatusReporter) {}
Loading
Loading