Skip to content

Commit

Permalink
x-pack/filebeat/input/gcs: add support for CSV decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 25, 2024
1 parent 4380b9d commit 5ebbb6f
Show file tree
Hide file tree
Showing 13 changed files with 1,169 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
- Add CSV decoding capacity to gcs input {pull}[]


*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/reader/parser"
)

// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON, FileSelectors, TimeStampEpoch & ExpandEventListFromField
Expand All @@ -41,6 +42,8 @@ type config struct {
Buckets []bucket `config:"buckets" validate:"required"`
// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.
FileSelectors []fileSelectorConfig `config:"file_selectors"`
// ReaderConfig is the default parser and decoder configuration.
ReaderConfig readerConfig `config:",inline"`
// TimeStampEpoch - Defines the epoch time in seconds, which is used to filter out objects that are older than the specified timestamp.
TimeStampEpoch *int64 `config:"timestamp_epoch"`
// ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events.
Expand All @@ -58,6 +61,7 @@ type bucket struct {
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}
Expand All @@ -68,6 +72,12 @@ type fileSelectorConfig struct {
// TODO: Add support for reader config in future
}

// readerConfig defines the options for reading the content of an GCS object.
type readerConfig struct {
Parsers parser.Config `config:",inline"`
Decoding decoderConfig `config:"decoding"`
}

type authConfig struct {
CredentialsJSON *jsonCredentialsConfig `config:"credentials_json,omitempty"`
CredentialsFile *fileCredentialsConfig `config:"credentials_file,omitempty"`
Expand Down
47 changes: 47 additions & 0 deletions x-pack/filebeat/input/gcs/decoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gcs

import (
"fmt"
"io"
)

// decoder is an interface for decoding data from an io.Reader.
type decoder interface {
// decode reads and decodes data from an io reader based on the codec type.
// It returns the decoded data and an error if the data cannot be decoded.
decode() ([]byte, error)
// next advances the decoder to the next data item and returns true if there is more data to be decoded.
next() bool
// close closes the decoder and releases any resources associated with it.
// It returns an error if the decoder cannot be closed.

// more returns whether there are more records to read.
more() bool

close() error
}

// valueDecoder is a decoder that can decode directly to a JSON serialisable value.
type valueDecoder interface {
decoder

decodeValue() ([]byte, map[string]any, error)
}

// newDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) {
switch {
case cfg.Codec == nil:
return nil, nil
case cfg.Codec.CSV != nil:
return newCSVDecoder(cfg, r)
default:
return nil, fmt.Errorf("unsupported config value: %v", cfg)
}
}
54 changes: 54 additions & 0 deletions x-pack/filebeat/input/gcs/decoding_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gcs

import (
"fmt"
"unicode/utf8"
)

// decoderConfig contains the configuration options for instantiating a decoder.
type decoderConfig struct {
Codec *codecConfig `config:"codec"`
}

// codecConfig contains the configuration options for different codecs used by a decoder.
type codecConfig struct {
CSV *csvCodecConfig `config:"csv"`
}

// csvCodecConfig contains the configuration options for the CSV codec.
type csvCodecConfig struct {
Enabled bool `config:"enabled"`

// Fields is the set of field names. If it is present
// it is used to specify the object names of returned
// values and the FieldsPerRecord field in the csv.Reader.
// Otherwise, names are obtained from the first
// line of the CSV data.
Fields []string `config:"fields_names"`

// The fields below have the same meaning as the
// fields of the same name in csv.Reader.
Comma *configRune `config:"comma"`
Comment configRune `config:"comment"`
LazyQuotes bool `config:"lazy_quotes"`
TrimLeadingSpace bool `config:"trim_leading_space"`
}

type configRune rune

func (r *configRune) Unpack(s string) error {
if s == "" {
return nil
}
n := utf8.RuneCountInString(s)
if n != 1 {
return fmt.Errorf("single character option given more than one character: %q", s)
}
_r, _ := utf8.DecodeRuneInString(s)
*r = configRune(_r)
return nil
}
139 changes: 139 additions & 0 deletions x-pack/filebeat/input/gcs/decoding_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package gcs

import (
"bytes"
"encoding/csv"
"fmt"
"io"
"slices"
)

// csvDecoder is a decoder for CSV data.
type csvDecoder struct {
r *csv.Reader

header []string
current []string
coming []string

err error
}

// newParquetDecoder creates a new CSV decoder.
func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
d := csvDecoder{r: csv.NewReader(r)}
d.r.ReuseRecord = true
if config.Codec.CSV.Comma != nil {
d.r.Comma = rune(*config.Codec.CSV.Comma)
}
d.r.Comment = rune(config.Codec.CSV.Comment)
d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
if len(config.Codec.CSV.Fields) != 0 {
d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
d.header = config.Codec.CSV.Fields
} else {
h, err := d.r.Read()
if err != nil {
return nil, err
}
d.header = slices.Clone(h)
}
var err error
d.coming, err = d.r.Read()
if err != nil {
return nil, err
}
d.current = make([]string, 0, len(d.header))
return &d, nil
}

func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) }

// next advances the decoder to the next data item and returns true if
// there is more data to be decoded.
func (d *csvDecoder) next() bool {
if !d.more() && d.err != nil {
return false
}
d.current = d.current[:len(d.header)]
copy(d.current, d.coming)
d.coming, d.err = d.r.Read()
if d.err == io.EOF {

Check failure on line 66 in x-pack/filebeat/input/gcs/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
d.coming = nil
}
return true
}

// decode returns the JSON encoded value of the current CSV line. next must
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
err := d.check()
if err != nil {
return nil, err
}
var buf bytes.Buffer
buf.WriteByte('{')
for i, n := range d.header {
if i != 0 {
buf.WriteByte(',')
}
buf.WriteByte('"')
buf.WriteString(n)
buf.WriteString(`":"`)
buf.WriteString(d.current[i])
buf.WriteByte('"')
}
buf.WriteByte('}')
d.current = d.current[:0]
return buf.Bytes(), nil
}

// decodeValue returns the value of the current CSV line interpreted as
// an object with fields based on the header held by the receiver. next must
// have been called before any calls to decode.
func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) {
err := d.check()
if err != nil {
return nil, nil, err
}
m := make(map[string]any, len(d.header))
for i, n := range d.header {
m[n] = d.current[i]
}
d.current = d.current[:0]
b, err := d.decode()
if err != nil {
return nil, nil, err
}
return b, m, nil
}

func (d *csvDecoder) check() error {
if d.err != nil {
if d.err == io.EOF && d.coming == nil {

Check failure on line 118 in x-pack/filebeat/input/gcs/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
return d.err
}
if len(d.current) == 0 {
return fmt.Errorf("decode called before next")
}
// By the time we are here, current must be the same
// length as header; if it was not read, it would be
// zero, but if it was, it must match by the contract
// of the csv.Reader.
return nil
}

// close closes the parquet decoder and releases the resources.
func (d *csvDecoder) close() error {
if d.err == io.EOF {

Check failure on line 135 in x-pack/filebeat/input/gcs/decoding_csv.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil
}
return d.err
}
Loading

0 comments on commit 5ebbb6f

Please sign in to comment.