Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance kafka broker matching #3129

Merged
merged 1 commit into from
Dec 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Added new flags to import_dashboards (-cacert, -cert, -key, -insecure). {pull}3139[3139] {pull}3163[3163]

*Metricbeat*
- Kafka module broker matching enhancements. {pull}3129[3129]


*Packetbeat*
Expand Down
190 changes: 178 additions & 12 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package partition

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -32,10 +36,12 @@ type MetricSet struct {
topics []string
}

var noID int32 = -1
const noID int32 = -1

var errFailQueryOffset = errors.New("operation failed")

var debugf = logp.MakeDebug("kafka")

// New create a new instance of the partition MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := defaultConfig
Expand Down Expand Up @@ -91,20 +97,15 @@ func (m *MetricSet) connect() (*sarama.Broker, error) {
return nil, err
}

addr := b.Addr()
for _, other := range meta.Brokers {
if other.Addr() == addr {
m.id = other.ID()
break
}
}

if m.id == noID {
other := findMatchingBroker(b.Addr(), meta.Brokers)
if other == nil { // no broker found
closeBroker(b)
err = fmt.Errorf("No advertised broker with address %v found", addr)
return nil, err
return nil, fmt.Errorf("No advertised broker with address %v found", b.Addr())
}

debugf("found matching broker %v with id %v", other.Addr(), other.ID())

m.id = other.ID()
return b, nil
}

Expand Down Expand Up @@ -322,3 +323,168 @@ func checkRetryQuery(err error) (retry, reconnect bool) {

return false, false
}

func findMatchingBroker(
addr string,
brokers []*sarama.Broker,
) *sarama.Broker {
debugf("Try to match broker to: %v", addr)

// compare connection address to list of broker addresses
for _, b := range brokers {
if b.Addr() == addr {
return b
}
}

// get connection 'port'
_, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
port = "9092"
}

// lookup local machines ips for comparing with broker addresses
localIPs, err := interfaceIPs()
if err != nil || len(localIPs) == 0 {
return nil
}
debugf("local machine ips: %v", localIPs)

// try to find broker by comparing the fqdn for each known ip to list of
// brokers
localHosts := lookupHosts(localIPs)
debugf("local machine addresses: %v", localHosts)
for _, host := range localHosts {
debugf("try to match with fqdn: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
}
}

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
debugf("try to match with hostname only: %v (%v)", host, port)

if b := findBroker(host, port, brokers); b != nil {
return b
}
}

// lookup ips for all brokers
debugf("match by ips")
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())
bh, bp, err := net.SplitHostPort(b.Addr())
if err != nil {
continue
}

// port numbers do not match
if bp != port {
continue
}

// lookup all ips for brokers host:
ips, err := net.LookupIP(bh)
debugf("broker %v ips: %v, %v", bh, ips, err)
if err != nil {
continue
}

debugf("broker (%v) ips: %v", bh, ips)

// check if ip is known
if ipsMatch(ips, localIPs) {
return b
}
}

return nil
}

func findBroker(host, port string, brokers []*sarama.Broker) *sarama.Broker {
for _, b := range brokers {
debugf("test broker address: %v", b.Addr())

bh, bp, err := net.SplitHostPort(b.Addr())
if err != nil {
debugf("failed to parse broker address: %v", err)
continue
}

if bh == host && port == bp {
return b
}
}

return nil
}

func interfaceIPs() ([]net.IP, error) {
var ips []net.IP
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
var ip net.IP
ok := true

switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
debugf("non ip address: %v", addr)
ok = false
}

if !ok {
continue
}

ips = append(ips, ip)
}
return ips, nil
}

func lookupHosts(ips []net.IP) []string {
set := map[string]struct{}{}
for _, ip := range ips {
txt, err := ip.MarshalText()
if err != nil {
continue
}

hosts, err := net.LookupAddr(string(txt))
debugf("lookup %v => %v, %v", string(txt), hosts, err)
if err != nil {
continue
}

for _, host := range hosts {
h := strings.TrimSuffix(host, ".")
set[h] = struct{}{}
}
}

hosts := make([]string, 0, len(set))
for host := range set {
hosts = append(hosts, host)
}
return hosts
}

func ipsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
return true
}
}
}
return false
}