diff --git a/glide.yaml b/glide.yaml index dcfa3088eee..446be0f62ba 100644 --- a/glide.yaml +++ b/glide.yaml @@ -62,7 +62,7 @@ import: - package: github.com/miekg/dns version: 5d001d020961ae1c184f9f8152fdc73810481677 - package: github.com/Shopify/sarama - version: fix/sasl-handshake + version: enh/offset-replica-id repo: https://github.com/urso/sarama - package: github.com/rcrowley/go-metrics version: ab2277b1c5d15c3cba104e9cbddbdfc622df5ad8 diff --git a/libbeat/docker-compose.yml b/libbeat/docker-compose.yml index e946aa9ce73..1620948e1b8 100644 --- a/libbeat/docker-compose.yml +++ b/libbeat/docker-compose.yml @@ -54,6 +54,8 @@ services: expose: - 9092 - 2181 + environment: + - ADVERTISED_HOST=kafka # Overloading kibana with a simple image as it is not needed here kibana: diff --git a/metricbeat/_meta/beat.full.yml b/metricbeat/_meta/beat.full.yml index 8d93ea40588..6b9b3c35f1b 100644 --- a/metricbeat/_meta/beat.full.yml +++ b/metricbeat/_meta/beat.full.yml @@ -94,6 +94,27 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" #------------------------------- MongoDB Module ------------------------------ #- module: mongodb diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 6c9d16a6177..00e568fe07b 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -1959,7 +1959,14 @@ Oldest offset of the partition. [float] -=== kafka.partition.partition +== partition Fields + +Partition data. + + + +[float] +=== kafka.partition.partition.id type: long @@ -1967,7 +1974,55 @@ Partition id. [float] -=== kafka.partition.topic +=== kafka.partition.partition.leader + +type: long + +Leader id (broker). + + +[float] +=== kafka.partition.partition.isr + +type: list + +List of isr ids. + + +[float] +=== kafka.partition.partition.replica + +type: long + +Replica id (broker). + + +[float] +=== kafka.partition.partition.insync_replica + +type: boolean + +Indicates if replica is included in the in-sync replicate set (ISR). + + +[float] +=== kafka.partition.partition.error.code + +type: long + +Error code from fetching partition. + + +[float] +=== kafka.partition.topic.error.code + +type: long + +topic error code. + + +[float] +=== kafka.partition.topic.name type: keyword diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index 8fee81b5054..39461fa8e74 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -24,6 +24,27 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" ---- [float] diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index dd073e4cc7d..f3d37c42b68 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -94,6 +94,27 @@ metricbeat.modules: #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" #------------------------------- MongoDB Module ------------------------------ #- module: mongodb diff --git a/metricbeat/metricbeat.template-es2x.json b/metricbeat/metricbeat.template-es2x.json index e5141696a17..32948d67e9f 100644 --- a/metricbeat/metricbeat.template-es2x.json +++ b/metricbeat/metricbeat.template-es2x.json @@ -950,12 +950,43 @@ } }, "partition": { - "type": "long" + "properties": { + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "id": { + "type": "long" + }, + "insync_replica": { + "type": "boolean" + }, + "leader": { + "type": "long" + }, + "replica": { + "type": "long" + } + } }, "topic": { - "ignore_above": 1024, - "index": "not_analyzed", - "type": "string" + "properties": { + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "name": { + "ignore_above": 1024, + "index": "not_analyzed", + "type": "string" + } + } } } } diff --git a/metricbeat/metricbeat.template.json b/metricbeat/metricbeat.template.json index 2edac8ca891..0749ed3d9b4 100644 --- a/metricbeat/metricbeat.template.json +++ b/metricbeat/metricbeat.template.json @@ -957,11 +957,42 @@ } }, "partition": { - "type": "long" + "properties": { + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "id": { + "type": "long" + }, + "insync_replica": { + "type": "boolean" + }, + "leader": { + "type": "long" + }, + "replica": { + "type": "long" + } + } }, "topic": { - "ignore_above": 1024, - "type": "keyword" + "properties": { + "error": { + "properties": { + "code": { + "type": "long" + } + } + }, + "name": { + "ignore_above": 1024, + "type": "keyword" + } + } } } } diff --git a/metricbeat/module/kafka/_meta/config.yml b/metricbeat/module/kafka/_meta/config.yml index b68543e7c2b..9880c06e272 100644 --- a/metricbeat/module/kafka/_meta/config.yml +++ b/metricbeat/module/kafka/_meta/config.yml @@ -4,3 +4,24 @@ #period: 10s #hosts: ["localhost:9092"] + #client_id: metricbeat + + #metadata.retries: 3 + #metadata.backoff: 250ms + + # List of Topics to query metadata for. If empty, all topics will be queried. + #topics: [] + + # Optional SSL. By default is off. + # List of root certificates for HTTPS server verifications + #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] + + # Certificate for SSL client authentication + #ssl.certificate: "/etc/pki/client/cert.pem" + + # Client Certificate Key + #ssl.key: "/etc/pki/client/cert.key" + + # SASL authentication + #username: "" + #password: "" diff --git a/metricbeat/module/kafka/partition/_meta/data.json b/metricbeat/module/kafka/partition/_meta/data.json index 34e0398e2b3..44d98e32ccf 100644 --- a/metricbeat/module/kafka/partition/_meta/data.json +++ b/metricbeat/module/kafka/partition/_meta/data.json @@ -11,14 +11,19 @@ "id": 0 }, "offset": { - "newest": 13, + "newest": 11, "oldest": 0 }, - "partition": 0, - "replicas": [ - 0 - ], - "topic": "testtopic" + "partition": { + "error": 0, + "id": 0, + "insync_replica": true, + "leader": 0, + "replica": 0 + }, + "topic": { + "name": "test-metricbeat-8760238589576171408" + } } }, "metricset": { diff --git a/metricbeat/module/kafka/partition/_meta/fields.yml b/metricbeat/module/kafka/partition/_meta/fields.yml index f6d76ccdc8c..8c7f923085c 100644 --- a/metricbeat/module/kafka/partition/_meta/fields.yml +++ b/metricbeat/module/kafka/partition/_meta/fields.yml @@ -16,14 +16,49 @@ type: long description: > Oldest offset of the partition. + - name: partition + type: group + description: > + Partition data. + fields: + - name: id + type: long + description: > + Partition id. + + - name: leader + type: long + description: > + Leader id (broker). + - name: isr + type: list + description: > + List of isr ids. + - name: replica + type: long + description: > + Replica id (broker). + + - name: insync_replica + type: boolean + description: > + Indicates if replica is included in the in-sync replicate set (ISR). + + - name: error.code + type: long + description: > + Error code from fetching partition. + + - name: topic.error.code type: long description: > - Partition id. - - name: topic + topic error code. + - name: topic.name type: keyword description: > Topic name + - name: broker.id type: long description: > @@ -32,3 +67,5 @@ type: keyword description: > Broker address + + diff --git a/metricbeat/module/kafka/partition/config.go b/metricbeat/module/kafka/partition/config.go new file mode 100644 index 00000000000..ba3da484e8f --- /dev/null +++ b/metricbeat/module/kafka/partition/config.go @@ -0,0 +1,41 @@ +package partition + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/outputs" +) + +type connConfig struct { + Metadata metaConfig `config:"metadata"` + TLS *outputs.TLSConfig `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` + ClientID string `config:"client_id"` + Topics []string `config:"topics"` +} + +type metaConfig struct { + Retries int `config:"retries" validate:"min=0"` + Backoff time.Duration `config:"backoff" validate:"min=0"` +} + +var defaultConfig = connConfig{ + Metadata: metaConfig{ + Retries: 3, + Backoff: 250 * time.Millisecond, + }, + TLS: nil, + Username: "", + Password: "", + ClientID: "metricbeat", +} + +func (c *connConfig) Validate() error { + if c.Username != "" && c.Password == "" { + return fmt.Errorf("password must be set when username is configured") + } + + return nil +} diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 9a25640f3ff..6cb29d6987a 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -1,8 +1,14 @@ package partition import ( + "errors" + "fmt" + "io" + "time" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -19,75 +25,292 @@ func init() { // MetricSet type defines all fields of the partition MetricSet type MetricSet struct { mb.BaseMetricSet - client sarama.Client + + broker *sarama.Broker + cfg *sarama.Config + id int32 + topics []string } -// New creates a new instance of the partition MetricSet +var noID int32 = -1 + +var errFailQueryOffset = errors.New("operation failed") + +// New create a new instance of the partition MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - logp.Warn("EXPERIMENTAL: The %v %v metricset is experimental", base.Module().Name(), base.Name()) + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + tls, err := outputs.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + + cfg := sarama.NewConfig() + cfg.Net.DialTimeout = base.Module().Config().Timeout + cfg.Net.ReadTimeout = base.Module().Config().Timeout + cfg.ClientID = config.ClientID + cfg.Metadata.Retry.Max = config.Metadata.Retries + cfg.Metadata.Retry.Backoff = config.Metadata.Backoff + if tls != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tls.BuildModuleConfig("") + } + if config.Username != "" { + cfg.Net.SASL.Enable = true + cfg.Net.SASL.User = config.Username + cfg.Net.SASL.Password = config.Password + } - return &MetricSet{BaseMetricSet: base}, nil + broker := sarama.NewBroker(base.Host()) + return &MetricSet{ + BaseMetricSet: base, + broker: broker, + cfg: cfg, + id: noID, + topics: config.Topics, + }, nil +} + +func (m *MetricSet) connect() (*sarama.Broker, error) { + b := m.broker + if err := b.Open(m.cfg); err != nil { + return nil, err + } + + if m.id != noID { + return b, nil + } + + // current broker is bootstrap only. Get metadata to find id: + meta, err := queryMetadataWithRetry(b, m.cfg, m.topics) + if err != nil { + closeBroker(b) + return nil, err + } + + addr := b.Addr() + for _, other := range meta.Brokers { + if other.Addr() == addr { + m.id = other.ID() + break + } + } + + if m.id == noID { + closeBroker(b) + err = fmt.Errorf("No advertised broker with address %v found", addr) + return nil, err + } + + return b, nil } // Fetch partition stats list from kafka func (m *MetricSet) Fetch() ([]common.MapStr, error) { - if m.client == nil { - config := sarama.NewConfig() - config.Net.DialTimeout = m.Module().Config().Timeout - config.Net.ReadTimeout = m.Module().Config().Timeout - config.ClientID = "metricbeat" - - client, err := sarama.NewClient([]string{m.Host()}, config) - if err != nil { - return nil, err - } - m.client = client + b, err := m.connect() + if err != nil { + return nil, err } - topics, err := m.client.Topics() + defer closeBroker(b) + response, err := queryMetadataWithRetry(b, m.cfg, m.topics) if err != nil { return nil, err } events := []common.MapStr{} - for _, topic := range topics { - partitions, err := m.client.Partitions(topic) - if err != nil { - logp.Err("Fetch partition info for topic %s: %s", topic, err) + evtBroker := common.MapStr{ + "id": m.id, + "address": b.Addr(), + } + + for _, topic := range response.Topics { + evtTopic := common.MapStr{ + "name": topic.Name, + "error": common.MapStr{ + "code": topic.Err, + }, } - for _, partition := range partitions { - newestOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetNewest) - if err != nil { - logp.Err("Fetching newest offset information for partition %s in topic %s: %s", partition, topic, err) + for _, partition := range topic.Partitions { + // partition offsets can be queried from leader only + if m.id != partition.Leader { + continue } - oldestOffset, err := m.client.GetOffset(topic, partition, sarama.OffsetOldest) - if err != nil { - logp.Err("Fetching oldest offset information for partition %s in topic %s: %s", partition, topic, err) - } + // collect offsets for all replicas + for _, id := range partition.Replicas { + + // Get oldest and newest available offsets + offOldest, offNewest, offOK, err := queryOffsetRange(b, id, topic.Name, partition.ID) + + if !offOK { + if err == nil { + err = errFailQueryOffset + } - broker, err := m.client.Leader(topic, partition) - if err != nil { - logp.Err("Fetching brocker for partition %s in topic %s: %s", partition, topic, err) + logp.Err("Failed to query kafka partition (%v:%v) offsets: %v", + topic.Name, partition.ID, err) + continue + } + + // create event + event := common.MapStr{ + "topic": evtTopic, + "broker": evtBroker, + "partition": common.MapStr{ + "id": partition.ID, + "error": common.MapStr{ + "code": partition.Err, + }, + "leader": partition.Leader, + "replica": id, + "insync_replica": hasID(id, partition.Isr), + }, + "offset": common.MapStr{ + "newest": offNewest, + "oldest": offOldest, + }, + } + + events = append(events, event) } + } + } - event := common.MapStr{ - "topic": topic, - "partition": partition, - "offset": common.MapStr{ - "oldest": oldestOffset, - "newest": newestOffset, - }, - "broker": common.MapStr{ - "id": broker.ID(), - "address": broker.Addr(), - }, + return events, nil +} + +func hasID(id int32, lst []int32) bool { + for _, other := range lst { + if id == other { + return true + } + } + return false +} + +// queryOffsetRange queries the broker for the oldest and the newest offsets in +// a kafka topics partition for a given replica. +func queryOffsetRange( + b *sarama.Broker, + replicaID int32, + topic string, + partition int32, +) (int64, int64, bool, error) { + oldest, okOld, err := queryOffset(b, replicaID, topic, partition, sarama.OffsetOldest) + if err != nil { + return -1, -1, false, err + } + + newest, okNew, err := queryOffset(b, replicaID, topic, partition, sarama.OffsetNewest) + if err != nil { + return -1, -1, false, err + } + + return oldest, newest, okOld && okNew, nil +} + +func queryOffset( + b *sarama.Broker, + replicaID int32, + topic string, + partition int32, + time int64, +) (int64, bool, error) { + req := &sarama.OffsetRequest{} + if replicaID != noID { + req.SetReplicaID(replicaID) + } + req.AddBlock(topic, partition, time, 1) + resp, err := b.GetAvailableOffsets(req) + if err != nil { + return -1, false, err + } + + block := resp.GetBlock(topic, partition) + if len(block.Offsets) == 0 { + return -1, false, nil + } + + return block.Offsets[0], true, nil +} + +func closeBroker(b *sarama.Broker) { + if ok, _ := b.Connected(); ok { + b.Close() + } +} + +func queryMetadataWithRetry( + b *sarama.Broker, + cfg *sarama.Config, + topics []string, +) (r *sarama.MetadataResponse, err error) { + err = withRetry(b, cfg, func() (e error) { + r, e = b.GetMetadata(&sarama.MetadataRequest{topics}) + return + }) + return +} + +func withRetry( + b *sarama.Broker, + cfg *sarama.Config, + f func() error, +) error { + var err error + for max := 0; max < cfg.Metadata.Retry.Max; max++ { + if ok, _ := b.Connected(); !ok { + if err = b.Open(cfg); err == nil { + err = f() } + } else { + err = f() + } + + if err == nil { + return nil + } + + retry, reconnect := checkRetryQuery(err) + if !retry { + return err + } - events = append(events, event) + time.Sleep(cfg.Metadata.Retry.Backoff) + if reconnect { + closeBroker(b) } } + return err +} - return events, nil +func checkRetryQuery(err error) (retry, reconnect bool) { + if err == nil { + return false, false + } + + if err == io.EOF { + return true, true + } + + k, ok := err.(sarama.KError) + if !ok { + return false, false + } + + switch k { + case sarama.ErrLeaderNotAvailable, sarama.ErrReplicaNotAvailable, + sarama.ErrOffsetsLoadInProgress, sarama.ErrRebalanceInProgress: + return true, false + case sarama.ErrRequestTimedOut, sarama.ErrBrokerNotAvailable, + sarama.ErrNetworkException: + return true, true + } + + return false, false } diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index 41aca7ecc8f..a76f75b22c2 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -64,13 +64,13 @@ func TestTopic(t *testing.T) { // Its possible that other topics exists -> select the right data for _, data := range dataBefore { - if data["topic"] == testTopic { + if data["topic"].(common.MapStr)["name"] == testTopic { offsetBefore = data["offset"].(common.MapStr)["newest"].(int64) } } for _, data := range dataAfter { - if data["topic"] == testTopic { + if data["topic"].(common.MapStr)["name"] == testTopic { offsetAfter = data["offset"].(common.MapStr)["newest"].(int64) } } diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go index c66d8f70911..2f74df3d577 100644 --- a/vendor/github.com/Shopify/sarama/offset_request.go +++ b/vendor/github.com/Shopify/sarama/offset_request.go @@ -22,11 +22,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) { } type OffsetRequest struct { - blocks map[string]map[int32]*offsetRequestBlock + replicaID *int32 + blocks map[string]map[int32]*offsetRequestBlock + + storeReplicaID int32 } func (r *OffsetRequest) encode(pe packetEncoder) error { - pe.putInt32(-1) // replica ID is always -1 for clients + if r.replicaID == nil { + // default replica ID is always -1 for clients + pe.putInt32(-1) + } else { + pe.putInt32(*r.replicaID) + } + err := pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -100,6 +109,11 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion { return minVersion } +func (r *OffsetRequest) SetReplicaID(id int32) { + r.storeReplicaID = id + r.replicaID = &r.storeReplicaID +} + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetRequestBlock)