Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metricbeat logstash module: accept override cluster UUID from Logstash #15795

Merged
merged 6 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]
- Add collecting AuroraDB metrics in rds metricset. {issue}14142[14142] {pull}16004[16004]
- Reuse connections in SQL module. {pull}16001[16001]
- Improve the `logstash` module (when `xpack.enabled` is set to `true`) to use the override `cluster_uuid` returned by Logstash APIs. {issue}15772[15772] {pull}15795[15795]

*Packetbeat*

Expand Down
35 changes: 30 additions & 5 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,24 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
}, nil
}

// GetPipelines returns the list of pipelines running on a Logstash node
func GetPipelines(m *MetricSet) ([]PipelineState, error) {
// GetPipelines returns the list of pipelines running on a Logstash node and,
// optionally, an override cluster UUID.
func GetPipelines(m *MetricSet) ([]PipelineState, string, error) {
content, err := fetchPath(m.HTTP, "_node/pipelines", "graph=true")
if err != nil {
return nil, errors.Wrap(err, "could not fetch node pipelines")
return nil, "", errors.Wrap(err, "could not fetch node pipelines")
}

pipelinesResponse := struct {
Monitoring struct {
ClusterID string `json:"cluster_uuid"`
} `json:"monitoring"`
Pipelines map[string]PipelineState `json:"pipelines"`
}{}

err = json.Unmarshal(content, &pipelinesResponse)
if err != nil {
return nil, errors.Wrap(err, "could not parse node pipelines response")
return nil, "", errors.Wrap(err, "could not parse node pipelines response")
}

var pipelines []PipelineState
Expand All @@ -156,7 +160,7 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) {
pipelines = append(pipelines, pipeline)
}

return pipelines, nil
return pipelines, pipelinesResponse.Monitoring.ClusterID, nil
}

// CheckPipelineGraphAPIsAvailable returns an error if pipeline graph APIs are not
Expand All @@ -177,6 +181,27 @@ func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error {
return nil
}

// GetVertexClusterUUID returns the correct cluster UUID value for the given Elasticsearch
// vertex from a Logstash pipeline. If the vertex has no cluster UUID associated with it,
// the given override cluster UUID is returned.
func GetVertexClusterUUID(vertex map[string]interface{}, overrideClusterUUID string) string {
c, ok := vertex["cluster_uuid"]
if !ok {
return overrideClusterUUID
}

clusterUUID, ok := c.(string)
if !ok {
return overrideClusterUUID
}

if clusterUUID == "" {
return overrideClusterUUID
}

return clusterUUID
}

func (m *MetricSet) getVersion() (*common.Version, error) {
const rootPath = "/"
content, err := fetchPath(m.HTTP, rootPath, "")
Expand Down
59 changes: 59 additions & 0 deletions metricbeat/module/logstash/logstash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetVertexClusterUUID(t *testing.T) {
tests := map[string]struct {
vertex map[string]interface{}
overrideClusterUUID string
expectedClusterUUID string
}{
"vertex_and_override": {
map[string]interface{}{
"cluster_uuid": "v",
},
"o",
"v",
},
"vertex_only": {
vertex: map[string]interface{}{
"cluster_uuid": "v",
},
expectedClusterUUID: "v",
},
"override_only": {
overrideClusterUUID: "o",
expectedClusterUUID: "o",
},
"none": {
expectedClusterUUID: "",
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.expectedClusterUUID, GetVertexClusterUUID(test.vertex, test.overrideClusterUUID))
})
}
}
19 changes: 6 additions & 13 deletions metricbeat/module/logstash/node/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/elastic/beats/metricbeat/module/logstash"
)

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState) error {
func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState, overrideClusterUUID string) error {
pipelines = getUserDefinedPipelines(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, overrideClusterUUID)
for clusterUUID, pipelines := range clusterToPipelinesMap {
for _, pipeline := range pipelines {
removeClusterUUIDsFromPipeline(pipeline)
Expand Down Expand Up @@ -62,24 +62,17 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.Pipel
return nil
}

func makeClusterToPipelinesMap(pipelines []logstash.PipelineState) map[string][]logstash.PipelineState {
func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClusterUUID string) map[string][]logstash.PipelineState {
var clusterToPipelinesMap map[string][]logstash.PipelineState
clusterToPipelinesMap = make(map[string][]logstash.PipelineState)

for _, pipeline := range pipelines {
var clusterUUIDs []string
for _, vertex := range pipeline.Graph.Graph.Vertices {
c, ok := vertex["cluster_uuid"]
if !ok {
continue
}

clusterUUID, ok := c.(string)
if !ok {
continue
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

// If no cluster UUID was found in this pipeline, assign it a blank one
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/logstash/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return eventMapping(r, content)
}

pipelinesContent, err := logstash.GetPipelines(m.MetricSet)
pipelinesContent, overrideClusterUUID, err := logstash.GetPipelines(m.MetricSet)
if err != nil {
m.Logger().Error(err)
return nil
}

err = eventMappingXPack(r, m, pipelinesContent)
err = eventMappingXPack(r, m, pipelinesContent, overrideClusterUUID)
if err != nil {
m.Logger().Error(err)
}
Expand Down
27 changes: 15 additions & 12 deletions metricbeat/module/logstash/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/json"
"time"

"github.com/elastic/beats/metricbeat/module/logstash"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -94,6 +96,9 @@ type nodeInfo struct {
Status string `json:"status"`
HTTPAddress string `json:"http_address"`
Pipeline pipeline `json:"pipeline"`
Monitoring struct {
ClusterID string `json:"cluster_uuid"`
} `json:"monitoring"`
}

type reloads struct {
Expand Down Expand Up @@ -166,7 +171,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
}

pipelines = getUserDefinedPipelines(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, nodeStats.Monitoring.ClusterID)

for clusterUUID, clusterPipelines := range clusterToPipelinesMap {
logstashStats := LogstashStats{
Expand Down Expand Up @@ -197,24 +202,22 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
return nil
}

func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats {
func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID string) map[string][]PipelineStats {
var clusterToPipelinesMap map[string][]PipelineStats
clusterToPipelinesMap = make(map[string][]PipelineStats)

if overrideClusterUUID != "" {
clusterToPipelinesMap[overrideClusterUUID] = pipelines
return clusterToPipelinesMap
}

for _, pipeline := range pipelines {
var clusterUUIDs []string
for _, vertex := range pipeline.Vertices {
c, ok := vertex["cluster_uuid"]
if !ok {
continue
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

clusterUUID, ok := c.(string)
if !ok {
continue
}

clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

// If no cluster UUID was found in this pipeline, assign it a blank one
Expand Down