Skip to content

Commit

Permalink
Avoid sending non-numeric floats in cloud foundry integrations (elast…
Browse files Browse the repository at this point in the history
…ic#22634)

Cloud Foundry integrations are sending some values as they are received
from the Firehose, some of these values can be floats with non-numeric
values (NaN/Inf), that are not supported by JSON and Elasticsearch.

Add defensive code to avoid sending these values to the outputs.

Also, add unit tests using mocked cloud foundry hubs.
  • Loading branch information
jsoriano committed Nov 25, 2020
1 parent 2ae52c3 commit 0619788
Show file tree
Hide file tree
Showing 16 changed files with 686 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change Session ID type from int to string {pull}22359[22359]
- Fix filesystem types on Windows in filesystem metricset. {pull}22531[22531]
- Fix failiures caused by custom beat names with more than 15 characters {pull}22550[22550]
- Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634]
- Update NATS dashboards to leverage connection and route metricsets {pull}22646[22646]

*Packetbeat*
Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF
if !filterFn(env) {
continue
}
event := envelopeToEvent(env)
event := EnvelopeToEvent(env)
if event == nil {
c.log.Debugf("Envelope couldn't be converted to event: %+v", env)
continue
Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/common/cloudfoundry/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func newEventError(env *events.Envelope) *EventError {
}
}

func envelopeToEvent(env *events.Envelope) Event {
func EnvelopeToEvent(env *events.Envelope) Event {
switch *env.EventType {
case events.Envelope_HttpStartStop:
return newEventHttpAccess(env)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/common/cloudfoundry/rlplistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *RlpListener) Start(ctx context.Context) {
for i := range envelopes {
v1s := conversion.ToV1(envelopes[i])
for _, v := range v1s {
evt := envelopeToEvent(v)
evt := EnvelopeToEvent(v)
if evt.EventType() == EventTypeHttpAccess && c.callbacks.HttpAccess != nil {
c.callbacks.HttpAccess(evt.(*EventHttpAccess))
} else if evt.EventType() == EventTypeLog && c.callbacks.Log != nil {
Expand Down
13 changes: 12 additions & 1 deletion x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,30 @@ func init() {
}

type Module interface {
mb.Module
RunCounterReporter(mb.PushReporterV2)
RunContainerReporter(mb.PushReporterV2)
RunValueReporter(mb.PushReporterV2)
}

func newModule(base mb.BaseModule) (mb.Module, error) {
factory := func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub {
return &HubAdapter{cfcommon.NewHub(cfg, name, log)}
}
return NewModuleWithHubFactory(base, factory)
}

type hubFactory func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub

// NewModuleWithHubFactory initializes a module with a hub created with a hub factory
func NewModuleWithHubFactory(base mb.BaseModule, hubFactory hubFactory) (mb.Module, error) {
var cfg cfcommon.Config
if err := base.UnpackConfig(&cfg); err != nil {
return nil, err
}

log := logp.NewLogger("cloudfoundry")
hub := cfcommon.NewHub(&cfg, "metricbeat", log)
hub := hubFactory(&cfg, "metricbeat", log)

switch cfg.Version {
case cfcommon.ConsumerVersionV1:
Expand Down
21 changes: 20 additions & 1 deletion x-pack/metricbeat/module/cloudfoundry/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
)

Expand Down Expand Up @@ -41,5 +42,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Run method provides the module with a reporter with which events can be reported.
func (m *MetricSet) Run(reporter mb.PushReporterV2) {
m.mod.RunContainerReporter(reporter)
m.mod.RunContainerReporter(&containerReporter{reporter, m.Logger()})
}

type containerReporter struct {
mb.PushReporterV2

logger *logp.Logger
}

func (r *containerReporter) Event(event mb.Event) bool {
cpuPctKey := "cloudfoundry.container.cpu.pct"
found, err := cloudfoundry.HasNonNumericFloat(event.RootFields, cpuPctKey)
if err != nil {
r.logger.Debugf("Unexpected failure while checking for non-numeric values: %v", err)
}
if found {
event.RootFields.Delete(cpuPctKey)
}
return r.PushReporterV2.Event(event)
}
163 changes: 163 additions & 0 deletions x-pack/metricbeat/module/cloudfoundry/container/container_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !integration

package container

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cloudfoundry/sonde-go/events"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func init() {
if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil {
panic(err)
}
mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet,
mb.WithHostParser(parse.EmptyHostParser),
mb.DefaultMetricSet(),
)
}

func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return New(base)
}

func TestMetricSet(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

config := map[string]interface{}{
"module": "cloudfoundrytest",
"client_id": "dummy",
"client_secret": "dummy",
"api_address": "dummy",
"shard_id": "dummy",
}

ms := mbtest.NewPushMetricSetV2(t, config)
hub := ms.Module().(*mtest.ModuleMock).Hub

go func() {
hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34}))
}()

events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
require.NotEmpty(t, events)

expectedFields := common.MapStr{
"cloudfoundry.app.id": "1234",
"cloudfoundry.container.cpu.pct": float64(12.34),
"cloudfoundry.container.disk.bytes": uint64(0),
"cloudfoundry.container.disk.quota.bytes": uint64(0),
"cloudfoundry.container.instance_index": int32(0),
"cloudfoundry.container.memory.bytes": uint64(1024),
"cloudfoundry.container.memory.quota.bytes": uint64(0),
"cloudfoundry.envelope.deployment": "test",
"cloudfoundry.envelope.index": "index",
"cloudfoundry.envelope.ip": "127.0.0.1",
"cloudfoundry.envelope.job": "test",
"cloudfoundry.envelope.origin": "test",
"cloudfoundry.type": "container",
}
require.Equal(t, expectedFields, events[0].RootFields.Flatten())
}

func TestMetricValuesAreNumbers(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

config := map[string]interface{}{
"module": "cloudfoundrytest",
"client_id": "dummy",
"client_secret": "dummy",
"api_address": "dummy",
"shard_id": "dummy",
}

ms := mbtest.NewPushMetricSetV2(t, config)
hub := ms.Module().(*mtest.ModuleMock).Hub

go func() {
hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "0000", memory: 1024, cpupct: math.NaN()}))
hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34}))
}()

