diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7ef7dfd59145..3998b827f732 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d - Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389] - Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321] - Fix bug affecting -cpuprofile, -memprofile, and -httpprof CLI flags {pull}1415[1415] +- Fix race when multiple outputs access the same event with logstash output manipulating event {issue}1410[1410] {pull}1428[1428] *Packetbeat* diff --git a/libbeat/outputs/logstash/client_test.go b/libbeat/outputs/logstash/client_test.go index 11f13f9c1829..4b9f6d6cb524 100644 --- a/libbeat/outputs/logstash/client_test.go +++ b/libbeat/outputs/logstash/client_test.go @@ -40,7 +40,8 @@ type testDriverCommand struct { } func newLumberjackTestClient(conn *transport.Client) *client { - c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond) + c, err := newLumberjackClient(conn, 3, + testMaxWindowSize, 100*time.Millisecond, "test") if err != nil { panic(err) } @@ -95,7 +96,7 @@ func testSimpleEvent(t *testing.T, factory clientFactory) { defer transp.Close() defer sock.Close() - event := common.MapStr{"name": "me", "line": 10} + event := common.MapStr{"type": "test", "name": "me", "line": 10} client.Publish([]common.MapStr{event}) // receive window message @@ -135,6 +136,7 @@ func testStructuredEvent(t *testing.T, factory clientFactory) { defer sock.Close() event := common.MapStr{ + "type": "test", "name": "test", "struct": common.MapStr{ "field1": 1, @@ -189,6 +191,7 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) { defer client.Stop() client.Publish([]common.MapStr{common.MapStr{ + "type": "test", "message": "hello world", }}) @@ -213,7 +216,7 @@ func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) { defer transp.Close() defer client.Stop() - event := common.MapStr{"name": "me", "line": 10} + event := common.MapStr{"type": "test", "name": "me", "line": 10} for i := 0; i < N; i++ { await := server.Await() diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index f6fae8f627ec..1d32aef5bf3b 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -91,7 +91,7 @@ func makeClientFactory(cfg *logstashConfig, tcfg *transport.Config) mode.ClientF if err != nil { return nil, err } - return newLumberjackClient(t, compressLvl, maxBulkSz, to) + return newLumberjackClient(t, compressLvl, maxBulkSz, to, cfg.Index) } } @@ -107,7 +107,6 @@ func (lj *logstash) PublishEvent( opts outputs.Options, event common.MapStr, ) error { - lj.addMeta(event) return lj.mode.PublishEvent(signaler, opts, event) } @@ -118,19 +117,5 @@ func (lj *logstash) BulkPublish( opts outputs.Options, events []common.MapStr, ) error { - for _, event := range events { - lj.addMeta(event) - } return lj.mode.PublishEvents(trans, opts, events) } - -// addMeta adapts events to be compatible with logstash forwarer messages by renaming -// the "message" field to "line". The lumberjack server in logstash will -// decode/rename the "line" field into "message". -func (lj *logstash) addMeta(event common.MapStr) { - // add metadata for indexing - event["@metadata"] = common.MapStr{ - "beat": lj.index, - "type": event["type"].(string), - } -} diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 91910da34440..9fbdba730b51 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -162,6 +162,7 @@ func TestLogstashTCP(t *testing.T) { // create lumberjack output client config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tcp"), "timeout": 2, } testConnectionType(t, server, testOutputerFactory(t, "", config)) @@ -177,6 +178,7 @@ func TestLogstashTLS(t *testing.T) { config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tls"), "timeout": 2, "tls.certificate_authorities": []string{certName + ".pem"}, } @@ -193,6 +195,7 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) { config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-conn-tls-invalid"), "timeout": 2, "max_retries": 1, "tls.insecure": true, @@ -290,6 +293,7 @@ func TestLogstashInvalidTLS(t *testing.T) { config := map[string]interface{}{ "hosts": []string{server.Addr()}, + "index": testLogstashIndex("logstash-tls-invalid"), "timeout": 1, "max_retries": 0, "tls.certificate_authorities": []string{certName + ".pem"}, diff --git a/libbeat/outputs/logstash/protocol.go b/libbeat/outputs/logstash/protocol.go index 4b9538b7b3b9..f4474931a159 100644 --- a/libbeat/outputs/logstash/protocol.go +++ b/libbeat/outputs/logstash/protocol.go @@ -20,6 +20,9 @@ type protocol struct { timeout time.Duration compressLevel int + // the beat name + beat []byte + eventsBuffer *bytes.Buffer } @@ -43,6 +46,7 @@ func newClientProcol( conn net.Conn, timeout time.Duration, compressLevel int, + beat string, ) (*protocol, error) { // validate by creating and discarding zlib writer with configured level @@ -55,11 +59,17 @@ func newClientProcol( w.Close() } + encodedBeat, err := json.Marshal(beat) + if err != nil { + return nil, err + } + return &protocol{ conn: conn, timeout: timeout, compressLevel: compressLevel, eventsBuffer: bytes.NewBuffer(nil), + beat: encodedBeat, }, nil } @@ -263,7 +273,7 @@ func (p *protocol) serializeDataFrame( // payloadLen (bytes): uint32 // payload: JSON document - jsonEvent, err := json.Marshal(event) + jsonEvent, err := serializeEvent(event, p.beat) if err != nil { debug("Fail to json encode event (%v): %#v", err, event) return err @@ -285,6 +295,49 @@ func (p *protocol) serializeDataFrame( return nil } +func serializeEvent(event common.MapStr, beat []byte) ([]byte, error) { + buf := bytes.NewBuffer(nil) + buf.WriteRune('{') + + if _, hasMeta := event["@metadata"]; !hasMeta { + typ := event["type"].(string) + + buf.WriteString(`"@metadata":{"type":`) + tmp, err := json.Marshal(typ) + if err != nil { + return nil, err + } + buf.Write(tmp) + + buf.WriteString(`,"beat":`) + buf.Write(beat) + buf.WriteString(`},`) + } + + for k, v := range event { + // append key + tmp, err := json.Marshal(k) + if err != nil { + return nil, err + } + buf.Write(tmp) + buf.WriteRune(':') + + // append value + tmp, err = json.Marshal(v) + if err != nil { + return nil, err + } + buf.Write(tmp) + buf.WriteRune(',') + } + + b := buf.Bytes() + b[len(b)-1] = '}' + + return b, nil +} + func writeUint32(out io.Writer, v uint32) error { return binary.Write(out, binary.BigEndian, v) } diff --git a/libbeat/outputs/logstash/protocol_test.go b/libbeat/outputs/logstash/protocol_test.go index 373a41392092..7f1ed0f1c3d9 100644 --- a/libbeat/outputs/logstash/protocol_test.go +++ b/libbeat/outputs/logstash/protocol_test.go @@ -55,7 +55,7 @@ func (s *protocolServer) connectPair(compressLevel int) (*mockConn, *protocol, e return nil, nil, err } - proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel) + proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel, "test") if err != nil { return nil, nil, err } @@ -240,7 +240,7 @@ func readMessage(buf *streambuf.Buffer) (*message, error) { func TestInvalidCompressionLevel(t *testing.T) { conn := (net.Conn)(nil) - p, err := newClientProcol(conn, 5*time.Second, 10) + p, err := newClientProcol(conn, 5*time.Second, 10, "test") assert.Nil(t, p) assert.NotNil(t, err) } @@ -273,6 +273,7 @@ func TestProtocolCloseAfterWindowSize(t *testing.T) { // defer transp.conn.Close() transp.sendEvents([]common.MapStr{common.MapStr{ + "type": "test", "message": "hello world", }}) @@ -306,7 +307,11 @@ func testProtocolReturnWindowSizes( events := []common.MapStr{} for i := 0; i < n; i++ { - events = append(events, common.MapStr{"message": string(i)}) + events = append(events, + common.MapStr{ + "type": "test", + "message": string(i), + }) } outEvents, err := transp.sendEvents(events) @@ -328,7 +333,11 @@ func testProtocolReturnWindowSizes( seq, err := transp.awaitACK(uint32(n)) assert.Equal(t, outEvents, events) - assert.Equal(t, docs, events) + assert.Equal(t, len(docs), len(events)) + for i := range docs { + assert.Equal(t, docs[i]["type"], events[i]["type"]) + assert.Equal(t, docs[i]["message"], events[i]["message"]) + } assert.Equal(t, n, int(seq)) assert.Equal(t, n, int(msg.size)) if expectErr { @@ -367,7 +376,11 @@ func TestProtocolFailOnClosedConnection(t *testing.T) { events := []common.MapStr{} for i := 0; i < N; i++ { - events = append(events, common.MapStr{"message": i}) + events = append(events, + common.MapStr{ + "type": "test", + "message": i, + }) } transp.conn.Close() diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 390e635229e1..51e2bef0ed9a 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -46,8 +46,9 @@ func newLumberjackClient( compressLevel int, maxWindowSize int, timeout time.Duration, + beat string, ) (*client, error) { - p, err := newClientProcol(conn, timeout, compressLevel) + p, err := newClientProcol(conn, timeout, compressLevel, beat) if err != nil { return nil, err } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index b01bdf1917f7..cea16e8170c6 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -56,7 +56,8 @@ func (s *clientServer) connectPair(compressLevel int) (*mockConn, *client, error } lc, err := newLumberjackClient(transp, compressLevel, - defaultConfig.BulkMaxSize, 100*time.Millisecond) + defaultConfig.BulkMaxSize, 100*time.Millisecond, + "test") if err != nil { return nil, nil, err }