Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka consumer metricset #2977

Closed
wants to merge 5 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor kafka for more metricsets
ruflin committed Nov 17, 2016
commit 9e35643ef495e1fb9bcc8b75a6ac2b939aac003f
3 changes: 3 additions & 0 deletions libbeat/scripts/Makefile
Original file line number Diff line number Diff line change
@@ -301,6 +301,9 @@ build-image: write-environment
start-environment: stop-environment
${DOCKER_COMPOSE} up -d

start-env:
${DOCKER_COMPOSE} run beat bash

.PHONY: stop-environment
stop-environment:
-${DOCKER_COMPOSE} stop
14 changes: 3 additions & 11 deletions metricbeat/module/kafka/broker/broker.go
Original file line number Diff line number Diff line change
@@ -7,26 +7,19 @@ import (
metrics "github.com/rcrowley/go-metrics"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
// init adds broker metricset
func init() {
if err := mb.Registry.AddMetricSet("kafka", "broker", New); err != nil {
panic(err)
}
}

// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
// MetricSet type defines broker metricset
type MetricSet struct {
mb.BaseMetricSet
counter int
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
// New creates new broker metricset
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

config := struct{}{}
@@ -37,7 +30,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

return &MetricSet{
BaseMetricSet: base,
counter: 1,
}, nil
}

23 changes: 23 additions & 0 deletions metricbeat/module/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafka

import (
"github.com/Shopify/sarama"
"github.com/elastic/beats/metricbeat/mb"
)

func GetClient(client sarama.Client, metricset mb.MetricSet) (sarama.Client, error) {

if client == nil {
config := sarama.NewConfig()
config.Net.DialTimeout = metricset.Module().Config().Timeout
config.Net.ReadTimeout = metricset.Module().Config().Timeout
config.ClientID = "metricbeat"

var err error
client, err = sarama.NewClient([]string{metricset.Host()}, config)
if err != nil {
return nil, err
}
}
return client, nil
}
10 changes: 8 additions & 2 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/kafka"
)

// init registers the partition MetricSet with the central registry.
@@ -37,7 +38,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch partition stats list from kafka
func (m *MetricSet) Fetch() ([]common.MapStr, error) {

if m.client == nil {
var err error
m.client, err = kafka.GetClient(m.client, m)
if err != nil {
return nil, err
}
/*if m.client == nil {
config := sarama.NewConfig()
config.Net.DialTimeout = m.Module().Config().Timeout
config.Net.ReadTimeout = m.Module().Config().Timeout
@@ -48,7 +54,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
return nil, err
}
m.client = client
}
}*/

topics, err := m.client.Topics()
if err != nil {
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import (

func TestData(t *testing.T) {

generateKafkaData(t)
kafka.GenerateKafkaData(t)

f := mbtest.NewEventsFetcher(t, getConfig())
err := mbtest.WriteEvents(f, t)