Skip to content

Commit

Permalink
[Filebeat] add RFC6587 framing support (elastic#23724) (elastic#23784)
Browse files Browse the repository at this point in the history
- Adds new config option "framing"
- supported options are "delimiter" and "rfc6587"
- "delimiter" is the existing behavior: split on a newline or on a
  custom delimiter of one or more characters
- rfc6587 adds support for octet counting and non-transparent framing
  as described in RFC6587
- rfc6587 supports changing of framing on a frame by frame basis
- Default is "delimiter"

Closes elastic#23663

(cherry picked from commit 5cb370e)
  • Loading branch information
leehinman authored Feb 1, 2021
1 parent bda819f commit dbd431c
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added `alternative_host` option to google pubsub input {pull}23215[23215]
- Added username parsing from Cisco ASA message 302013. {pull}21196[21196]
- Added `encode_as` and `decode_as` options to httpjson along with pluggable encoders/decoders {pull}23478[23478]
- Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724]

*Heartbeat*

Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-common-tcp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ The maximum size of the message received over TCP. The default is `20MiB`.

The host and TCP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-tcp-framing"]
==== `framing`

Specify the framing used to split incoming events. Can be one of
`delimiter` or `rfc6587`. `delimiter` uses the characters specified
in `line_delimiter` to split the incoming events. `rfc6587` supports
octet counting and non-transparent framing as described in
https://tools.ietf.org/html/rfc6587[RFC6587]. `line_delimiter` is
used to split the events in non-transparent framing. The default is `delimiter`.

[float]
[id="{beatname_lc}-input-{type}-tcp-line-delimiter"]
==== `line_delimiter`
Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ The file mode of the Unix socket that will be created by Filebeat. This is
expected to be a file mode as an octal string. The default value is the system
default (generally `0755`).

[float]
[id="{beatname_lc}-input-{type}-unix-framing"]
==== `framing`

Specify the framing used to split incoming events. Can be one of
`delimiter` or `rfc6587`. `delimiter` uses the characters specified
in `line_delimiter` to split the incoming events. `rfc6587` supports
octet counting and non-transparent framing as described in
https://tools.ietf.org/html/rfc6587[RFC6587]. `line_delimiter` is
used to split the events in non-transparent framing. The default is `delimiter`.

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ var defaultConfig = config{

type syslogTCP struct {
tcp.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
Framing streaming.FramingType `config:"framing"`
}

var defaultTCP = syslogTCP{
Expand Down Expand Up @@ -90,9 +91,9 @@ func factory(
return nil, err
}

splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter))
if err != nil {
return nil, err
}

logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
)

type config struct {
tcp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
Framing streaming.FramingType `config:"framing"`
}

