diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 430a29c74a6..aac53fc2ec4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -320,6 +320,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191] - Add support for username in cisco asa security negotiation logs {pull}26975[26975] - Relax time parsing and capture group and session type in Cisco ASA module {issue}24710[24710] {pull}28325[28325] +- Correctly track bytes read when max_bytes is exceeded. {issue}28317[28317] {pull}28352[28352] *Heartbeat* diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 78331a7d246..d2db172706b 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -75,8 +75,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { }, nil } -// Next reads the next line until the new line character -func (r *LineReader) Next() ([]byte, int, error) { +// Next reads the next line until the new line character. The return +// value b is the byte slice that contains the next line. The return +// value n is the number of bytes that were consumed from the +// underlying reader to read the next line. If the LineReader is +// configured with maxBytes n may be larger than the length of b due +// to skipped lines. +func (r *LineReader) Next() (b []byte, n int, err error) { // This loop is need in case advance detects an line ending which turns out // not to be one when decoded. If that is the case, reading continues. for { @@ -162,6 +167,7 @@ func (r *LineReader) advance() error { for idx != -1 && idx > r.maxBytes { r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) err = r.inBuffer.Advance(idx + len(r.nl)) + r.byteCount += idx + len(r.nl) r.inBuffer.Reset() r.inOffset = 0 idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) @@ -175,6 +181,7 @@ func (r *LineReader) advance() error { return err } r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped) + r.byteCount += skipped idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) } } diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index e198a7a342e..17fdfcf1039 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -366,7 +366,7 @@ func TestMaxBytesLimit(t *testing.T) { // Read decodec lines and test var idx int for i := 0; ; i++ { - b, n, err := reader.Next() + b, _, err := reader.Next() if err != nil { if err == io.EOF { break @@ -387,12 +387,7 @@ func TestMaxBytesLimit(t *testing.T) { break } - gotLen := n - len(nl) s := string(b[:len(b)-len(nl)]) - if len(line) != gotLen { - t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen) - } - if line != s { t.Fatalf("lines do not match, expected: %s got: %s", line, s) }