Skip to content

Commit

Permalink
[Heartbeat] Fix id/summary with multi-url configs (#10408)
Browse files Browse the repository at this point in the history
[Heartbeat] Fix id/summary with multi-url configs

When using configurations that specify multiple URLs in the config there were two issues:

1. They would all share the same ID, when IDs are supposed to be unique to an URL
2. The Job summary would summarize all the URLs, instead of all the DNS entries per URL.

This patch fixes this by introducing a new WrapAllSeparately wrapper that wraps top level
jobs with a new JobWrapperFactory, that lets us create JobWrapper instances with separate
scopes.
  • Loading branch information
andrewvc authored Jan 31, 2019
1 parent cb1c6fe commit 98e38be
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 198 deletions.
13 changes: 6 additions & 7 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/common/mapval"
btesting "github.com/elastic/beats/libbeat/testing"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func testRequest(t *testing.T, testURL string) *beat.Event {
Expand Down Expand Up @@ -192,7 +191,7 @@ func TestUpStatuses(t *testing.T) {
t.Run(fmt.Sprintf("Test OK HTTP status %d", status), func(t *testing.T) {
server, event := checkServer(t, hbtest.HelloWorldHandler(status))

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "http"),
Expand All @@ -212,7 +211,7 @@ func TestDownStatuses(t *testing.T) {
t.Run(fmt.Sprintf("test down status %d", status), func(t *testing.T) {
server, event := checkServer(t, hbtest.HelloWorldHandler(status))

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("127.0.0.1", "down", "http"),
Expand Down Expand Up @@ -249,7 +248,7 @@ func TestLargeResponse(t *testing.T) {
_, err = job(event)
require.NoError(t, err)

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "http"),
Expand Down Expand Up @@ -282,7 +281,7 @@ func runHTTPSServerCheck(

event := testTLSRequest(t, server.URL, mergedExtraConfig)

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "http"),
Expand Down Expand Up @@ -348,7 +347,7 @@ func TestConnRefusedJob(t *testing.T) {

event := testRequest(t, url)

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks(ip, "down", "http"),
Expand All @@ -370,7 +369,7 @@ func TestUnreachableJob(t *testing.T) {

event := testRequest(t, url)

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks(ip, "down", "http"),
Expand Down
9 changes: 4 additions & 5 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/mapval"
btesting "github.com/elastic/beats/libbeat/testing"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestUpEndpointJob(t *testing.T) {

event := testTCPCheck(t, "localhost", port)

mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "tcp"),
Expand Down Expand Up @@ -145,7 +144,7 @@ func TestTLSConnection(t *testing.T) {
defer os.Remove(certFile.Name())

event := testTLSTCPCheck(t, ip, port, certFile.Name())
mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.TLSChecks(0, 0, cert),
Expand All @@ -166,7 +165,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) {
event := testTCPCheck(t, ip, port)

dialErr := fmt.Sprintf("dial tcp %s:%d", ip, port)
mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
Expand All @@ -184,7 +183,7 @@ func TestUnreachableEndpointJob(t *testing.T) {
event := testTCPCheck(t, ip, port)

dialErr := fmt.Sprintf("dial tcp %s:%d", ip, port)
mapvaltest.Test(
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
Expand Down
22 changes: 21 additions & 1 deletion heartbeat/monitors/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package jobs

import "github.com/elastic/beats/libbeat/beat"
import (
"github.com/elastic/beats/libbeat/beat"
)

// A Job represents a unit of execution, and may return multiple continuation jobs.
type Job func(event *beat.Event) ([]Job, error)
Expand Down Expand Up @@ -46,6 +48,24 @@ func WrapAll(jobs []Job, wrappers ...JobWrapper) []Job {
return wrapped
}

// JobWrapperFactory can be used to created new instances of JobWrappers.
type JobWrapperFactory func() JobWrapper

// WrapAllSeparately wraps the given jobs using the given JobWrapperFactory instances.
// This enables us to use a different JobWrapper for the jobs passed in, but recursively apply
// the same wrapper to their children.
func WrapAllSeparately(jobs []Job, factories ...JobWrapperFactory) []Job {
var wrapped []Job
for _, j := range jobs {
for _, factory := range factories {
wrapper := factory()
j = Wrap(j, wrapper)
}
wrapped = append(wrapped, j)
}
return wrapped
}

// Wrap wraps the given Job and also any continuations with the given JobWrapper.
func Wrap(job Job, wrapper JobWrapper) Job {
return func(event *beat.Event) ([]Job, error) {
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/jobs/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/mapval"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func TestWrapAll(t *testing.T) {
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestWrapAll(t *testing.T) {
fr := results[idx].Fields

validator := mapval.Strict(mapval.MustCompile(rf))
mapvaltest.Test(t, validator, fr)
mapval.Test(t, validator, fr)
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/mapval"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/scheduler"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func TestMonitor(t *testing.T) {
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestMonitor(t *testing.T) {
pcClient.Close()

for _, event := range pcClient.Publishes() {
mapvaltest.Test(t, mockEventMonitorValidator(""), event.Fields)
mapval.Test(t, mockEventMonitorValidator(""), event.Fields)
}
} else {
// Let's yield this goroutine so we don't spin
Expand Down
61 changes: 42 additions & 19 deletions heartbeat/monitors/wrappers/monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,61 @@ import (
"time"

"github.com/gofrs/uuid"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"

"github.com/elastic/beats/heartbeat/eventext"
"github.com/elastic/beats/heartbeat/look"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// WrapCommon applies the common wrappers that all monitor jobs get.
func WrapCommon(js []jobs.Job, id string, name string, typ string) []jobs.Job {
return jobs.WrapAll(
js,
addMonitorStatus,
addMonitorDuration,
addMonitorMeta(id, name, typ),
makeAddSummary(id, uint16(len(js))),
)
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorStatus,
addMonitorDuration,
), func() jobs.JobWrapper {
return addMonitorMeta(id, name, typ, len(js) > 1)
}, func() jobs.JobWrapper {
return makeAddSummary()
})
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(id string, name string, typ string) jobs.JobWrapper {
func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return WithFields(
common.MapStr{
"monitor": common.MapStr{
"id": id,
"name": name,
"type": typ,
return func(event *beat.Event) ([]jobs.Job, error) {
cont, e := job(event)
thisID := id

if isMulti {
url, err := event.GetValue("url.full")
if err != nil {
logp.Error(errors.Wrap(err, "Mandatory url.full key missing!"))
url = "n/a"
}
urlHash, _ := hashstructure.Hash(url, nil)
thisID = fmt.Sprintf("%s-%x", id, urlHash)
}

eventext.MergeEventFields(
event,
common.MapStr{
"monitor": common.MapStr{
"id": thisID,
"name": name,
"type": typ,
},
},
},
job,
)
)

return cont, e
}
}
}

Expand Down Expand Up @@ -99,7 +122,7 @@ func addMonitorDuration(job jobs.Job) jobs.Job {
}

// makeAddSummary summarizes the job, adding the `summary` field to the last event emitted.
func makeAddSummary(id string, numJobs uint16) jobs.JobWrapper {
func makeAddSummary() jobs.JobWrapper {
// This is a tricky method. The way this works is that we track the state across jobs in the
// state struct here.
state := struct {
Expand All @@ -114,7 +137,7 @@ func makeAddSummary(id string, numJobs uint16) jobs.JobWrapper {
}
// Note this is not threadsafe, must be called from a mutex
resetState := func() {
state.remaining = numJobs
state.remaining = 1
state.up = 0
state.down = 0
state.generation++
Expand Down
Loading

0 comments on commit 98e38be

Please sign in to comment.