Skip to content

Commit

Permalink
ReportingMetricSetV2WithContext
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Apr 29, 2019
1 parent da007c5 commit b1ceef9
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 20 deletions.
8 changes: 6 additions & 2 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "ReportingMetricSetV2Error")
}

if _, ok := ms.(ReportingMetricSetV2WithContext); ok {
ifcs = append(ifcs, "ReportingMetricSetV2WithContext")
}

if _, ok := ms.(PushMetricSetV2); ok {
ifcs = append(ifcs, "PushMetricSetV2")
}
Expand All @@ -253,8 +257,8 @@ func mustImplementFetcher(ms MetricSet) error {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+
"PushMetricSetV2)",
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+
"PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)",
ms.Module().Name(), ms.Name())
case 1:
return nil
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ type ReportingMetricSetV2Error interface {
Fetch(r ReporterV2) error
}

// ReportingMetricSetV2WithContext is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2WithContext interface {
MetricSet
Fetch(ctx context.Context, r ReporterV2) error
}

// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
Expand Down
19 changes: 13 additions & 6 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
msw.startPeriodicFetching(&channelContext{done}, reporter)
default:
// Earlier startup stages prevent this from happening.
logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
Expand All @@ -207,9 +207,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
// startPeriodicFetching performs an immediate fetch for the MetricSet then it
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
// Fetch immediately.
msw.fetch(reporter)
msw.fetch(ctx, reporter)

// Start timer for future fetches.
t := time.NewTicker(msw.Module().Config().Period)
Expand All @@ -219,15 +219,15 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) {
case <-reporter.V2().Done():
return
case <-t.C:
msw.fetch(reporter)
msw.fetch(ctx, reporter)
}
}
}

// fetch invokes the appropriate Fetch method for the MetricSet and publishes
// the result using the publisher client. This method will recover from panics
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(reporter reporter) {
func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
Expand All @@ -246,6 +246,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, pa
return WriteEventsReporterV2ErrorCond(f, t, path, nil)
}

// WriteEventsReporterV2WithContext fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2WithContext(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string) error {
return WriteEventsReporterV2WithContextCond(f, t, path, nil)
}

// WriteEventsReporterV2Cond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error {
Expand Down Expand Up @@ -129,6 +135,21 @@ func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB
return writeEvent(events, f, t, path, cond)
}

// WriteEventsReporterV2WithContextCond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2WithContextCond(f mb.ReportingMetricSetV2WithContext, t testing.TB, path string, cond func(common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, errs := ReportingFetchV2WithContext(f)
if len(errs) > 0 {
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error {
if len(events) == 0 {
return fmt.Errorf("no events were generated")
Expand Down
24 changes: 24 additions & 0 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
return reportingMetricSetV2Error
}

// NewReportingMetricSetV2WithContext returns a new ReportingMetricSetV2WithContext instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2WithContext(t testing.TB, config interface{}) mb.ReportingMetricSetV2WithContext {
metricSet := NewMetricSet(t, config)

reportingMetricSet, ok := metricSet.(mb.ReportingMetricSetV2WithContext)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSetV2WithContext")
}

return reportingMetricSet
}

// CapturingReporterV2 is a reporter used for testing which stores all events and errors
type CapturingReporterV2 struct {
events []mb.Event
Expand Down Expand Up @@ -229,6 +242,17 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
return r.events, r.errs
}

// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
r := &CapturingReporterV2{}
err := metricSet.Fetch(context.Background(), r)
if err != nil {
r.errs = append(r.errs, err)
}
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/docker/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch returns a list of all containers as events.
// This is based on https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/list-containers.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
func (m *MetricSet) Fetch(ctx context.Context, r mb.ReporterV2) error {
// Fetch a list of all containers.
containers, err := m.dockerClient.ContainerList(context.Background(), types.ContainerListOptions{})
containers, err := m.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
if err != nil {
return errors.Wrap(err, "failed to get docker containers list")
}
Expand Down
12 changes: 2 additions & 10 deletions metricbeat/module/docker/container/container_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,12 @@ package container
import (
"testing"

"github.com/stretchr/testify/assert"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2WithContext(t, getConfig())
if err := mbtest.WriteEventsReporterV2WithContext(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down

0 comments on commit b1ceef9

Please sign in to comment.