Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of messages larger than 10MB #2470

Merged
merged 3 commits into from
Sep 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Packetbeat*
- Fix mapping for some Packetbeat flow metrics that were not marked as being longs. {issue}2177[2177]
- Fix handling of messages larger than the maximum message size (10MB). {pull}2470[2470]

*Topbeat*

Expand Down Expand Up @@ -101,6 +102,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]
- Add configuration option for the maximum message size. Used to be hard-coded to 10 MB. {pull}2470[2470]

*Topbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ information. If this header is present and contains a valid IP addresses, the
information is used for the `real_ip` and `client_location` indexed
fields.

===== max_message_size

If an individual HTTP message is larger than this setting (in bytes), it will be trimmed
to this size. Unless this value is very small (<1.5K), Packetbeat is still able to
correctly follow the transaction and create an event for it. The default is 10485760 (10 MB).


==== AMQP Configuration Options

Expand Down
4 changes: 4 additions & 0 deletions packetbeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ packetbeat.protocols.http:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s

# Maximum message size. If an HTTP message is larger than this, it will
# be trimmed to this size. Default is 10 MB.
#max_message_size: 10485760

packetbeat.protocols.memcache:
# Enable memcache monitoring. Default: true
#enabled: true
Expand Down
4 changes: 4 additions & 0 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ packetbeat.protocols.http:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s

# Maximum message size. If an HTTP message is larger than this, it will
# be trimmed to this size. Default is 10 MB.
#max_message_size: 10485760

packetbeat.protocols.memcache:
# Enable memcache monitoring. Default: true
#enabled: true
Expand Down
3 changes: 3 additions & 0 deletions packetbeat/protos/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

