Skip to content

Commit

Permalink
Do not update event in parallel
Browse files Browse the repository at this point in the history
logstash output used to add metadata to an event. With potentially having
multiple outputs configured this is some bad practice. Instead the logstash
output plugin will add '@metadata' to the event when encoding and only if
'@metadata' is not already present in event
  • Loading branch information
urso committed Apr 20, 2016
1 parent dbcf2c7 commit 35163ae
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
- Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389]
- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321]
- Fix bug affecting -cpuprofile, -memprofile, and -httpprof CLI flags {pull}1415[1415]
- Fix race when multiple outputs access the same event with logstash output manipulating event {issue}1410[1410] {pull}1428[1428]

*Packetbeat*

Expand Down
9 changes: 6 additions & 3 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type testDriverCommand struct {
}

func newLumberjackTestClient(conn *transport.Client) *client {
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond)
c, err := newLumberjackClient(conn, 3,
testMaxWindowSize, 100*time.Millisecond, "test")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -95,7 +96,7 @@ func testSimpleEvent(t *testing.T, factory clientFactory) {
defer transp.Close()
defer sock.Close()

event := common.MapStr{"name": "me", "line": 10}
event := common.MapStr{"type": "test", "name": "me", "line": 10}
client.Publish([]common.MapStr{event})

// receive window message
Expand Down Expand Up @@ -135,6 +136,7 @@ func testStructuredEvent(t *testing.T, factory clientFactory) {
defer sock.Close()

event := common.MapStr{
"type": "test",
"name": "test",
"struct": common.MapStr{
"field1": 1,
Expand Down Expand Up @@ -189,6 +191,7 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) {
defer client.Stop()

client.Publish([]common.MapStr{common.MapStr{
"type": "test",
"message": "hello world",
}})

Expand All @@ -213,7 +216,7 @@ func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) {
defer transp.Close()
defer client.Stop()

event := common.MapStr{"name": "me", "line": 10}
event := common.MapStr{"type": "test", "name": "me", "line": 10}

for i := 0; i < N; i++ {
await := server.Await()
Expand Down
17 changes: 1 addition & 16 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func makeClientFactory(cfg *logstashConfig, tcfg *transport.Config) mode.ClientF
if err != nil {
return nil, err
}
return newLumberjackClient(t, compressLvl, maxBulkSz, to)
return newLumberjackClient(t, compressLvl, maxBulkSz, to, cfg.Index)
}
}

Expand All @@ -107,7 +107,6 @@ func (lj *logstash) PublishEvent(
opts outputs.Options,
event common.MapStr,
) error {
lj.addMeta(event)
return lj.mode.PublishEvent(signaler, opts, event)
}

Expand All @@ -118,19 +117,5 @@ func (lj *logstash) BulkPublish(
opts outputs.Options,
events []common.MapStr,
) error {
for _, event := range events {
lj.addMeta(event)
}
return lj.mode.PublishEvents(trans, opts, events)
}

// addMeta adapts events to be compatible with logstash forwarer messages by renaming
// the "message" field to "line". The lumberjack server in logstash will
// decode/rename the "line" field into "message".
func (lj *logstash) addMeta(event common.MapStr) {
// add metadata for indexing
event["@metadata"] = common.MapStr{
"beat": lj.index,
"type": event["type"].(string),
}
}
4 changes: 4 additions & 0 deletions libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestLogstashTCP(t *testing.T) {
// create lumberjack output client
config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tcp"),
"timeout": 2,
}
testConnectionType(t, server, testOutputerFactory(t, "", config))
Expand All @@ -177,6 +178,7 @@ func TestLogstashTLS(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tls"),
"timeout": 2,
"tls.certificate_authorities": []string{certName + ".pem"},
}
Expand All @@ -193,6 +195,7 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tls-invalid"),
"timeout": 2,
"max_retries": 1,
"tls.insecure": true,
Expand Down Expand Up @@ -290,6 +293,7 @@ func TestLogstashInvalidTLS(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-tls-invalid"),
"timeout": 1,
"max_retries": 0,
"tls.certificate_authorities": []string{certName + ".pem"},
Expand Down
55 changes: 54 additions & 1 deletion libbeat/outputs/logstash/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type protocol struct {
timeout time.Duration
compressLevel int

// the beat name
beat []byte

eventsBuffer *bytes.Buffer
}

Expand All @@ -43,6 +46,7 @@ func newClientProcol(
conn net.Conn,
timeout time.Duration,
compressLevel int,
beat string,
) (*protocol, error) {

// validate by creating and discarding zlib writer with configured level
Expand All @@ -55,11 +59,17 @@ func newClientProcol(
w.Close()
}

encodedBeat, err := json.Marshal(beat)
if err != nil {
return nil, err
}

return &protocol{
conn: conn,
timeout: timeout,
compressLevel: compressLevel,
eventsBuffer: bytes.NewBuffer(nil),
beat: encodedBeat,
}, nil
}

Expand Down Expand Up @@ -263,7 +273,7 @@ func (p *protocol) serializeDataFrame(
// payloadLen (bytes): uint32
// payload: JSON document

jsonEvent, err := json.Marshal(event)
jsonEvent, err := serializeEvent(event, p.beat)
if err != nil {
debug("Fail to json encode event (%v): %#v", err, event)
return err
Expand All @@ -285,6 +295,49 @@ func (p *protocol) serializeDataFrame(
return nil
}

func serializeEvent(event common.MapStr, beat []byte) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteRune('{')

if _, hasMeta := event["@metadata"]; !hasMeta {
typ := event["type"].(string)

buf.WriteString(`"@metadata":{"type":`)
tmp, err := json.Marshal(typ)
if err != nil {
return nil, err
}
buf.Write(tmp)

buf.WriteString(`,"beat":`)
buf.Write(beat)
buf.WriteString(`},`)
}

for k, v := range event {
// append key
tmp, err := json.Marshal(k)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(':')

// append value
tmp, err = json.Marshal(v)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(',')
}

b := buf.Bytes()
b[len(b)-1] = '}'

return b, nil
}

func writeUint32(out io.Writer, v uint32) error {
return binary.Write(out, binary.BigEndian, v)
}
23 changes: 18 additions & 5 deletions libbeat/outputs/logstash/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *protocolServer) connectPair(compressLevel int) (*mockConn, *protocol, e
return nil, nil, err
}

proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel)
proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel, "test")
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func readMessage(buf *streambuf.Buffer) (*message, error) {

func TestInvalidCompressionLevel(t *testing.T) {
conn := (net.Conn)(nil)
p, err := newClientProcol(conn, 5*time.Second, 10)
p, err := newClientProcol(conn, 5*time.Second, 10, "test")
assert.Nil(t, p)
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -273,6 +273,7 @@ func TestProtocolCloseAfterWindowSize(t *testing.T) {
// defer transp.conn.Close()

transp.sendEvents([]common.MapStr{common.MapStr{
"type": "test",
"message": "hello world",
}})

Expand Down Expand Up @@ -306,7 +307,11 @@ func testProtocolReturnWindowSizes(

events := []common.MapStr{}
for i := 0; i < n; i++ {
events = append(events, common.MapStr{"message": string(i)})
events = append(events,
common.MapStr{
"type": "test",
"message": string(i),
})
}

outEvents, err := transp.sendEvents(events)
Expand All @@ -328,7 +333,11 @@ func testProtocolReturnWindowSizes(

seq, err := transp.awaitACK(uint32(n))
assert.Equal(t, outEvents, events)
assert.Equal(t, docs, events)
assert.Equal(t, len(docs), len(events))
for i := range docs {
assert.Equal(t, docs[i]["type"], events[i]["type"])
assert.Equal(t, docs[i]["message"], events[i]["message"])
}
assert.Equal(t, n, int(seq))
assert.Equal(t, n, int(msg.size))
if expectErr {
Expand Down Expand Up @@ -367,7 +376,11 @@ func TestProtocolFailOnClosedConnection(t *testing.T) {

events := []common.MapStr{}
for i := 0; i < N; i++ {
events = append(events, common.MapStr{"message": i})
events = append(events,
common.MapStr{
"type": "test",
"message": i,
})
}

transp.conn.Close()
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func newLumberjackClient(
compressLevel int,
maxWindowSize int,
timeout time.Duration,
beat string,
) (*client, error) {
p, err := newClientProcol(conn, timeout, compressLevel)
p, err := newClientProcol(conn, timeout, compressLevel, beat)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (s *clientServer) connectPair(compressLevel int) (*mockConn, *client, error
}

lc, err := newLumberjackClient(transp, compressLevel,
defaultConfig.BulkMaxSize, 100*time.Millisecond)
defaultConfig.BulkMaxSize, 100*time.Millisecond,
"test")
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 35163ae

Please sign in to comment.