Skip to content

Commit

Permalink
PushMetricSetV2WithContext
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Apr 29, 2019
1 parent 0477eaa commit da007c5
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 10 deletions.
5 changes: 5 additions & 0 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ func mustImplementFetcher(ms MetricSet) error {
if _, ok := ms.(PushMetricSetV2); ok {
ifcs = append(ifcs, "PushMetricSetV2")
}

if _, ok := ms.(PushMetricSetV2WithContext); ok {
ifcs = append(ifcs, "PushMetricSetV2WithContext")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
Expand Down
10 changes: 10 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ to implement Modules and their associated MetricSets.
package mb

import (
"context"
"fmt"
"net/url"
"time"
Expand Down Expand Up @@ -217,6 +218,15 @@ type PushMetricSetV2 interface {
Run(r PushReporterV2)
}

// PushMetricSetV2Context is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the context is closed.
type PushMetricSetV2WithContext interface {
MetricSet
Run(ctx context.Context, r ReporterV2)
}

// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data.
Expand Down
20 changes: 20 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package module

import (
"context"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -191,6 +192,8 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V1())
case mb.PushMetricSetV2:
ms.Run(reporter.V2())
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
Expand Down Expand Up @@ -313,6 +316,23 @@ func (r *eventReporter) V1() mb.PushReporter {
}
func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} }

// channelContext implements context.Context by wrapping a channel
type channelContext struct {
done <-chan struct{}
}

func (r *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
func (r *channelContext) Done() <-chan struct{} { return r.done }
func (r *channelContext) Err() error {
select {
case <-r.done:
return context.Canceled
default:
return nil
}
}
func (r *channelContext) Value(key interface{}) interface{} { return nil }

// reporterV1 wraps V2 to provide a v1 interface.
type reporterV1 struct {
v2 mb.PushReporterV2
Expand Down
44 changes: 42 additions & 2 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ that Metricbeat does it and with the same validations.
package testing

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -301,7 +302,21 @@ func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 {

pushMetricSet, ok := metricSet.(mb.PushMetricSetV2)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSet")
t.Fatal("MetricSet does not implement PushMetricSetV2")
}

return pushMetricSet
}

// NewPushMetricSetV2WithContext instantiates a new PushMetricSetV2WithContext
// using the given configuration. The ModuleFactory and MetricSetFactory are
// obtained from the global Registry.
func NewPushMetricSetV2WithContext(t testing.TB, config interface{}) mb.PushMetricSetV2WithContext {
metricSet := NewMetricSet(t, config)

pushMetricSet, ok := metricSet.(mb.PushMetricSetV2WithContext)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSetV2WithContext")
}

return pushMetricSet
Expand Down Expand Up @@ -345,7 +360,7 @@ func (r *capturingPushReporterV2) Done() <-chan struct{} {
// time and returns all of the events and errors that occur during that period.
func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event {
var (
r = &capturingPushReporterV2{doneC: make(chan struct{}), eventsC: make(chan mb.Event)}
r = &capturingPushReporterV2{eventsC: make(chan mb.Event)}
wg sync.WaitGroup
events []mb.Event
)
Expand Down Expand Up @@ -387,3 +402,28 @@ func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.Push
wg.Wait()
return events
}

// RunPushMetricSetV2WithContext run the given push metricset for the specific amount of
// time and returns all of the events that occur during that period.
func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2WithContext) []mb.Event {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
r := &capturingPushReporterV2{eventsC: make(chan mb.Event)}

go metricSet.Run(ctx, r)

var events []mb.Event
for {
select {
case <-ctx.Done():
// Timeout
return events
case e := <-r.eventsC:
events = append(events, e)
if len(events) >= waitEvents {
cancel()
return events
}
}
}
return events
}
8 changes: 3 additions & 5 deletions metricbeat/module/docker/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Run listens for docker events and reports them
func (m *MetricSet) Run(reporter mb.PushReporterV2) {
ctx, cancel := context.WithCancel(context.Background())
func (m *MetricSet) Run(ctx context.Context, reporter mb.ReporterV2) {
options := types.EventsOptions{
Since: fmt.Sprintf("%d", time.Now().Unix()),
}
Expand All @@ -100,16 +99,15 @@ func (m *MetricSet) Run(reporter mb.PushReporterV2) {
time.Sleep(1 * time.Second)
break WATCH

case <-reporter.Done():
case <-ctx.Done():
m.logger.Debug("docker", "event watcher stopped")
cancel()
return
}
}
}
}

func (m *MetricSet) reportEvent(reporter mb.PushReporterV2, event events.Message) {
func (m *MetricSet) reportEvent(reporter mb.ReporterV2, event events.Message) {
time := time.Unix(event.Time, 0)

attributes := make(map[string]string, len(event.Actor.Attributes))
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/module/docker/event/event_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package event

import (
"context"
"io"
"os"
"testing"
Expand All @@ -28,19 +29,18 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"golang.org/x/net/context"

"github.com/elastic/beats/auditbeat/core"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

func TestData(t *testing.T) {
ms := mbtest.NewPushMetricSetV2(t, getConfig())
ms := mbtest.NewPushMetricSetV2WithContext(t, getConfig())
var events []mb.Event
done := make(chan interface{})
go func() {
events = mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
events = mbtest.RunPushMetricSetV2WithContext(10*time.Second, 1, ms)
close(done)
}()

Expand Down

0 comments on commit da007c5

Please sign in to comment.