From 35163aeef2e0dffce73cc171071dddca6f17356d Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 19 Apr 2016 23:38:26 +0200 Subject: [PATCH] Do not update event in parallel. The logstash output used to add metadata to an event in place. With multiple outputs potentially configured, this is bad practice. Instead, the logstash output plugin will add '@metadata' to the event when encoding, and only if '@metadata' is not already present in the event. --- CHANGELOG.asciidoc | 1 + libbeat/outputs/logstash/client_test.go | 9 ++-- libbeat/outputs/logstash/logstash.go | 17 +------ libbeat/outputs/logstash/logstash_test.go | 4 ++ libbeat/outputs/logstash/protocol.go | 55 ++++++++++++++++++++++- libbeat/outputs/logstash/protocol_test.go | 23 +++++++--- libbeat/outputs/logstash/sync.go | 3 +- libbeat/outputs/logstash/sync_test.go | 3 +- 8 files changed, 88 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7ef7dfd59145..3998b827f732 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d - Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389] - Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. 
{issue}1321[1321] - Fix bug affecting -cpuprofile, -memprofile, and -httpprof CLI flags {pull}1415[1415] +- Fix race when multiple outputs access the same event with logstash output manipulating event {issue}1410[1410] {pull}1428[1428] *Packetbeat* diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index 11f13f9c1829..4b9f6d6cb524 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -40,7 +40,8 @@ type testDriverCommand struct { } func newLumberjackTestClient(conn *transport.Client) *client { - c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond) + c, err := newLumberjackClient(conn, 3, + testMaxWindowSize, 100*time.Millisecond, "test") if err != nil { panic(err) } @@ -95,7 +96,7 @@ func testSimpleEvent(t *testing.T, factory clientFactory) { defer transp.Close() defer sock.Close() - event := common.MapStr{"name": "me", "line": 10} + event := common.MapStr{"type": "test", "name": "me", "line": 10} client.Publish([]common.MapStr{event}) // receive window message @@ -135,6 +136,7 @@ func testStructuredEvent(t *testing.T, factory clientFactory) { defer sock.Close() event := common.MapStr{ + "type": "test", "name": "test", "struct": common.MapStr{ "field1": 1, @@ -189,6 +191,7 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) { defer client.Stop() client.Publish([]common.MapStr{common.MapStr{ + "type": "test", "message": "hello world", }}) @@ -213,7 +216,7 @@ func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) { defer transp.Close() defer client.Stop() - event := common.MapStr{"name": "me", "line": 10} + event := common.MapStr{"type": "test", "name": "me", "line": 10} for i := 0; i < N; i++ { await := server.Await() diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index f6fae8f627ec..1d32aef5bf3b 100644 --- a/libbeat/outputs/logstash/logstash.go +++ 
b/libbeat/outputs/logstash/logstash.go @@ -91,7 +91,7 @@ func makeClientFactory(cfg *logstashConfig, tcfg *transport.Config) mode.ClientF if err != nil { return nil, err } - return newLumberjackClient(t, compressLvl, maxBulkSz, to) + return newLumberjackClient(t, compressLvl, maxBulkSz, to, cfg.Index) } } @@ -107,7 +107,6 @@ func (lj *logstash) PublishEvent( opts outputs.Options, event common.MapStr, ) error { - lj.addMeta(event) return lj.mode.PublishEvent(signaler, opts, event) } @@ -118,19 +117,5 @@ func (lj *logstash) BulkPublish( opts outputs.Options, events []common.MapStr, ) error { - for _, event := range events { - lj.addMeta(event) - } return lj.mode.PublishEvents(trans, opts, events) } - -// addMeta adapts events to be compatible with logstash forwarer messages by renaming -// the "message" field to "line". The lumberjack server in logstash will -// decode/rename the "line" field into "message". -func (lj *logstash) addMeta(event common.MapStr) { - // add metadata for indexing - event["@metadata"] = common.MapStr{ - "beat": lj.index, - "type": event["type"].(string), - } -} diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 91910da34440..9fbdba730b51 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -162,6 +162,7 @@ func TestLogstashTCP(t *testing.T) { // create lumberjack output client config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tcp"), "timeout": 2, } testConnectionType(t, server, testOutputerFactory(t, "", config)) @@ -177,6 +178,7 @@ func TestLogstashTLS(t *testing.T) { config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tls"), "timeout": 2, "tls.certificate_authorities": []string{certName + ".pem"}, } @@ -193,6 +195,7 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) { config := map[string]interface{}{ "hosts": 
[]string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tls-invalid"), "timeout": 2, "max_retries": 1, "tls.insecure": true, @@ -290,6 +293,7 @@ func TestLogstashInvalidTLS(t *testing.T) { config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-tls-invalid"), "timeout": 1, "max_retries": 0, "tls.certificate_authorities": []string{certName + ".pem"}, diff --git a/libbeat/outputs/logstash/protocol.go b/libbeat/outputs/logstash/protocol.go index 4b9538b7b3b9..f4474931a159 100644 --- a/libbeat/outputs/logstash/protocol.go +++ b/libbeat/outputs/logstash/protocol.go @@ -20,6 +20,9 @@ type protocol struct { timeout time.Duration compressLevel int + // the beat name + beat []byte + eventsBuffer *bytes.Buffer } @@ -43,6 +46,7 @@ func newClientProcol( conn net.Conn, timeout time.Duration, compressLevel int, + beat string, ) (*protocol, error) { // validate by creating and discarding zlib writer with configured level @@ -55,11 +59,17 @@ func newClientProcol( w.Close() } + encodedBeat, err := json.Marshal(beat) + if err != nil { + return nil, err + } + return &protocol{ conn: conn, timeout: timeout, compressLevel: compressLevel, eventsBuffer: bytes.NewBuffer(nil), + beat: encodedBeat, }, nil } @@ -263,7 +273,7 @@ func (p *protocol) serializeDataFrame( // payloadLen (bytes): uint32 // payload: JSON document - jsonEvent, err := json.Marshal(event) + jsonEvent, err := serializeEvent(event, p.beat) if err != nil { debug("Fail to json encode event (%v): %#v", err, event) return err @@ -285,6 +295,49 @@ func (p *protocol) serializeDataFrame( return nil } +func serializeEvent(event common.MapStr, beat []byte) ([]byte, error) { + buf := bytes.NewBuffer(nil) + buf.WriteRune('{') + + if _, hasMeta := event["@metadata"]; !hasMeta { + typ := event["type"].(string) + + buf.WriteString(`"@metadata":{"type":`) + tmp, err := json.Marshal(typ) + if err != nil { + return nil, err + } + buf.Write(tmp) + + buf.WriteString(`,"beat":`) 
+ buf.Write(beat) + buf.WriteString(`},`) + } + + for k, v := range event { + // append key + tmp, err := json.Marshal(k) + if err != nil { + return nil, err + } + buf.Write(tmp) + buf.WriteRune(':') + + // append value + tmp, err = json.Marshal(v) + if err != nil { + return nil, err + } + buf.Write(tmp) + buf.WriteRune(',') + } + + b := buf.Bytes() + b[len(b)-1] = '}' + + return b, nil +} + func writeUint32(out io.Writer, v uint32) error { return binary.Write(out, binary.BigEndian, v) } diff --git a/libbeat/outputs/logstash/protocol_test.go b/libbeat/outputs/logstash/protocol_test.go index 373a41392092..7f1ed0f1c3d9 100644 --- a/libbeat/outputs/logstash/protocol_test.go +++ b/libbeat/outputs/logstash/protocol_test.go @@ -55,7 +55,7 @@ func (s *protocolServer) connectPair(compressLevel int) (*mockConn, *protocol, e return nil, nil, err } - proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel) + proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel, "test") if err != nil { return nil, nil, err } @@ -240,7 +240,7 @@ func readMessage(buf *streambuf.Buffer) (*message, error) { func TestInvalidCompressionLevel(t *testing.T) { conn := (net.Conn)(nil) - p, err := newClientProcol(conn, 5*time.Second, 10) + p, err := newClientProcol(conn, 5*time.Second, 10, "test") assert.Nil(t, p) assert.NotNil(t, err) } @@ -273,6 +273,7 @@ func TestProtocolCloseAfterWindowSize(t *testing.T) { // defer transp.conn.Close() transp.sendEvents([]common.MapStr{common.MapStr{ + "type": "test", "message": "hello world", }}) @@ -306,7 +307,11 @@ func testProtocolReturnWindowSizes( events := []common.MapStr{} for i := 0; i < n; i++ { - events = append(events, common.MapStr{"message": string(i)}) + events = append(events, + common.MapStr{ + "type": "test", + "message": string(i), + }) } outEvents, err := transp.sendEvents(events) @@ -328,7 +333,11 @@ func testProtocolReturnWindowSizes( seq, err := transp.awaitACK(uint32(n)) assert.Equal(t, outEvents, events) 
- assert.Equal(t, docs, events) + assert.Equal(t, len(docs), len(events)) + for i := range docs { + assert.Equal(t, docs[i]["type"], events[i]["type"]) + assert.Equal(t, docs[i]["message"], events[i]["message"]) + } assert.Equal(t, n, int(seq)) assert.Equal(t, n, int(msg.size)) if expectErr { @@ -367,7 +376,11 @@ func TestProtocolFailOnClosedConnection(t *testing.T) { events := []common.MapStr{} for i := 0; i < N; i++ { - events = append(events, common.MapStr{"message": i}) + events = append(events, + common.MapStr{ + "type": "test", + "message": i, + }) } transp.conn.Close() diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 390e635229e1..51e2bef0ed9a 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -46,8 +46,9 @@ func newLumberjackClient( compressLevel int, maxWindowSize int, timeout time.Duration, + beat string, ) (*client, error) { - p, err := newClientProcol(conn, timeout, compressLevel) + p, err := newClientProcol(conn, timeout, compressLevel, beat) if err != nil { return nil, err } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index b01bdf1917f7..cea16e8170c6 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -56,7 +56,8 @@ func (s *clientServer) connectPair(compressLevel int) (*mockConn, *client, error } lc, err := newLumberjackClient(transp, compressLevel, - defaultConfig.BulkMaxSize, 100*time.Millisecond) + defaultConfig.BulkMaxSize, 100*time.Millisecond, + "test") if err != nil { return nil, nil, err }