var defaultConfig = config{
Expand Down
7 changes: 3 additions & 4 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package tcp

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -75,9 +74,9 @@ func NewInput(
forwarder.Send(event)
}

splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
splitFunc, err := streaming.SplitFunc(config.Framing, []byte(config.LineDelimiter))
if err != nil {
return nil, err
}

logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
Expand Down
61 changes: 51 additions & 10 deletions filebeat/inputsource/common/streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"net"
"sync"

Expand All @@ -47,6 +48,21 @@ type Listener struct {
listenerFactory ListenerFactory
}

// FramingType selects the framing that SplitFunc uses to split a stream
// into events.
type FramingType int

// Supported framing types.
const (
	// FramingDelimiter splits events on the configured line delimiter.
	FramingDelimiter = iota
	// FramingRFC6587 splits events using octet counting or
	// non-transparent framing as described in RFC6587.
	FramingRFC6587
)

// framingTypes maps the names accepted in the configuration to their
// framing type.
var framingTypes = map[string]FramingType{
	"rfc6587":   FramingRFC6587,
	"delimiter": FramingDelimiter,
}

// NewListener creates a new Listener
func NewListener(family inputsource.Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
Expand Down Expand Up @@ -176,18 +192,43 @@ func (l *Listener) unregisterHandler() {
l.clientsCount.Dec()
}

// SplitFunc allows to create a `bufio.SplitFunc` based on a delimiter provided.
func SplitFunc(lineDelimiter []byte) bufio.SplitFunc {
// SplitFunc creates a `bufio.SplitFunc` for the given framing type and
// line delimiter.
func SplitFunc(framing FramingType, lineDelimiter []byte) (bufio.SplitFunc, error) {
if len(lineDelimiter) == 0 {
return nil
return nil, fmt.Errorf("line delimiter required")
}
switch framing {
case FramingDelimiter:
// bufio.ScanLines works for most use cases and also strips a
// trailing \r if present. A custom delimiter must match
// completely and is removed entirely from the returned byte
// slice.
if bytes.Equal(lineDelimiter, []byte("\n")) {
return bufio.ScanLines, nil
}
return FactoryDelimiter(lineDelimiter), nil
case FramingRFC6587:
return FactoryRFC6587Framing(lineDelimiter), nil
default:
return nil, fmt.Errorf("unknown SplitFunc for framing %d and line delimiter %s", framing, string(lineDelimiter))
}

}

// Unpack sets f from its configuration name ("delimiter" or "rfc6587") and
// returns an error for any other value.
func (f *FramingType) Unpack(value string) error {
ft, ok := framingTypes[value]
if !ok {
availableTypes := make([]string, len(framingTypes))
i := 0
for t := range framingTypes {
availableTypes[i] = t
i++
}
return fmt.Errorf("invalid framing type '%s', supported types: %v", value, availableTypes)

ld := []byte(lineDelimiter)
if bytes.Equal(ld, []byte("\n")) {
// This will work for most usecases and will also strip \r if present.
// CustomDelimiter, need to match completely and the delimiter will be completely removed from
// the returned byte slice
return bufio.ScanLines
}
return FactoryDelimiter(ld)
*f = ft
return nil
}
41 changes: 41 additions & 0 deletions filebeat/inputsource/common/streaming/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package streaming
import (
"bufio"
"bytes"
"strconv"
)

// FactoryDelimiter return a function to split line using a custom delimiter supporting multibytes
Expand All @@ -46,3 +47,43 @@ func dropDelimiter(data []byte, delimiter []byte) []byte {
}
return data
}

// FactoryRFC6587Framing returns a bufio.SplitFunc that splits a stream into
// frames using octet counting or non-transparent framing as defined in
// RFC6587. The framing method is detected per frame: a frame whose first
// byte is an ASCII digit is treated as octet counted ("MSG-LEN SP MSG"),
// any other frame is non-transparent and ends at delimiter. The custom
// delimiter only applies to non-transparent frames.
//
// A MSG-LEN that cannot be parsed as an integer is reported as an error.
// An octet-counted frame that is still incomplete when the stream ends
// never yields a token.
func FactoryRFC6587Framing(delimiter []byte) bufio.SplitFunc {
	return func(data []byte, eof bool) (int, []byte, error) {
		// Nothing buffered: request more data, or finish at EOF.
		if len(data) == 0 {
			return 0, nil, nil
		}

		// It can be assumed that octet-counting framing is used if a
		// syslog frame starts with a digit (RFC6587).
		if bytes.ContainsAny(data[0:1], "0123456789") {
			sp := bytes.IndexByte(data, ' ')
			if sp < 0 {
				// MSG-LEN is not terminated yet, request more data.
				return 0, nil, nil
			}
			length, err := strconv.Atoi(string(data[0:sp]))
			if err != nil {
				return 0, nil, err
			}
			start := sp + 1
			// Compare against the bytes that are available instead of
			// computing start+length first: a huge MSG-LEN sent by the
			// peer would overflow the sum and lead to an out-of-range
			// slice expression.
			if length > len(data)-start {
				// request more data
				return 0, nil, nil
			}
			end := start + length
			return end, data[start:end], nil
		}

		// Non-transparent framing: the frame ends at the delimiter.
		if i := bytes.Index(data, delimiter); i >= 0 {
			return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil
		}
		// At EOF the remaining bytes form the last frame, even when it
		// is only a single byte long.
		if eof {
			return len(data), dropDelimiter(data, delimiter), nil
		}
		// request more data
		return 0, nil, nil
	}
}
72 changes: 72 additions & 0 deletions filebeat/inputsource/common/streaming/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,75 @@ func TestCustomDelimiter(t *testing.T) {
})
}
}

// TestOctetCounting verifies that the RFC6587 split function handles
// octet-counted frames, non-transparent frames, and streams that switch
// between both framings from one frame to the next.
func TestOctetCounting(t *testing.T) {
	tests := []struct {
		name      string
		input     string
		expected  []string
		delimiter []byte
	}{
		{
			name:  "non-transparent",
			input: "<9> message 0\n<6> msg 1\n<3> message 2",
			expected: []string{
				"<9> message 0",
				"<6> msg 1",
				"<3> message 2",
			},
			delimiter: []byte("\n"),
		},
		{
			name:  "octet counting",
			input: "13 <9> message 09 <6> msg 113 <3> message 2",
			expected: []string{
				"<9> message 0",
				"<6> msg 1",
				"<3> message 2",
			},
			delimiter: []byte("\n"),
		},
		{
			name:  "octet counting, embedded newline",
			input: "14 <9> message \n010 <6> msg \n114 <3> message \n2",
			expected: []string{
				"<9> message \n0",
				"<6> msg \n1",
				"<3> message \n2",
			},
			delimiter: []byte("\n"),
		},
		{
			name:  "octet, non-transparent, octet",
			input: "14 <9> message \n0<6> msg 1\n14 <3> message \n2",
			expected: []string{
				"<9> message \n0",
				"<6> msg 1",
				"<3> message \n2",
			},
			delimiter: []byte("\n"),
		},
		{
			name:  "non-transparent, octet, non-transparent",
			input: "<9> message 0\n10 <6> msg \n1<3> message 2",
			expected: []string{
				"<9> message 0",
				"<6> msg \n1",
				"<3> message 2",
			},
			delimiter: []byte("\n"),
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			buf := strings.NewReader(test.input)
			scanner := bufio.NewScanner(buf)
			scanner.Split(FactoryRFC6587Framing(test.delimiter))
			var elements []string
			for scanner.Scan() {
				elements = append(elements, scanner.Text())
			}
			// Scan returning false can also mean the split function
			// failed; make sure the loop ended because of EOF.
			assert.NoError(t, scanner.Err())
			assert.EqualValues(t, test.expected, elements)
		})
	}
}
Loading

0 comments on commit dbd431c

Please sign in to comment.