Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not update event in parallel #1428

Merged
merged 1 commit into from
Apr 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
- Unterminated environment variable expressions in config files will now cause an error {pull}1389[1389]
- Fix issue with the automatic template loading when Elasticsearch is not available on Beat start. {issue}1321[1321]
- Fix bug affecting -cpuprofile, -memprofile, and -httpprof CLI flags {pull}1415[1415]
- Fix race when multiple outputs access the same event with logstash output manipulating event {issue}1410[1410] {pull}1428[1428]

*Packetbeat*

Expand Down
9 changes: 6 additions & 3 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type testDriverCommand struct {
}

func newLumberjackTestClient(conn *transport.Client) *client {
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 100*time.Millisecond)
c, err := newLumberjackClient(conn, 3,
testMaxWindowSize, 100*time.Millisecond, "test")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -95,7 +96,7 @@ func testSimpleEvent(t *testing.T, factory clientFactory) {
defer transp.Close()
defer sock.Close()

event := common.MapStr{"name": "me", "line": 10}
event := common.MapStr{"type": "test", "name": "me", "line": 10}
client.Publish([]common.MapStr{event})

// receive window message
Expand Down Expand Up @@ -135,6 +136,7 @@ func testStructuredEvent(t *testing.T, factory clientFactory) {
defer sock.Close()

event := common.MapStr{
"type": "test",
"name": "test",
"struct": common.MapStr{
"field1": 1,
Expand Down Expand Up @@ -189,6 +191,7 @@ func testCloseAfterWindowSize(t *testing.T, factory clientFactory) {
defer client.Stop()

client.Publish([]common.MapStr{common.MapStr{
"type": "test",
"message": "hello world",
}})

Expand All @@ -213,7 +216,7 @@ func testMultiFailMaxTimeouts(t *testing.T, factory clientFactory) {
defer transp.Close()
defer client.Stop()

event := common.MapStr{"name": "me", "line": 10}
event := common.MapStr{"type": "test", "name": "me", "line": 10}

for i := 0; i < N; i++ {
await := server.Await()
Expand Down
17 changes: 1 addition & 16 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func makeClientFactory(cfg *logstashConfig, tcfg *transport.Config) mode.ClientF
if err != nil {
return nil, err
}
return newLumberjackClient(t, compressLvl, maxBulkSz, to)
return newLumberjackClient(t, compressLvl, maxBulkSz, to, cfg.Index)
}
}

Expand All @@ -107,7 +107,6 @@ func (lj *logstash) PublishEvent(
opts outputs.Options,
event common.MapStr,
) error {
lj.addMeta(event)
return lj.mode.PublishEvent(signaler, opts, event)
}

Expand All @@ -118,19 +117,5 @@ func (lj *logstash) BulkPublish(
opts outputs.Options,
events []common.MapStr,
) error {
for _, event := range events {
lj.addMeta(event)
}
return lj.mode.PublishEvents(trans, opts, events)
}

// addMeta adapts events to be compatible with logstash forwarer messages by renaming
// the "message" field to "line". The lumberjack server in logstash will
// decode/rename the "line" field into "message".
func (lj *logstash) addMeta(event common.MapStr) {
// add metadata for indexing
event["@metadata"] = common.MapStr{
"beat": lj.index,
"type": event["type"].(string),
}
}
4 changes: 4 additions & 0 deletions libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestLogstashTCP(t *testing.T) {
// create lumberjack output client
config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tcp"),
"timeout": 2,
}
testConnectionType(t, server, testOutputerFactory(t, "", config))
Expand All @@ -177,6 +178,7 @@ func TestLogstashTLS(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tls"),
"timeout": 2,
"tls.certificate_authorities": []string{certName + ".pem"},
}
Expand All @@ -193,6 +195,7 @@ func TestLogstashInvalidTLSInsecure(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-conn-tls-invalid"),
"timeout": 2,
"max_retries": 1,
"tls.insecure": true,
Expand Down Expand Up @@ -290,6 +293,7 @@ func TestLogstashInvalidTLS(t *testing.T) {

config := map[string]interface{}{
"hosts": []string{server.Addr()},
"index": testLogstashIndex("logstash-tls-invalid"),
"timeout": 1,
"max_retries": 0,
"tls.certificate_authorities": []string{certName + ".pem"},
Expand Down
55 changes: 54 additions & 1 deletion libbeat/outputs/logstash/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type protocol struct {
timeout time.Duration
compressLevel int

// the beat name
beat []byte

eventsBuffer *bytes.Buffer
}

Expand All @@ -43,6 +46,7 @@ func newClientProcol(
conn net.Conn,
timeout time.Duration,
compressLevel int,
beat string,
) (*protocol, error) {

// validate by creating and discarding zlib writer with configured level
Expand All @@ -55,11 +59,17 @@ func newClientProcol(
w.Close()
}

encodedBeat, err := json.Marshal(beat)
if err != nil {
return nil, err
}

return &protocol{
conn: conn,
timeout: timeout,
compressLevel: compressLevel,
eventsBuffer: bytes.NewBuffer(nil),
beat: encodedBeat,
}, nil
}

Expand Down Expand Up @@ -263,7 +273,7 @@ func (p *protocol) serializeDataFrame(
// payloadLen (bytes): uint32
// payload: JSON document

jsonEvent, err := json.Marshal(event)
jsonEvent, err := serializeEvent(event, p.beat)
if err != nil {
debug("Fail to json encode event (%v): %#v", err, event)
return err
Expand All @@ -285,6 +295,49 @@ func (p *protocol) serializeDataFrame(
return nil
}

func serializeEvent(event common.MapStr, beat []byte) ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some unit tests for this function would be good.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's tested (block-box testing sendEvents), as we've got some logstash unit tests forwarding events via sendEvents with server part decoding json messages for validation.

buf := bytes.NewBuffer(nil)
buf.WriteRune('{')

if _, hasMeta := event["@metadata"]; !hasMeta {
typ := event["type"].(string)

buf.WriteString(`"@metadata":{"type":`)
tmp, err := json.Marshal(typ)
if err != nil {
return nil, err
}
buf.Write(tmp)

buf.WriteString(`,"beat":`)
buf.Write(beat)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also be passed through json.Marshal or otherwise escaped? We have to be careful with JSON injection if we're doing this by hand.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json.Marshal is done line 62. Didn't want to marshal over and over again

buf.WriteString(`},`)
}

for k, v := range event {
// append key
tmp, err := json.Marshal(k)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(':')

// append value
tmp, err = json.Marshal(v)
if err != nil {
return nil, err
}
buf.Write(tmp)
buf.WriteRune(',')
}

b := buf.Bytes()
b[len(b)-1] = '}'

return b, nil
}

func writeUint32(out io.Writer, v uint32) error {
return binary.Write(out, binary.BigEndian, v)
}
23 changes: 18 additions & 5 deletions libbeat/outputs/logstash/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *protocolServer) connectPair(compressLevel int) (*mockConn, *protocol, e
return nil, nil, err
}

proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel)
proto, err := newClientProcol(transp, 100*time.Millisecond, compressLevel, "test")
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func readMessage(buf *streambuf.Buffer) (*message, error) {

func TestInvalidCompressionLevel(t *testing.T) {
conn := (net.Conn)(nil)
p, err := newClientProcol(conn, 5*time.Second, 10)
p, err := newClientProcol(conn, 5*time.Second, 10, "test")
assert.Nil(t, p)
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -273,6 +273,7 @@ func TestProtocolCloseAfterWindowSize(t *testing.T) {
// defer transp.conn.Close()

transp.sendEvents([]common.MapStr{common.MapStr{
"type": "test",
"message": "hello world",
}})

Expand Down Expand Up @@ -306,7 +307,11 @@ func testProtocolReturnWindowSizes(

events := []common.MapStr{}
for i := 0; i < n; i++ {
events = append(events, common.MapStr{"message": string(i)})
events = append(events,
common.MapStr{
"type": "test",
"message": string(i),
})
}

outEvents, err := transp.sendEvents(events)
Expand All @@ -328,7 +333,11 @@ func testProtocolReturnWindowSizes(

seq, err := transp.awaitACK(uint32(n))
assert.Equal(t, outEvents, events)
assert.Equal(t, docs, events)
assert.Equal(t, len(docs), len(events))
for i := range docs {
assert.Equal(t, docs[i]["type"], events[i]["type"])
assert.Equal(t, docs[i]["message"], events[i]["message"])
}
assert.Equal(t, n, int(seq))
assert.Equal(t, n, int(msg.size))
if expectErr {
Expand Down Expand Up @@ -367,7 +376,11 @@ func TestProtocolFailOnClosedConnection(t *testing.T) {

events := []common.MapStr{}
for i := 0; i < N; i++ {
events = append(events, common.MapStr{"message": i})
events = append(events,
common.MapStr{
"type": "test",
"message": i,
})
}

transp.conn.Close()
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func newLumberjackClient(
compressLevel int,
maxWindowSize int,
timeout time.Duration,
beat string,
) (*client, error) {
p, err := newClientProcol(conn, timeout, compressLevel)
p, err := newClientProcol(conn, timeout, compressLevel, beat)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (s *clientServer) connectPair(compressLevel int) (*mockConn, *client, error
}

lc, err := newLumberjackClient(transp, compressLevel,
defaultConfig.BulkMaxSize, 100*time.Millisecond)
defaultConfig.BulkMaxSize, 100*time.Millisecond,
"test")
if err != nil {
return nil, nil, err
}
Expand Down