events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms)
require.NotEmpty(t, events)

for _, e := range events {
memory, err := e.RootFields.GetValue("cloudfoundry.container.memory.bytes")
if assert.NoError(t, err, "checking memory") {
assert.Equal(t, uint64(1024), memory.(uint64))
}

app, err := e.RootFields.GetValue("cloudfoundry.app.id")
require.NoError(t, err, "getting app id")

cpuPctKey := "cloudfoundry.container.cpu.pct"
switch app {
case "0000":
_, err := e.RootFields.GetValue(cpuPctKey)
require.Error(t, err, "non-numeric metric shouldn't be there")
case "1234":
v, err := e.RootFields.GetValue(cpuPctKey)
if assert.NoError(t, err, "checking cpu pct") {
assert.Equal(t, 12.34, v.(float64))
}
default:
t.Errorf("unexpected app: %s", app)
}
}
}

type containerMetrics struct {
app string
instance int32
cpupct float64
memory uint64
disk uint64
memoryQuota uint64
diskQuota uint64
}

func containerMetricsEnvelope(metrics containerMetrics) *events.Envelope {
eventType := events.Envelope_ContainerMetric
origin := "test"
deployment := "test"
job := "test"
ip := "127.0.0.1"
index := "index"
timestamp := time.Now().Unix()
return &events.Envelope{
EventType: &eventType,
Timestamp: &timestamp,
Origin: &origin,
Deployment: &deployment,
Job: &job,
Ip: &ip,
Index: &index,
ContainerMetric: &events.ContainerMetric{
ApplicationId: &metrics.app,
InstanceIndex: &metrics.instance,
CpuPercentage: &metrics.cpupct,
MemoryBytes: &metrics.memory,
DiskBytes: &metrics.disk,
MemoryBytesQuota: &metrics.memoryQuota,
DiskBytesQuota: &metrics.diskQuota,
},
}
}
96 changes: 96 additions & 0 deletions x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !integration

package counter

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cloudfoundry/sonde-go/events"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func init() {
if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil {
panic(err)
}
mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet,
mb.WithHostParser(parse.EmptyHostParser),
mb.DefaultMetricSet(),
)
}

func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return New(base)
}

func TestMetricSet(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

config := map[string]interface{}{
"module": "cloudfoundrytest",
"client_id": "dummy",
"client_secret": "dummy",
"api_address": "dummy",
"shard_id": "dummy",
}

ms := mbtest.NewPushMetricSetV2(t, config)
hub := ms.Module().(*mtest.ModuleMock).Hub

go func() {
hub.SendEnvelope(counterMetricEnvelope("requests", 1234, 123))
}()

events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
require.NotEmpty(t, events)

expectedFields := common.MapStr{
"cloudfoundry.counter.delta": uint64(123),
"cloudfoundry.counter.name": "requests",
"cloudfoundry.counter.total": uint64(1234),
"cloudfoundry.envelope.deployment": "test",
"cloudfoundry.envelope.index": "index",
"cloudfoundry.envelope.ip": "127.0.0.1",
"cloudfoundry.envelope.job": "test",
"cloudfoundry.envelope.origin": "test",
"cloudfoundry.type": "counter",
}
require.Equal(t, expectedFields, events[0].RootFields.Flatten())
}

func counterMetricEnvelope(name string, total uint64, delta uint64) *events.Envelope {
eventType := events.Envelope_CounterEvent
origin := "test"
deployment := "test"
job := "test"
ip := "127.0.0.1"
index := "index"
timestamp := time.Now().Unix()
return &events.Envelope{
EventType: &eventType,
Timestamp: &timestamp,
Origin: &origin,
Deployment: &deployment,
Job: &job,
Ip: &ip,
Index: &index,
CounterEvent: &events.CounterEvent{
Name: &name,
Total: &total,
Delta: &delta,
},
}
}
Loading

0 comments on commit 0619788

Please sign in to comment.