Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch bulk request enhancements #5811

Merged
merged 7 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Use structured logging for the metrics that are periodically logged via the
`logging.metrics` feature. {pull}5915[5915]
- Add the ability to log to the Windows Event Log. {pull}5913[5913]
- Improve Elasticsearch output metrics to count number of dropped and duplicate (if event ID is given) events. {pull}5811[5811]

*Auditbeat*

Expand Down
7 changes: 7 additions & 0 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ var (
errNoTimestamp = errors.New("value is no timestamp")
)

func (e *Event) SetID(id string) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Event.SetID should have comment or be unexported

if e.Meta == nil {
e.Meta = common.MapStr{}
}
e.Meta["id"] = id
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
124 changes: 81 additions & 43 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ type Connection struct {
version string
}

// bulkIndexAction encodes an `index` action line of the Elasticsearch
// bulk API; used for events without an explicit ID.
type bulkIndexAction struct {
	Index bulkEventMeta `json:"index" struct:"index"`
}

// bulkCreateAction encodes a `create` action line of the Elasticsearch
// bulk API; used for events carrying an ID so duplicates are rejected.
type bulkCreateAction struct {
	Create bulkEventMeta `json:"create" struct:"create"`
}

// bulkEventMeta holds the per-event metadata shared by the `index` and
// `create` bulk actions. Field order matters: it defines the order of
// keys in the serialized action object.
type bulkEventMeta struct {
	Index    string `json:"_index" struct:"_index"`
	DocType  string `json:"_type" struct:"_type"`
	Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
	ID       string `json:"_id,omitempty" struct:"_id,omitempty"`
}

// bulkResultStats aggregates per-item outcomes of one bulk response.
type bulkResultStats struct {
	acked        int // number of events ACKed by Elasticsearch
	duplicates   int // number of events failed with `create` due to ID already being indexed
	fails        int // number of failed events (can be retried)
	nonIndexable int // number of failed events (not indexable -> must be dropped)
}

