Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] add RFC6587 framing support #23724

Merged
merged 1 commit into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added `encode_as` and `decode_as` options to httpjson along with pluggable encoders/decoders {pull}23478[23478]
- Added `application/x-ndjson` as decode option for httpjson input {pull}23521[23521]
- Added `application/x-www-form-urlencoded` as encode option for httpjson input {pull}23521[23521]
- Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724]

*Heartbeat*

Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-common-tcp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ The maximum size of the message received over TCP. The default is `20MiB`.

The host and TCP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-tcp-framing"]
==== `framing`

Specify the framing used to split incoming events. Can be one of
`delimiter` or `rfc6587`. `delimiter` uses the characters specified
in `line_delimiter` to split the incoming events. `rfc6587` supports
octet counting and non-transparent framing as described in
https://tools.ietf.org/html/rfc6587[RFC6587]. `line_delimiter` is
used to split the events in non-transparent framing. The default is `delimiter`.

[float]
[id="{beatname_lc}-input-{type}-tcp-line-delimiter"]
==== `line_delimiter`
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ The file mode of the Unix socket that will be created by Filebeat. This is
expected to be a file mode as an octal string. The default value is the system
default (generally `0755`).

[float]
[id="{beatname_lc}-input-{type}-unix-framing"]
==== `framing`

Specify the framing used to split incoming events. Can be one of
`delimiter` or `rfc6587`. `delimiter` uses the characters specified
in `line_delimiter` to split the incoming events. `rfc6587` supports
octet counting and non-transparent framing as described in
https://tools.ietf.org/html/rfc6587[RFC6587]. `line_delimiter` is
used to split the events in non-transparent framing. The default is `delimiter`.

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ var defaultConfig = config{

type syslogTCP struct {
tcp.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
Framing streaming.FramingType `config:"framing"`
}

var defaultTCP = syslogTCP{
Expand Down Expand Up @@ -90,9 +91,9 @@ func factory(
return nil, err
}

splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter))
if err != nil {
return nil, err
}

logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
)

type config struct {
tcp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
Framing streaming.FramingType `config:"framing"`
}

var defaultConfig = config{
Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package tcp

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -75,9 +74,9 @@ func NewInput(
forwarder.Send(event)
}

splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter))
if err != nil {
return nil, err
}

logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
Expand Down
61 changes: 51 additions & 10 deletions filebeat/inputsource/common/streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"net"
"sync"

Expand All @@ -47,6 +48,21 @@ type Listener struct {
listenerFactory ListenerFactory
}

// FramingType are supported framing options for the SplitFunc
type FramingType int

const (
FramingDelimiter = iota
FramingRFC6587
)

var (
framingTypes = map[string]FramingType{
"delimiter": FramingDelimiter,
"rfc6587": FramingRFC6587,
}
)

