x-pack/filebeat/input/awss3: add support for CSV decoding (#40896) (#41093)

The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz

(cherry picked from commit 939e8b1)

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
mergify[bot] and efd6 authored Oct 4, 2024
1 parent 8f62854 commit efa2484
Showing 11 changed files with 1,041 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -311,6 +311,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
- Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779]
- Add CSV decoder to awss3 input. {pull}40896[40896]

*Auditbeat*

69 changes: 56 additions & 13 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -123,28 +123,71 @@ The file decoding option is used to specify a codec that will be used to
decode the file contents. This can apply to any file stream data.
An example config is shown below:

Currently supported codecs are given below:-

1. <<attrib-decoding-csv,CSV>>: This codec decodes RFC 4180 CSV data streams.
2. <<attrib-decoding-parquet,Parquet>>: This codec decodes parquet compressed data streams.

[id="attrib-decoding-csv"]
[float]
==== `the CSV codec`
The `CSV` codec is used to decode RFC 4180 CSV data streams.
Enabling the codec without other options will use the default codec options.

[source,yaml]
----
decoding.codec.parquet.enabled: true
----
decoding.codec.csv.enabled: true
----

The CSV codec supports five sub attributes to control aspects of CSV decoding.
The `comma` attribute specifies the field separator character used by the CSV
format. If it is not specified, the comma character '`,`' is used. The `comment`
attribute specifies the character that should be interpreted as a comment mark.
If it is specified, lines starting with the character will be ignored. Both
`comma` and `comment` must be single characters. The `lazy_quotes` attribute
controls how quoting in fields is handled. If `lazy_quotes` is true, a quote may
appear in an unquoted field and a non-doubled quote may appear in a quoted field.
The `trim_leading_space` attribute specifies that leading white space should be
ignored, even if the `comma` character is white space. For complete details
of the preceding configuration attribute behaviors, see the CSV decoder
https://pkg.go.dev/encoding/csv#Reader[documentation]. The `fields_names`
attribute can be used to specify the column names for the data. If it is
absent, the field names are obtained from the first non-comment line of
data. The number of fields in each record must match the number of field names.

Currently supported codecs are given below:-
An example config is shown below:

1. <<attrib-decoding-parquet,Parquet>>: This codec decodes parquet compressed data streams.
[source,yaml]
----
decoding.codec.csv.enabled: true
decoding.codec.csv.comma: "\t"
decoding.codec.csv.comment: "#"
----
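
As a side note to the documentation change above, the following is a minimal sketch of how the `comma` and `comment` settings in the example config map onto the Go `encoding/csv` reader that the docs link to. The function name `readTSV` and the sample input are assumptions made for illustration; they are not part of the commit.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// readTSV is a hypothetical helper: it parses tab-separated input and
// skips lines that start with '#'.
func readTSV(data string) ([][]string, error) {
	r := csv.NewReader(strings.NewReader(data))
	r.Comma = '\t'  // corresponds to decoding.codec.csv.comma: "\t"
	r.Comment = '#' // corresponds to decoding.codec.csv.comment: "#"
	return r.ReadAll()
}

func main() {
	records, err := readTSV("# generated export\nname\tcount\nalpha\t1\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, rec := range records {
		fmt.Println(rec)
	}
}
```

With these settings the comment line is dropped before any record is returned, so the first record seen by a consumer is the header row.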

[id="attrib-decoding-parquet"]
[float]
==== `the parquet codec`
The `parquet` codec is used to decode parquet compressed data streams.
Only enabling the codec will use the default codec options. The parquet codec supports
two sub attributes which can make parquet decoding more efficient. The `batch_size` attribute and
the `process_parallel` attribute. The `batch_size` attribute can be used to specify the number of
records to read from the parquet stream at a time. By default the `batch size` is set to `1` and
`process_parallel` is set to `false`. If the `process_parallel` attribute is set to `true` then functions
which read multiple columns will read those columns in parallel from the parquet stream with a
number of readers equal to the number of columns. Setting `process_parallel` to `true` will greatly
increase the rate of processing at the cost of increased memory usage. Having a larger `batch_size`
also helps to increase the rate of processing. An example config is shown below:
Only enabling the codec will use the default codec options.

[source,yaml]
----
decoding.codec.parquet.enabled: true
----

The parquet codec supports two sub attributes which can make parquet decoding
more efficient: `batch_size` and `process_parallel`. The `batch_size`
attribute specifies the number of records to read from the parquet stream at
a time. By default `batch_size` is set to `1` and `process_parallel` is set
to `false`. If `process_parallel` is set to `true`, functions which read
multiple columns will read those columns in parallel from the parquet stream
with a number of readers equal to the number of columns. Setting
`process_parallel` to `true` will greatly increase the rate of processing at
the cost of increased memory usage. A larger `batch_size` also helps
to increase the rate of processing.

An example config is shown below:

[source,yaml]
----
53 changes: 10 additions & 43 deletions x-pack/filebeat/input/awss3/decoding.go
@@ -7,11 +7,9 @@ package awss3
import (
"fmt"
"io"

"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
)

// decoder is an interface for decoding data from an io reader.
// decoder is an interface for decoding data from an io.Reader.
type decoder interface {
// decode reads and decodes data from an io reader based on the codec type.
// It returns the decoded data and an error if the data cannot be decoded.
@@ -23,6 +21,13 @@ type decoder interface {
close() error
}

// valueDecoder is a decoder that can decode directly to a JSON serialisable value.
type valueDecoder interface {
decoder

decodeValue() (any, error)
}
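
The diff shown here does not include the caller of `valueDecoder`, so the following is only a sketch of one way such an optional interface could be consumed: a type assertion picks `decodeValue` when it is available and falls back to `decode` plus JSON unmarshalling otherwise. The `oneShot` type and the `drain` function are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type decoder interface {
	next() bool
	decode() ([]byte, error)
	close() error
}

type valueDecoder interface {
	decoder
	decodeValue() (any, error)
}

// oneShot is a hypothetical decoder that yields a single fixed record.
type oneShot struct{ done bool }

func (d *oneShot) next() bool {
	if d.done {
		return false
	}
	d.done = true
	return true
}

func (d *oneShot) decodeValue() (any, error) {
	return map[string]string{"k": "v"}, nil
}

func (d *oneShot) decode() ([]byte, error) {
	v, err := d.decodeValue()
	if err != nil {
		return nil, err
	}
	return json.Marshal(v)
}

func (d *oneShot) close() error { return nil }

// drain collects every item from d, using decodeValue when d implements
// valueDecoder and falling back to decode plus json.Unmarshal otherwise.
func drain(d decoder) ([]any, error) {
	defer d.close()
	var out []any
	for d.next() {
		if vd, ok := d.(valueDecoder); ok {
			v, err := vd.decodeValue()
			if err != nil {
				return out, err
			}
			out = append(out, v)
			continue
		}
		b, err := d.decode()
		if err != nil {
			return out, err
		}
		var v any
		if err := json.Unmarshal(b, &v); err != nil {
			return out, err
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	vals, err := drain(&oneShot{})
	fmt.Println(vals, err)
}
```

The value path avoids a marshal and unmarshal round trip for decoders that already hold a structured value.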

// newDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
@@ -32,47 +37,9 @@ func newDecoder(config decoderConfig, r io.Reader) (decoder, error) {
return nil, nil
case config.Codec.Parquet != nil:
return newParquetDecoder(config, r)
case config.Codec.CSV != nil:
return newCSVDecoder(config, r)
default:
return nil, fmt.Errorf("unsupported config value: %v", config)
}
}

// parquetDecoder is a decoder for parquet data.
type parquetDecoder struct {
reader *parquet.BufferedReader
}

// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
// It returns an error if the parquet reader cannot be created.
func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
reader, err := parquet.NewBufferedReader(r, &parquet.Config{
ProcessParallel: config.Codec.Parquet.ProcessParallel,
BatchSize: config.Codec.Parquet.BatchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
}
return &parquetDecoder{
reader: reader,
}, nil
}

// next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
func (pd *parquetDecoder) next() bool {
return pd.reader.Next()
}

// decode reads and decodes a parquet data stream. After reading the parquet data it decodes
// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
func (pd *parquetDecoder) decode() ([]byte, error) {
data, err := pd.reader.Record()
if err != nil {
return nil, err
}
return data, nil
}

// close closes the parquet decoder and releases the resources.
func (pd *parquetDecoder) close() error {
return pd.reader.Close()
}
48 changes: 48 additions & 0 deletions x-pack/filebeat/input/awss3/decoding_config.go
@@ -4,6 +4,12 @@

package awss3

import (
"errors"
"fmt"
"unicode/utf8"
)

// decoderConfig contains the configuration options for instantiating a decoder.
type decoderConfig struct {
Codec *codecConfig `config:"codec"`
@@ -12,6 +18,48 @@ type decoderConfig struct {
// codecConfig contains the configuration options for different codecs used by a decoder.
type codecConfig struct {
Parquet *parquetCodecConfig `config:"parquet"`
CSV *csvCodecConfig `config:"csv"`
}

func (c *codecConfig) Validate() error {
if c.Parquet != nil && c.CSV != nil {
return errors.New("more than one decoder configured")
}
return nil
}

// csvCodecConfig contains the configuration options for the CSV codec.
type csvCodecConfig struct {
Enabled bool `config:"enabled"`

// Fields is the set of field names. If it is present
// it is used to specify the object names of returned
// values and the FieldsPerRecord field in the csv.Reader.
// Otherwise, names are obtained from the first
// line of the CSV data.
Fields []string `config:"fields_names"`

// The fields below have the same meaning as the
// fields of the same name in csv.Reader.
Comma *configRune `config:"comma"`
Comment configRune `config:"comment"`
LazyQuotes bool `config:"lazy_quotes"`
TrimLeadingSpace bool `config:"trim_leading_space"`
}

type configRune rune

func (r *configRune) Unpack(s string) error {
if s == "" {
return nil
}
n := utf8.RuneCountInString(s)
if n != 1 {
return fmt.Errorf("single character option given more than one character: %q", s)
}
_r, _ := utf8.DecodeRuneInString(s)
*r = configRune(_r)
return nil
}
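
The single-character check in `Unpack` counts runes rather than bytes, so a multi-byte UTF-8 character is accepted as a separator. A minimal standalone sketch of the same check follows; the free function `singleRune` is an assumption for illustration and is not part of the commit.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// singleRune is a hypothetical standalone version of the check performed
// by configRune.Unpack: an empty string yields the zero rune, and any
// string that is not exactly one rune long is rejected.
func singleRune(s string) (rune, error) {
	if s == "" {
		return 0, nil
	}
	if n := utf8.RuneCountInString(s); n != 1 {
		return 0, fmt.Errorf("single character option given more than one character: %q", s)
	}
	r, _ := utf8.DecodeRuneInString(s)
	return r, nil
}

func main() {
	for _, s := range []string{"\t", "é", "ab", ""} {
		r, err := singleRune(s)
		fmt.Printf("%q -> %q, err=%v\n", s, r, err)
	}
}
```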

// parquetCodecConfig contains the configuration options for the parquet codec.
95 changes: 95 additions & 0 deletions x-pack/filebeat/input/awss3/decoding_csv.go
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"slices"
)

// csvDecoder is a decoder for CSV data.
type csvDecoder struct {
r *csv.Reader

header []string
current []string

err error
}

// newCSVDecoder creates a new CSV decoder.
func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
d := csvDecoder{r: csv.NewReader(r)}
d.r.ReuseRecord = true
if config.Codec.CSV.Comma != nil {
d.r.Comma = rune(*config.Codec.CSV.Comma)
}
d.r.Comment = rune(config.Codec.CSV.Comment)
d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
if len(config.Codec.CSV.Fields) != 0 {
d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
d.header = config.Codec.CSV.Fields
} else {
h, err := d.r.Read()
if err != nil {
return nil, err
}
d.header = slices.Clone(h)
}
return &d, nil
}

// next advances the decoder to the next data item and returns true if
// there is more data to be decoded.
func (d *csvDecoder) next() bool {
if d.err != nil {
return false
}
d.current, d.err = d.r.Read()
return d.err == nil
}

// decode returns the JSON encoded value of the current CSV line. next must
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
v, err := d.decodeValue()
if err != nil {
return nil, err
}
return json.Marshal(v)
}

// decodeValue returns the value of the current CSV line interpreted as
// an object with fields based on the header held by the receiver. next must
// have been called before any calls to decodeValue.
func (d *csvDecoder) decodeValue() (any, error) {
if d.err != nil {
return nil, d.err
}
if len(d.current) == 0 {
return nil, fmt.Errorf("decode called before next")
}
m := make(map[string]string, len(d.header))
// By the time we are here, current must be the same
// length as header; if it was not read, it would be
// zero, but if it was, it must match by the contract
// of the csv.Reader.
for i, n := range d.header {
m[n] = d.current[i]
}
return m, nil
}

// close closes the CSV decoder. It returns any error encountered while
// reading, other than io.EOF.
func (d *csvDecoder) close() error {
if d.err == io.EOF {
return nil
}
return d.err
}
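
To illustrate the header-to-object mapping that `decodeValue` and `decode` perform, here is a minimal self-contained sketch. It takes the first record as the header and emits each later record as a JSON object. The function `rowsAsJSON` and the sample data are assumptions for illustration; unlike the commit's decoder, the sketch does not set `ReuseRecord` or accept configured field names.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// rowsAsJSON is a hypothetical helper: it reads CSV data, takes the first
// record as the header, and returns each subsequent record as a JSON
// object keyed by header name.
func rowsAsJSON(data string) ([]string, error) {
	r := csv.NewReader(strings.NewReader(data))
	header, err := r.Read()
	if err != nil {
		return nil, err
	}
	var out []string
	for {
		rec, err := r.Read()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		// csv.Reader enforces that every record has as many fields
		// as the first one, so rec[i] is in range here.
		obj := make(map[string]string, len(header))
		for i, name := range header {
			obj[name] = rec[i]
		}
		b, err := json.Marshal(obj)
		if err != nil {
			return out, err
		}
		out = append(out, string(b))
	}
}

func main() {
	rows, err := rowsAsJSON("id,name\n1,alpha\n2,beta\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
```

Because the object is a `map[string]string`, every value is emitted as a JSON string and `encoding/json` writes the keys in sorted order rather than column order.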
52 changes: 52 additions & 0 deletions x-pack/filebeat/input/awss3/decoding_parquet.go
@@ -0,0 +1,52 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"fmt"
"io"

"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
)

// parquetDecoder is a decoder for parquet data.
type parquetDecoder struct {
reader *parquet.BufferedReader
}

// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
// It returns an error if the parquet reader cannot be created.
func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
reader, err := parquet.NewBufferedReader(r, &parquet.Config{
ProcessParallel: config.Codec.Parquet.ProcessParallel,
BatchSize: config.Codec.Parquet.BatchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
}
return &parquetDecoder{
reader: reader,
}, nil
}

// next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
func (pd *parquetDecoder) next() bool {
return pd.reader.Next()
}

// decode reads and decodes a parquet data stream. After reading the parquet data it decodes
// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
func (pd *parquetDecoder) decode() ([]byte, error) {
data, err := pd.reader.Record()
if err != nil {
return nil, err
}
return data, nil
}

// close closes the parquet decoder and releases the resources.
func (pd *parquetDecoder) close() error {
return pd.reader.Close()
}