From 564549122048eb77f22b6cff1a559ff45030ca98 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 24 Jun 2020 19:03:29 +0200 Subject: [PATCH] Add support for cloudfoundry metrics collection using consumer v1 API (#19268) As previously done for the Filebeat input, add support for metrics collection in the Metricbeat module using the consumer v1 API. It reuses the same Doppler consumer used in Filebeat. v2 API is still supported, and can be selected by adding `version: v2` to the configuration, v1 is used by default as is in principle more reliable. --- CHANGELOG.next.asciidoc | 1 + metricbeat/docs/modules/cloudfoundry.asciidoc | 9 +- .../common/cloudfoundry/dopplerconsumer.go | 8 +- x-pack/metricbeat/metricbeat.reference.yml | 1 + .../cloudfoundry/_meta/config.reference.yml | 1 + .../module/cloudfoundry/_meta/docs.asciidoc | 8 +- .../module/cloudfoundry/cloudfoundry.go | 126 ++---------- .../cloudfoundry/container/container.go | 4 +- .../container/container_integration_test.go | 14 ++ .../module/cloudfoundry/counter/counter.go | 4 +- .../counter/counter_integration_test.go | 14 ++ x-pack/metricbeat/module/cloudfoundry/v1.go | 164 ++++++++++++++++ .../metricbeat/module/cloudfoundry/v1_test.go | 181 ++++++++++++++++++ x-pack/metricbeat/module/cloudfoundry/v2.go | 128 +++++++++++++ .../module/cloudfoundry/value/value.go | 4 +- .../value/value_integration_test.go | 14 ++ 16 files changed, 558 insertions(+), 123 deletions(-) create mode 100644 x-pack/metricbeat/module/cloudfoundry/v1.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/v1_test.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/v2.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 224aada05d0..e6985762611 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -505,6 +505,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add memory metrics into compute googlecloud. {pull}18802[18802] - Add new fields to HAProxy module. {issue}18523[18523] - Add Tomcat overview dashboard {pull}14026[14026] +- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268] *Packetbeat* diff --git a/metricbeat/docs/modules/cloudfoundry.asciidoc b/metricbeat/docs/modules/cloudfoundry.asciidoc index 0baeb3f4180..3a8ba132280 100644 --- a/metricbeat/docs/modules/cloudfoundry.asciidoc +++ b/metricbeat/docs/modules/cloudfoundry.asciidoc @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` @@ -130,6 +136,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 ---- [float] diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 8d068a4fd84..10ea50dd928 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF } cb(event) case err := <-errChan: - // This error is an error on the connection, not a cloud foundry - // error envelope. Firehose should be able to reconnect, so just log it. - c.log.Infof("Error received on firehose: %v", err) + if err != nil { + // This error is an error on the connection, not a cloud foundry + // error envelope. Firehose should be able to reconnect, so just log it. + c.log.Infof("Error received on firehose: %v", err) + } case <-c.stop: return } diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index e4bf6b13d63..5b5013a4bc3 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -372,6 +372,7 @@ metricbeat.modules: rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 #----------------------------- CockroachDB Module ----------------------------- - module: cockroachdb diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml index c157d5deeff..be15db23b65 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/config.reference.yml @@ -10,3 +10,4 @@ rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}' client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}' client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}' + version: v1 diff --git a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc index 762d3e4f34f..b36a41bf891 100644 --- a/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr [float] === `rlp_address` -The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)". +The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". [float] === `client_id` @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "". Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events from the RLP Gateway. Default: "(generated UUID)". +[float] +==== `version` + +Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control. +Use `v2` to collect events from the RLP Gateway. Default: "`v1`". + [float] === `ssl` diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 96cded6fa7e..961827469dd 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -5,8 +5,7 @@ package cloudfoundry import ( - "context" - "sync" + "fmt" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" @@ -16,26 +15,18 @@ import ( // ModuleName is the name of this module. const ModuleName = "cloudfoundry" -type Module struct { - mb.BaseModule - - log *logp.Logger - - hub *cfcommon.Hub - listener *cfcommon.RlpListener - listenerLock sync.Mutex - - counterReporter mb.PushReporterV2 - valueReporter mb.PushReporterV2 - containerReporter mb.PushReporterV2 -} - func init() { if err := mb.Registry.AddModule(ModuleName, newModule); err != nil { panic(err) } } +type Module interface { + RunCounterReporter(mb.PushReporterV2) + RunContainerReporter(mb.PushReporterV2) + RunValueReporter(mb.PushReporterV2) +} + func newModule(base mb.BaseModule) (mb.Module, error) { var cfg cfcommon.Config if err := base.UnpackConfig(&cfg); err != nil { @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) { log := logp.NewLogger("cloudfoundry") hub := cfcommon.NewHub(&cfg, "metricbeat", log) - // early check that listener can be created - _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) - if err != nil { - return nil, err - - } - - return &Module{ - BaseModule: base, - log: log, - hub: hub, - }, nil -} - -func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(reporter, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(nil, m.valueReporter, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunValueReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, reporter, m.containerReporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, nil, m.containerReporter) - m.listenerLock.Unlock() -} - -func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) { - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, reporter) - m.listenerLock.Unlock() - - <-reporter.Done() - - m.listenerLock.Lock() - m.runReporters(m.counterReporter, m.valueReporter, nil) - m.listenerLock.Unlock() -} - -func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { - if m.listener != nil { - m.listener.Stop() - m.listener = nil - } - m.counterReporter = counterReporter - m.valueReporter = valueReporter - m.containerReporter = containerReporter - - start := false - callbacks := cfcommon.RlpListenerCallbacks{} - if m.counterReporter != nil { - start = true - callbacks.Counter = func(evt *cfcommon.EventCounter) { - m.counterReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.valueReporter != nil { - start = true - callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { - m.valueReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if m.containerReporter != nil { - start = true - callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { - m.containerReporter.Event(mb.Event{ - Timestamp: evt.Timestamp(), - RootFields: evt.ToFields(), - }) - } - } - if start { - l, err := m.hub.RlpListener(callbacks) - if err != nil { - m.log.Errorf("failed to create RlpListener: %v", err) - return - } - l.Start(context.Background()) - m.listener = l + switch cfg.Version { + case cfcommon.ConsumerVersionV1: + return newModuleV1(base, hub, log) + case cfcommon.ConsumerVersionV2: + return newModuleV2(base, hub, log) + default: + return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version) } } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container.go b/x-pack/metricbeat/module/cloudfoundry/container/container.go index 73019fb8808..4f8c6227103 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go index b05683b487e..e871a5823fc 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "container") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go index 10022f87d04..53d3833d810 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go index 6a87ce6f951..44cb4935e70 100644 --- a/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "counter") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go new file mode 100644 index 00000000000..7d9daf24673 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -0,0 +1,164 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +type ModuleV1 struct { + mb.BaseModule + + log *logp.Logger + + running atomic.Bool + consumer *cfcommon.DopplerConsumer + + events chan cfcommon.Event + subscriptions chan subscription +} + +func newModuleV1(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV1, error) { + m := ModuleV1{ + BaseModule: base, + log: log, + running: atomic.MakeBool(false), + } + consumer, err := hub.DopplerConsumer(cfcommon.DopplerCallbacks{ + Metric: m.callback, + }) + if err != nil { + return nil, err + } + m.consumer = consumer + m.events = make(chan cfcommon.Event) + m.subscriptions = make(chan subscription) + + return &m, nil +} + +func (m *ModuleV1) RunCounterReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeCounter, reporter) + defer m.unsubscribe(cfcommon.EventTypeCounter, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunValueReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeValueMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeValueMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) RunContainerReporter(reporter mb.PushReporterV2) { + m.subscribe(cfcommon.EventTypeContainerMetric, reporter) + defer m.unsubscribe(cfcommon.EventTypeContainerMetric, reporter) + <-reporter.Done() +} + +func (m *ModuleV1) subscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + go m.run() + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + } +} + +func (m *ModuleV1) unsubscribe(eventType cfcommon.EventType, reporter mb.PushReporterV2) { + m.subscriptions <- subscription{ + eventType: eventType, + reporter: reporter, + unsubscribe: true, + } +} + +func (m *ModuleV1) callback(event cfcommon.Event) { + m.events <- event +} + +func (m *ModuleV1) run() { + if !m.running.CAS(false, true) { + return + } + defer func() { m.running.Store(false) }() + + m.consumer.Run() + defer m.consumer.Stop() + + dispatcher := newEventDispatcher(m.log) + + for { + // Handle subscriptions and events dispatching on the same + // goroutine so locking is not needed. + select { + case e := <-m.events: + dispatcher.dispatch(e) + case s := <-m.subscriptions: + dispatcher.handleSubscription(s) + if dispatcher.empty() { + return + } + } + } +} + +type subscription struct { + eventType cfcommon.EventType + reporter mb.PushReporterV2 + + unsubscribe bool +} + +// eventDispatcher keeps track on the reporters that are subscribed to each event type +// and dispatches events to them when received. +type eventDispatcher struct { + log *logp.Logger + reporters map[cfcommon.EventType]mb.PushReporterV2 +} + +func newEventDispatcher(log *logp.Logger) *eventDispatcher { + return &eventDispatcher{ + log: log, + reporters: make(map[cfcommon.EventType]mb.PushReporterV2), + } +} + +func (d *eventDispatcher) handleSubscription(s subscription) { + current, subscribed := d.reporters[s.eventType] + if s.unsubscribe { + if !subscribed || current != s.reporter { + // This can happen if same metricset is used twice + d.log.Warnf("Ignoring unsubscription of not subscribed reporter for %s", s.eventType) + return + } + delete(d.reporters, s.eventType) + } else { + if subscribed { + if s.reporter != current { + // This can happen if same metricset is used twice + d.log.Warnf("Ignoring subscription of multiple reporters for %s", s.eventType) + } + return + } + d.reporters[s.eventType] = s.reporter + } +} + +func (d *eventDispatcher) dispatch(e cfcommon.Event) { + reporter, found := d.reporters[e.EventType()] + if !found { + return + } + reporter.Event(mb.Event{ + Timestamp: e.Timestamp(), + RootFields: e.ToFields(), + }) +} + +func (d *eventDispatcher) empty() bool { + return len(d.reporters) == 0 +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v1_test.go b/x-pack/metricbeat/module/cloudfoundry/v1_test.go new file mode 100644 index 00000000000..b30952a9189 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v1_test.go @@ -0,0 +1,181 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +func TestDispatcher(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + log := logp.NewLogger("cloudfoundry") + + assertEventType := func(t *testing.T, expected string, e mb.Event) { + t.Helper() + cf := e.RootFields["cloudfoundry"].(common.MapStr) + assert.Equal(t, expected, cf["type"]) + } + + waitFor := func(t *testing.T, expected string, r pushReporter) { + t.Helper() + select { + case e := <-r.events: + assertEventType(t, expected, e) + default: + t.Errorf("expected %s event", expected) + } + } + + t.Run("subscribe to one type", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) + + t.Run("subscribe and unsubscribe", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 1)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + unsubscribe: true, + }) + + assert.True(t, d.empty()) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-r.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + }) + + t.Run("subscribe to two types", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventValueMetric{}) + + waitFor(t, "counter", counterReporter) + waitFor(t, "value", valueReporter) + }) + + t.Run("subscribe to two types, receive only from one", func(t *testing.T) { + d := newEventDispatcher(log) + + counterReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &counterReporter, + }) + + valueReporter := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeValueMetric, + reporter: &valueReporter, + }) + + d.dispatch(&cfcommon.EventCounter{}) + d.dispatch(&cfcommon.EventCounter{}) + + select { + case <-valueReporter.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + + waitFor(t, "counter", counterReporter) + waitFor(t, "counter", counterReporter) + }) + + t.Run("subscribe twice to same type, ignore second", func(t *testing.T) { + d := newEventDispatcher(log) + first := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &first, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", first) + + second := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &second, + }) + + d.dispatch(&cfcommon.EventCounter{}) + select { + case <-second.events: + t.Errorf("shouldn't receive on this reporter") + default: + } + waitFor(t, "counter", first) + }) + + t.Run("unsubscribe not subscribed reporters, first one continues subscribed", func(t *testing.T) { + d := newEventDispatcher(log) + r := pushReporter{events: make(chan mb.Event, 2)} + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &r, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + + d.handleSubscription(subscription{ + eventType: cfcommon.EventTypeCounter, + reporter: &pushReporter{}, + unsubscribe: true, + }) + d.dispatch(&cfcommon.EventCounter{}) + waitFor(t, "counter", r) + }) +} + +type pushReporter struct { + events chan mb.Event +} + +func (r *pushReporter) Done() <-chan struct{} { return nil } +func (r *pushReporter) Error(err error) bool { return true } +func (r *pushReporter) Event(e mb.Event) bool { + r.events <- e + return true +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go new file mode 100644 index 00000000000..5cf7de6c103 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -0,0 +1,128 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +type ModuleV2 struct { + mb.BaseModule + + log *logp.Logger + + hub *cfcommon.Hub + listener *cfcommon.RlpListener + listenerLock sync.Mutex + + counterReporter mb.PushReporterV2 + valueReporter mb.PushReporterV2 + containerReporter mb.PushReporterV2 +} + +func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { + // early check that listener can be created + _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) + if err != nil { + return nil, err + + } + + return &ModuleV2{ + BaseModule: base, + log: log, + hub: hub, + }, nil +} + +func (m *ModuleV2) RunCounterReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(reporter, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(nil, m.valueReporter, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunValueReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, reporter, m.containerReporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, nil, m.containerReporter) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) RunContainerReporter(reporter mb.PushReporterV2) { + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, reporter) + m.listenerLock.Unlock() + + <-reporter.Done() + + m.listenerLock.Lock() + m.runReporters(m.counterReporter, m.valueReporter, nil) + m.listenerLock.Unlock() +} + +func (m *ModuleV2) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) { + if m.listener != nil { + m.listener.Stop() + m.listener = nil + } + m.counterReporter = counterReporter + m.valueReporter = valueReporter + m.containerReporter = containerReporter + + start := false + callbacks := cfcommon.RlpListenerCallbacks{} + if m.counterReporter != nil { + start = true + callbacks.Counter = func(evt *cfcommon.EventCounter) { + m.counterReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.valueReporter != nil { + start = true + callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) { + m.valueReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if m.containerReporter != nil { + start = true + callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) { + m.containerReporter.Event(mb.Event{ + Timestamp: evt.Timestamp(), + RootFields: evt.ToFields(), + }) + } + } + if start { + l, err := m.hub.RlpListener(callbacks) + if err != nil { + m.log.Errorf("failed to create RlpListener: %v", err) + return + } + l.Start(context.TODO()) + m.listener = l + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value.go b/x-pack/metricbeat/module/cloudfoundry/value/value.go index 7a30a2c67db..55cb6ca689a 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value.go @@ -25,14 +25,14 @@ func init() { type MetricSet struct { mb.BaseMetricSet - mod *cloudfoundry.Module + mod cloudfoundry.Module } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - mod, ok := base.Module().(*cloudfoundry.Module) + mod, ok := base.Module().(cloudfoundry.Module) if !ok { return nil, fmt.Errorf("must be child of cloudfoundry module") } diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go index 03d11bb6b7e..610a0a8e029 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value_integration_test.go @@ -13,12 +13,26 @@ import ( "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/logp" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" ) func TestFetch(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + t.Run("v1", func(t *testing.T) { + testFetch(t, "v1") + }) + + t.Run("v2", func(t *testing.T) { + testFetch(t, "v2") + }) +} + +func testFetch(t *testing.T, version string) { config := mtest.GetConfig(t, "value") + config["version"] = version ms := mbtest.NewPushMetricSetV2(t, config) events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)