// NewListener creates a new Listener
func NewListener(family inputsource.Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
Expand Down Expand Up @@ -168,18 +184,43 @@ func (l *Listener) unregisterHandler() {
l.clientsCount.Dec()
}

// SplitFunc allows to create a `bufio.SplitFunc` based on a delimiter provided.
func SplitFunc(lineDelimiter []byte) bufio.SplitFunc {
// SplitFunc allows to create a `bufio.SplitFunc` based on a framing &
// delimiter provided.
func SplitFunc(framing FramingType, lineDelimiter []byte) (bufio.SplitFunc, error) {
if len(lineDelimiter) == 0 {
return nil
return nil, fmt.Errorf("line delimiter required")
}
switch framing {
case FramingDelimiter:
// This will work for most usecases and will also
// strip \r if present. CustomDelimiter, need to
// match completely and the delimiter will be
// completely removed from the returned byte slice
if bytes.Equal(lineDelimiter, []byte("\n")) {
return bufio.ScanLines, nil
}
return FactoryDelimiter(lineDelimiter), nil
case FramingRFC6587:
return FactoryRFC6587Framing(lineDelimiter), nil
default:
return nil, fmt.Errorf("unknown SplitFunc for framing %d and line delimiter %s", framing, string(lineDelimiter))
}

}

// Unpack for config
func (f *FramingType) Unpack(value string) error {
ft, ok := framingTypes[value]
if !ok {
availableTypes := make([]string, len(framingTypes))
i := 0
for t := range framingTypes {
availableTypes[i] = t
i++
}
return fmt.Errorf("invalid framing type '%s', supported types: %v", value, availableTypes)

ld := []byte(lineDelimiter)
if bytes.Equal(ld, []byte("\n")) {
// This will work for most usecases and will also strip \r if present.
// CustomDelimiter, need to match completely and the delimiter will be completely removed from
// the returned byte slice
return bufio.ScanLines
}
return FactoryDelimiter(ld)
*f = ft
return nil
}
41 changes: 41 additions & 0 deletions filebeat/inputsource/common/streaming/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package streaming
import (
"bufio"
"bytes"
"strconv"
)

// FactoryDelimiter return a function to split line using a custom delimiter supporting multibytes
Expand All @@ -46,3 +47,43 @@ func dropDelimiter(data []byte, delimiter []byte) []byte {
}
return data
}

// FactoryRFC6587Framing returns a function that splits based on octet
// counting or non-transparent framing as defined in RFC6587. Allows
// for custom delimter for non-transparent framing.
func FactoryRFC6587Framing(delimiter []byte) bufio.SplitFunc {
return func(data []byte, eof bool) (int, []byte, error) {
if eof && len(data) == 0 {
return 0, nil, nil
}
// need at least one character to see if octet or
// non transparent framing
if len(data) <= 1 {
return 0, nil, nil
}
// It can be assumed that octet-counting framing is
// used if a syslog frame starts with a digit RFC6587
if bytes.ContainsAny(data[0:1], "0123456789") {
if i := bytes.IndexByte(data, ' '); i > 0 {
length, err := strconv.Atoi(string(data[0:i]))
if err != nil {
return 0, nil, err
}
end := length + i + 1
if len(data) >= end {
return end, data[i+1 : end], nil
}
}
//request more data
return 0, nil, nil
}
if i := bytes.Index(data, delimiter); i >= 0 {
return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil
}
if eof {
return len(data), dropDelimiter(data, delimiter), nil
}
// request more data
return 0, nil, nil
}
}
72 changes: 72 additions & 0 deletions filebeat/inputsource/common/streaming/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,75 @@ func TestCustomDelimiter(t *testing.T) {
})
}
}

func TestOctetCounting(t *testing.T) {
tests := []struct {
name string
input string
expected []string
delimiter []byte
}{
{
name: "non-transparent",
input: "<9> message 0\n<6> msg 1\n<3> message 2",
expected: []string{
"<9> message 0",
"<6> msg 1",
"<3> message 2",
},
delimiter: []byte("\n"),
},
{
name: "octet counting",
input: "13 <9> message 09 <6> msg 113 <3> message 2",
expected: []string{
"<9> message 0",
"<6> msg 1",
"<3> message 2",
},
delimiter: []byte("\n"),
},
{
name: "octet counting, embedded newline",
input: "14 <9> message \n010 <6> msg \n114 <3> message \n2",
expected: []string{
"<9> message \n0",
"<6> msg \n1",
"<3> message \n2",
},
delimiter: []byte("\n"),
},
{
name: "octet, non-transparent, octet",
input: "14 <9> message \n0<6> msg 1\n14 <3> message \n2",
expected: []string{
"<9> message \n0",
"<6> msg 1",
"<3> message \n2",
},
delimiter: []byte("\n"),
},
{
name: "non-transparent, octet, non-transparent",
input: "<9> message 0\n10 <6> msg \n1<3> message 2",
expected: []string{
"<9> message 0",
"<6> msg \n1",
"<3> message 2",
},
delimiter: []byte("\n"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
buf := strings.NewReader(test.input)
scanner := bufio.NewScanner(buf)
scanner.Split(FactoryRFC6587Framing(test.delimiter))
var elements []string
for scanner.Scan() {
elements = append(elements, scanner.Text())
}
assert.EqualValues(t, test.expected, elements)
})
}
}
Loading