type httpConfig struct {
Expand All @@ -14,12 +15,14 @@ type httpConfig struct {
Include_body_for []string `config:"include_body_for"`
Hide_keywords []string `config:"hide_keywords"`
Redact_authorization bool `config:"redact_authorization"`
MaxMessageSize int `config:"max_message_size"`
}

var (
defaultConfig = httpConfig{
ProtocolCommon: config.ProtocolCommon{
TransactionTimeout: protos.DefaultTransactionExpiration,
},
MaxMessageSize: tcp.TCP_MAX_DATA_IN_STREAM,
}
)
19 changes: 12 additions & 7 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type HTTP struct {
HideKeywords []string
RedactAuthorization bool
IncludeBodyFor []string
MaxMessageSize int

parserConfig parserConfig

Expand Down Expand Up @@ -124,6 +125,7 @@ func (http *HTTP) setFromConfig(config *httpConfig) {
http.parserConfig.RealIPHeader = strings.ToLower(config.Real_ip_header)
http.transactionTimeout = config.TransactionTimeout
http.IncludeBodyFor = config.Include_body_for
http.MaxMessageSize = config.MaxMessageSize

if config.Send_all_headers {
http.parserConfig.SendHeaders = true
Expand Down Expand Up @@ -265,19 +267,21 @@ func (http *HTTP) doParse(
detailedf("Payload received: [%s]", pkt.Payload)
}

extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes

st := conn.Streams[dir]
if st == nil {
st = newStream(pkt, tcptuple)
conn.Streams[dir] = st
} else {
// concatenate bytes
st.data = append(st.data, pkt.Payload...)
if len(st.data) > tcp.TCP_MAX_DATA_IN_STREAM {
if len(st.data)+len(pkt.Payload) > http.MaxMessageSize {
if isDebug {
debugf("Stream data too large, dropping TCP stream")
debugf("Stream data too large, ignoring message")
}
conn.Streams[dir] = nil
return conn
extraMsgSize = len(pkt.Payload)
} else {
st.data = append(st.data, pkt.Payload...)
}
}

Expand All @@ -287,7 +291,7 @@ func (http *HTTP) doParse(
}

parser := newParser(&http.parserConfig)
ok, complete := parser.parse(st)
ok, complete := parser.parse(st, extraMsgSize)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
Expand Down Expand Up @@ -322,6 +326,7 @@ func newStream(pkt *protos.Packet, tcptuple *common.TcpTuple) *stream {
func (http *HTTP) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

debugf("Received FIN")
conn := getHTTPConnection(private)
if conn == nil {
return private
Expand Down Expand Up @@ -583,7 +588,7 @@ func parseCookieValue(raw string) string {
func (http *HTTP) extractBody(m *message) []byte {
body := []byte{}

if len(m.ContentType) == 0 || http.shouldIncludeInBody(m.ContentType) {
if len(m.ContentType) > 0 && http.shouldIncludeInBody(m.ContentType) {
if len(m.chunkedBody) > 0 {
body = append(body, m.chunkedBody...)
} else {
Expand Down
53 changes: 46 additions & 7 deletions packetbeat/protos/http/http_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type message struct {
Ts time.Time
hasContentLength bool
headerOffset int
bodyOffset int
version version
connection common.NetString
chunkedLength int
Expand Down Expand Up @@ -46,9 +45,10 @@ type message struct {

Notes []string

//Timing
start int
end int
//Offsets
start int
end int
bodyOffset int

next *message
}
Expand Down Expand Up @@ -87,9 +87,18 @@ func newParser(config *parserConfig) *parser {
return &parser{config: config}
}

func (parser *parser) parse(s *stream) (bool, bool) {
func (parser *parser) parse(s *stream, extraMsgSize int) (bool, bool) {
m := s.message

if extraMsgSize > 0 {
// A packet of extraMsgSize size was seen, but we don't have
// its actual bytes. This is only usable in the `stateBody` state.
if s.parseState != stateBody {
return false, false
}
return parser.eatBody(s, m, extraMsgSize)
}

for s.parseOffset < len(s.data) {
switch s.parseState {
case stateStart:
Expand Down Expand Up @@ -363,14 +372,14 @@ func (parser *parser) parseHeader(m *message, data []byte) (bool, bool, int) {

func (*parser) parseBody(s *stream, m *message) (ok, complete bool) {
if isDebug {
debugf("eat body: %d", s.parseOffset)
debugf("parseBody body: %d", s.parseOffset)
}
if !m.hasContentLength && (bytes.Equal(m.connection, constClose) ||
(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) {

// HTTP/1.0 no content length. Add until the end of the connection
if isDebug {
debugf("close connection, %d", len(s.data)-s.parseOffset)
debugf("http conn close, received %d", len(s.data)-s.parseOffset)
}
s.bodyReceived += (len(s.data) - s.parseOffset)
m.ContentLength += (len(s.data) - s.parseOffset)
Expand All @@ -391,6 +400,36 @@ func (*parser) parseBody(s *stream, m *message) (ok, complete bool) {
}
}

// eatBody acts as if size bytes were received, without having access to
// those bytes.
func (*parser) eatBody(s *stream, m *message, size int) (ok, complete bool) {
	if isDebug {
		debugf("eatBody body: %d", s.parseOffset)
	}

	// HTTP/1.0-style message delimited by connection close: there is no
	// Content-Length header, so only account for the seen bytes; the message
	// can never be marked complete here.
	noLength := !m.hasContentLength &&
		(bytes.Equal(m.connection, constClose) ||
			(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive)))
	if noLength {
		if isDebug {
			debugf("http conn close, received %d", size)
		}
		s.bodyReceived += size
		m.ContentLength += size
		return true, false
	}

	// Content-Length is known: check whether the skipped bytes finish the body.
	if remaining := m.ContentLength - s.bodyReceived; size >= remaining {
		// Body complete; record offsets and total on-the-wire size
		// (headers up to bodyOffset plus the full declared body length).
		s.bodyReceived = m.ContentLength
		m.end = s.parseOffset
		m.Size = uint64(m.bodyOffset-m.start) + uint64(m.ContentLength)
		return true, true
	}

	// Still waiting for more of the body.
	s.bodyReceived += size
	if isDebug {
		debugf("bodyReceived: %d", s.bodyReceived)
	}
	return true, false
}

func (*parser) parseBodyChunkedStart(s *stream, m *message) (cont, ok, complete bool) {
// read hexa length
i := bytes.Index(s.data[s.parseOffset:], constCRLF)
Expand Down
Loading