var (
nameItems = []byte("items")
nameStatus = []byte("status")
Expand Down Expand Up @@ -283,19 +305,25 @@ func (client *Client) publishEvents(

// check response for transient errors
var failedEvents []publisher.Event
var stats bulkResultStats
if status != 200 {
failedEvents = data
stats.fails = len(failedEvents)
} else {
client.json.init(result.raw)
failedEvents = bulkCollectPublishFails(&client.json, data)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

failed := len(failedEvents)
if st := client.observer; st != nil {
acked := len(data) - failed
dropped := stats.nonIndexable
duplicates := stats.duplicates
acked := len(data) - failed - dropped - duplicates

st.Acked(acked)
st.Failed(failed)
st.Dropped(dropped)
st.Duplicate(duplicates)
}

if failed > 0 {
Expand All @@ -318,7 +346,11 @@ func bulkEncodePublishRequest(
okEvents := data[:0]
for i := range data {
event := &data[i].Content
meta := createEventBulkMeta(index, pipeline, event)
meta, err := createEventBulkMeta(index, pipeline, event)
if err != nil {
logp.Err("Failed to encode event meta data: %s", err)
continue
}
if err := body.Add(meta, event); err != nil {
logp.Err("Failed to encode event: %s", err)
continue
Expand All @@ -329,48 +361,44 @@ func bulkEncodePublishRequest(
}

func createEventBulkMeta(
index outil.Selector,
indexSel outil.Selector,
pipelineSel *outil.Selector,
event *beat.Event,
) interface{} {
) (interface{}, error) {
pipeline, err := getPipeline(event, pipelineSel)
if err != nil {
logp.Err("Failed to select pipeline: %v", err)
err := fmt.Errorf("failed to select pipeline: %v", err)
return nil, err
}

if pipeline == "" {
type bulkMetaIndex struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type" struct:"_type"`
}
type bulkMeta struct {
Index bulkMetaIndex `json:"index"`
}
index, err := getIndex(event, indexSel)
if err != nil {
err := fmt.Errorf("failed to select event index: %v", err)
return nil, err
}

return bulkMeta{
Index: bulkMetaIndex{
Index: getIndex(event, index),
DocType: eventType,
},
var id string
if m := event.Meta; m != nil {
if tmp := m["id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
logp.Err("Event ID '%v' is no string value", id)
}
}
}

type bulkMetaIndex struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type" struct:"_type"`
Pipeline string `json:"pipeline" struct:"pipeline"`
}
type bulkMeta struct {
Index bulkMetaIndex `json:"index" struct:"index"`
meta := bulkEventMeta{
Index: index,
DocType: eventType,
Pipeline: pipeline,
ID: id,
}

return bulkMeta{
Index: bulkMetaIndex{
Index: getIndex(event, index),
Pipeline: pipeline,
DocType: eventType,
},
if id != "" {
return bulkCreateAction{meta}, nil
}
return bulkIndexAction{meta}, nil
}

func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) {
Expand All @@ -392,20 +420,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// getIndex returns the full index name
// Index is either defined in the config as part of the output
// or can be overload by the event through setting index
func getIndex(event *beat.Event, index outil.Selector) string {
func getIndex(event *beat.Event, index outil.Selector) (string, error) {
if event.Meta != nil {
if str, exists := event.Meta["index"]; exists {
idx, ok := str.(string)
if ok {
ts := event.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
idx, ts.Year(), ts.Month(), ts.Day()), nil
}
}
}

str, _ := index.Select(event)
return str
return index.Select(event)
}

// bulkCollectPublishFails checks per item errors returning all events
Expand All @@ -415,23 +442,23 @@ func getIndex(event *beat.Event, index outil.Selector) string {
func bulkCollectPublishFails(
reader *jsonReader,
data []publisher.Event,
) []publisher.Event {
) ([]publisher.Event, bulkResultStats) {
if err := reader.expectDict(); err != nil {
logp.Err("Failed to parse bulk respose: expected JSON object")
return nil
return nil, bulkResultStats{}
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
logp.Err("Failed to parse bulk response")
return nil
return nil, bulkResultStats{}
}

if kind == dictEnd {
logp.Err("Failed to parse bulk response: no 'items' field in response")
return nil
return nil, bulkResultStats{}
}

// found items array -> continue
Expand All @@ -445,32 +472,43 @@ func bulkCollectPublishFails(
// check items field is an array
if err := reader.expectArray(); err != nil {
logp.Err("Failed to parse bulk respose: expected items array")
return nil
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
if err != nil {
return nil
return nil, bulkResultStats{}
}

if status < 300 {
stats.acked++
continue // ok value
}

if status == 409 {
// 409 is used to indicate an event with same ID already exists if
// `create` op_type is used.
stats.duplicates++
continue // ok
}

if status < 500 && status != 429 {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
}

debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
stats.fails++
failed = append(failed, data[i])
}

return failed
return failed, stats
}

func itemStatus(reader *jsonReader) (int, []byte, error) {
Expand Down
18 changes: 9 additions & 9 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCollectPublishFailsNone(t *testing.T) {
}

reader := newJSONReader(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
assert.Equal(t, 0, len(res))
}

Expand All @@ -102,7 +102,7 @@ func TestCollectPublishFailMiddle(t *testing.T) {
events := []publisher.Event{event, eventFail, event}

reader := newJSONReader(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
assert.Equal(t, 1, len(res))
if len(res) == 1 {
assert.Equal(t, eventFail, res[0])
Expand All @@ -122,7 +122,7 @@ func TestCollectPublishFailAll(t *testing.T) {
events := []publisher.Event{event, event, event}

reader := newJSONReader(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
assert.Equal(t, 3, len(res))
assert.Equal(t, events, res)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestCollectPipelinePublishFail(t *testing.T) {
events := []publisher.Event{event}

reader := newJSONReader(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
assert.Equal(t, 1, len(res))
assert.Equal(t, events, res)
}
Expand All @@ -178,7 +178,7 @@ func TestGetIndexStandard(t *testing.T) {
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, ""))

event := &beat.Event{Timestamp: ts, Fields: fields}
index := getIndex(event, indexSel)
index, _ := getIndex(event, indexSel)
assert.Equal(t, index, "beatname-"+extension)
}

Expand All @@ -204,7 +204,7 @@ func TestGetIndexOverwrite(t *testing.T) {
"index": "dynamicindex",
},
Fields: fields}
index := getIndex(event, indexSel)
index, _ := getIndex(event, indexSel)
expected := "dynamicindex-" + extension
assert.Equal(t, expected, index)
}
Expand All @@ -224,7 +224,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) {
reader := newJSONReader(nil)
for i := 0; i < b.N; i++ {
reader.init(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
if len(res) != 0 {
b.Fail()
}
Expand All @@ -247,7 +247,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) {
reader := newJSONReader(nil)
for i := 0; i < b.N; i++ {
reader.init(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
if len(res) != 1 {
b.Fail()
}
Expand All @@ -269,7 +269,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
reader := newJSONReader(nil)
for i := 0; i < b.N; i++ {
reader.init(response)
res := bulkCollectPublishFails(reader, events)
res, _ := bulkCollectPublishFails(reader, events)
if len(res) != 3 {
b.Fail()
}
Expand Down
Loading