From 9364081c013e97df0b74af44b0dc5197a5290248 Mon Sep 17 00:00:00 2001
From: Dan Kortschak
Date: Thu, 19 Sep 2024 09:45:47 +0930
Subject: [PATCH] x-pack/filebeat/input/awss3: add support for CSV decoding

The test file txn.csv.gz was obtained from
https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz
---
 CHANGELOG.next.asciidoc                       |   1 +
 .../docs/inputs/input-aws-s3.asciidoc         |  69 +-
 x-pack/filebeat/input/awss3/decoding.go       |  53 +-
 .../filebeat/input/awss3/decoding_config.go   |  40 ++
 x-pack/filebeat/input/awss3/decoding_csv.go   |  95 +++
 .../filebeat/input/awss3/decoding_parquet.go  |  52 ++
 x-pack/filebeat/input/awss3/decoding_test.go  | 148 ++++-
 x-pack/filebeat/input/awss3/s3_objects.go     |  37 +-
 x-pack/filebeat/input/awss3/testdata/txn.csv  |   5 +
 .../filebeat/input/awss3/testdata/txn.csv.gz  | Bin 0 -> 2527 bytes
 x-pack/filebeat/input/awss3/testdata/txn.json | 594 ++++++++++++++++++
 11 files changed, 1022 insertions(+), 72 deletions(-)
 create mode 100644 x-pack/filebeat/input/awss3/decoding_csv.go
 create mode 100644 x-pack/filebeat/input/awss3/decoding_parquet.go
 create mode 100644 x-pack/filebeat/input/awss3/testdata/txn.csv
 create mode 100644 x-pack/filebeat/input/awss3/testdata/txn.csv.gz
 create mode 100644 x-pack/filebeat/input/awss3/testdata/txn.json

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index bd2b258c63f3..70930c69dcb1 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -300,6 +300,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662]
 - Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
 - Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
+- Add CSV decoder to awss3 input. {pull}[]
 
 *Auditbeat*
 
diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
index 9fa118721512..43d4b102f639 100644
--- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -123,28 +123,71 @@ The file decoding option is used to specify a codec that will be used to decode
 the file contents. This can apply to any file stream data.
 An example config is shown below:
 
+Currently supported codecs are given below:
+
+ 1. <<attrib-decoding-csv,CSV>>: This codec decodes RFC 4180 CSV data streams.
+ 2. <<attrib-decoding-parquet,parquet>>: This codec decodes parquet compressed data streams.
+
+[id="attrib-decoding-csv"]
+[float]
+==== `the CSV codec`
+The `CSV` codec is used to decode RFC 4180 CSV data streams.
+Enabling the codec without other options will use the default codec options.
+
 [source,yaml]
 ----
- decoding.codec.parquet.enabled: true
-----
+ decoding.codec.csv.enabled: true
+----
+
+The CSV codec supports five sub-attributes to control aspects of CSV decoding.
+The `comma` attribute specifies the field separator character used by the CSV
+format. If it is not specified, the comma character '`,`' is used. The `comment`
+attribute specifies the character that should be interpreted as a comment mark.
+If it is specified, lines starting with the character will be ignored. Both
+`comma` and `comment` must be single characters. The `lazy_quotes` attribute
+controls how quoting in fields is handled. If `lazy_quotes` is true, a quote may
+appear in an unquoted field and a non-doubled quote may appear in a quoted field.
+The `trim_leading_space` attribute specifies that leading white space should be
+ignored, even if the `comma` character is white space. For complete details
+of the preceding configuration attribute behaviors, see the CSV decoder
+https://pkg.go.dev/encoding/csv#Reader[documentation]. The `fields_names`
+attribute can be used to specify the column names for the data. If it is
+absent, the field names are obtained from the first non-comment line of
+data. The number of fields must match the number of field names.
+
-Currently supported codecs are given below:-
+An example config is shown below:
 
- 1. <<attrib-decoding-parquet,parquet>>: This codec decodes parquet compressed data streams.
+[source,yaml]
+----
+ decoding.codec.csv.enabled: true
+ decoding.codec.csv.comma: "\t"
+ decoding.codec.csv.comment: "#"
+----
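+
+A fuller example, using illustrative values and hypothetical column names
+for a tab-separated stream, sets all five sub-attributes:
+
+[source,yaml]
+----
+ decoding.codec.csv.enabled: true
+ decoding.codec.csv.comma: "\t"
+ decoding.codec.csv.comment: "#"
+ decoding.codec.csv.lazy_quotes: true
+ decoding.codec.csv.trim_leading_space: true
+ decoding.codec.csv.fields_names: ['date', 'time', 'status']
+----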
 
 [id="attrib-decoding-parquet"]
 [float]
 ==== `the parquet codec`
 The `parquet` codec is used to decode parquet compressed data streams.
-Only enabling the codec will use the default codec options. The parquet codec supports
-two sub attributes which can make parquet decoding more efficient. The `batch_size` attribute and
-the `process_parallel` attribute. The `batch_size` attribute can be used to specify the number of
-records to read from the parquet stream at a time. By default the `batch size` is set to `1` and
-`process_parallel` is set to `false`. If the `process_parallel` attribute is set to `true` then functions
-which read multiple columns will read those columns in parallel from the parquet stream with a
-number of readers equal to the number of columns. Setting `process_parallel` to `true` will greatly
-increase the rate of processing at the cost of increased memory usage. Having a larger `batch_size`
-also helps to increase the rate of processing. An example config is shown below:
+Enabling the codec without other options will use the default codec options.
+
+[source,yaml]
+----
+ decoding.codec.parquet.enabled: true
+----
+
+The parquet codec supports two sub-attributes which can make parquet decoding
+more efficient: the `batch_size` attribute and the `process_parallel`
+attribute. The `batch_size` attribute can be used to specify the number of
+records to read from the parquet stream at a time. By default `batch_size`
+is set to `1` and `process_parallel` is set to `false`. If the
+`process_parallel` attribute is set to `true`, then functions which read
+multiple columns will read those columns in parallel from the parquet stream,
+with a number of readers equal to the number of columns. Setting
+`process_parallel` to `true` will greatly increase the rate of processing at
+the cost of increased memory usage. Having a larger `batch_size` also helps
+to increase the rate of processing.
+
+An example config is shown below:
 
 [source,yaml]
 ----
diff --git a/x-pack/filebeat/input/awss3/decoding.go b/x-pack/filebeat/input/awss3/decoding.go
index 57c664915ad1..2fb5b396d1ab 100644
--- a/x-pack/filebeat/input/awss3/decoding.go
+++ b/x-pack/filebeat/input/awss3/decoding.go
@@ -7,11 +7,9 @@ package awss3
 import (
 	"fmt"
 	"io"
-
-	"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
 )
 
-// decoder is an interface for decoding data from an io reader.
+// decoder is an interface for decoding data from an io.Reader.
 type decoder interface {
 	// decode reads and decodes data from an io reader based on the codec type.
 	// It returns the decoded data and an error if the data cannot be decoded.
@@ -23,6 +21,13 @@ type decoder interface {
 	close() error
 }
 
+// valueDecoder is a decoder that can decode directly to a JSON serialisable value.
+type valueDecoder interface {
+	decoder
+
+	decodeValue() (any, error)
+}
+
 // newDecoder creates a new decoder based on the codec type.
 // It returns a decoder type and an error if the codec type is not supported.
 // If the reader config codec option is not set, it returns a nil decoder and nil error.
@@ -32,47 +37,9 @@ func newDecoder(config decoderConfig, r io.Reader) (decoder, error) {
 		return nil, nil
 	case config.Codec.Parquet != nil:
 		return newParquetDecoder(config, r)
+	case config.Codec.CSV != nil:
+		return newCSVDecoder(config, r)
 	default:
 		return nil, fmt.Errorf("unsupported config value: %v", config)
 	}
 }
-
-// parquetDecoder is a decoder for parquet data.
-type parquetDecoder struct {
-	reader *parquet.BufferedReader
-}
-
-// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
-// It returns an error if the parquet reader cannot be created.
-func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
-	reader, err := parquet.NewBufferedReader(r, &parquet.Config{
-		ProcessParallel: config.Codec.Parquet.ProcessParallel,
-		BatchSize:       config.Codec.Parquet.BatchSize,
-	})
-	if err != nil {
-		return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
-	}
-	return &parquetDecoder{
-		reader: reader,
-	}, nil
-}
-
-// next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
-func (pd *parquetDecoder) next() bool {
-	return pd.reader.Next()
-}
-
-// decode reads and decodes a parquet data stream. After reading the parquet data it decodes
-// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
-func (pd *parquetDecoder) decode() ([]byte, error) {
-	data, err := pd.reader.Record()
-	if err != nil {
-		return nil, err
-	}
-	return data, nil
-}
-
-// close closes the parquet decoder and releases the resources.
-func (pd *parquetDecoder) close() error {
-	return pd.reader.Close()
-}
diff --git a/x-pack/filebeat/input/awss3/decoding_config.go b/x-pack/filebeat/input/awss3/decoding_config.go
index ae48bb50ae26..9326e3d7d2e1 100644
--- a/x-pack/filebeat/input/awss3/decoding_config.go
+++ b/x-pack/filebeat/input/awss3/decoding_config.go
@@ -4,6 +4,11 @@
 
 package awss3
 
+import (
+	"fmt"
+	"unicode/utf8"
+)
+
 // decoderConfig contains the configuration options for instantiating a decoder.
 type decoderConfig struct {
 	Codec *codecConfig `config:"codec"`
@@ -12,6 +17,41 @@ type decoderConfig struct {
 // codecConfig contains the configuration options for different codecs used by a decoder.
 type codecConfig struct {
 	Parquet *parquetCodecConfig `config:"parquet"`
+	CSV     *csvCodecConfig     `config:"csv"`
+}
+
+// csvCodecConfig contains the configuration options for the CSV codec.
+type csvCodecConfig struct {
+	Enabled bool `config:"enabled"`
+
+	// Fields is the set of field names. If it is present
+	// it is used to specify the object names of returned
+	// values and the FieldsPerRecord field in the csv.Reader.
+	// Otherwise, names are obtained from the first
+	// line of the CSV data.
+	Fields []string `config:"fields_names"`
+
+	// The fields below have the same meaning as the
+	// fields of the same name in csv.Reader.
+	Comma            *configRune `config:"comma"`
+	Comment          configRune  `config:"comment"`
+	LazyQuotes       bool        `config:"lazy_quotes"`
+	TrimLeadingSpace bool        `config:"trim_leading_space"`
+}
+
+type configRune rune
+
+func (r *configRune) Unpack(s string) error {
+	if s == "" {
+		return nil
+	}
+	n := utf8.RuneCountInString(s)
+	if n != 1 {
+		return fmt.Errorf("single character option given more than one character: %q", s)
+	}
+	_r, _ := utf8.DecodeRuneInString(s)
+	*r = configRune(_r)
+	return nil
 }
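+
+// For illustration (hypothetical values): `comma: "\t"` unpacks to a tab
+// rune, an empty string is accepted and leaves the rune unmodified, and a
+// multi-character value such as `comma: "ab"` is rejected with an error.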
 
 // parquetCodecConfig contains the configuration options for the parquet codec.
diff --git a/x-pack/filebeat/input/awss3/decoding_csv.go b/x-pack/filebeat/input/awss3/decoding_csv.go
new file mode 100644
index 000000000000..050063517e1f
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/decoding_csv.go
@@ -0,0 +1,95 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"encoding/csv"
+	"encoding/json"
+	"fmt"
+	"io"
+	"slices"
+)
+
+// csvDecoder is a decoder for CSV data.
+type csvDecoder struct {
+	r *csv.Reader
+
+	header  []string
+	current []string
+
+	err error
+}
+
+// newCSVDecoder creates a new CSV decoder.
+func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) {
+	d := csvDecoder{r: csv.NewReader(r)}
+	d.r.ReuseRecord = true
+	if config.Codec.CSV.Comma != nil {
+		d.r.Comma = rune(*config.Codec.CSV.Comma)
+	}
+	d.r.Comment = rune(config.Codec.CSV.Comment)
+	d.r.LazyQuotes = config.Codec.CSV.LazyQuotes
+	d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace
+	if len(config.Codec.CSV.Fields) != 0 {
+		d.r.FieldsPerRecord = len(config.Codec.CSV.Fields)
+		d.header = config.Codec.CSV.Fields
+	} else {
+		h, err := d.r.Read()
+		if err != nil {
+			return nil, err
+		}
+		d.header = slices.Clone(h)
+	}
+	return &d, nil
+}
+
+// next advances the decoder to the next data item and returns true if
+// there is more data to be decoded.
+func (d *csvDecoder) next() bool {
+	if d.err != nil {
+		return false
+	}
+	d.current, d.err = d.r.Read()
+	return d.err == nil
+}
+
+// decode returns the JSON encoded value of the current CSV line. next must
+// have been called before any calls to decode.
+func (d *csvDecoder) decode() ([]byte, error) {
+	v, err := d.decodeValue()
+	if err != nil {
+		return nil, err
+	}
+	return json.Marshal(v)
+}
+
+// decodeValue returns the value of the current CSV line interpreted as
+// an object with fields based on the header held by the receiver. next must
+// have been called before any calls to decodeValue.
+func (d *csvDecoder) decodeValue() (any, error) {
+	if d.err != nil {
+		return nil, d.err
+	}
+	if len(d.current) == 0 {
+		return nil, fmt.Errorf("decode called before next")
+	}
+	m := make(map[string]string, len(d.header))
+	// By the time we are here, current must be the same
+	// length as header; if it was not read, it would be
+	// zero, but if it was, it must match by the contract
+	// of the csv.Reader.
+	for i, n := range d.header {
+		m[n] = d.current[i]
+	}
+	return m, nil
+}
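+
+// As an illustrative example (hypothetical data): with header
+// ["a", "b"] and current record ["1", "2"], decodeValue returns
+// map[string]string{"a": "1", "b": "2"}.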
+
+// close closes the CSV decoder and releases the resources.
+func (d *csvDecoder) close() error {
+	if d.err == io.EOF {
+		return nil
+	}
+	return d.err
+}
diff --git a/x-pack/filebeat/input/awss3/decoding_parquet.go b/x-pack/filebeat/input/awss3/decoding_parquet.go
new file mode 100644
index 000000000000..4ef703e4d112
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/decoding_parquet.go
@@ -0,0 +1,52 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
+)
+
+// parquetDecoder is a decoder for parquet data.
+type parquetDecoder struct {
+	reader *parquet.BufferedReader
+}
+
+// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
+// It returns an error if the parquet reader cannot be created.
+func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
+	reader, err := parquet.NewBufferedReader(r, &parquet.Config{
+		ProcessParallel: config.Codec.Parquet.ProcessParallel,
+		BatchSize:       config.Codec.Parquet.BatchSize,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
+	}
+	return &parquetDecoder{
+		reader: reader,
+	}, nil
+}
+
+// next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
+func (pd *parquetDecoder) next() bool {
+	return pd.reader.Next()
+}
+
+// decode reads and decodes a parquet data stream. After reading the parquet data it decodes
+// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
+func (pd *parquetDecoder) decode() ([]byte, error) {
+	data, err := pd.reader.Record()
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+// close closes the parquet decoder and releases the resources.
+func (pd *parquetDecoder) close() error { + return pd.reader.Close() +} diff --git a/x-pack/filebeat/input/awss3/decoding_test.go b/x-pack/filebeat/input/awss3/decoding_test.go index 81f2bbc450c2..841b6ddcbc36 100644 --- a/x-pack/filebeat/input/awss3/decoding_test.go +++ b/x-pack/filebeat/input/awss3/decoding_test.go @@ -6,17 +6,22 @@ package awss3 import ( "encoding/json" + "errors" "os" "path/filepath" + "reflect" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + + conf "github.com/elastic/elastic-agent-libs/config" ) // all test files are read from the "testdata" directory const testDataPath = "testdata" -func TestParquetDecoding(t *testing.T) { +func TestDecoding(t *testing.T) { testCases := []struct { name string file string @@ -26,7 +31,7 @@ func TestParquetDecoding(t *testing.T) { config *readerConfig }{ { - name: "test decoding of a parquet file and compare the number of events with batch size 1", + name: "parquet_batch_size_1", file: "vpc-flow.gz.parquet", numEvents: 1304, config: &readerConfig{ @@ -41,7 +46,7 @@ func TestParquetDecoding(t *testing.T) { }, }, { - name: "test decoding of a parquet file and compare the number of events with batch size 100", + name: "parquet_batch_size_100", file: "vpc-flow.gz.parquet", numEvents: 1304, config: &readerConfig{ @@ -56,7 +61,7 @@ func TestParquetDecoding(t *testing.T) { }, }, { - name: "test decoding of a parquet file and compare the number of events with default parquet config", + name: "parquet_default", file: "vpc-flow.gz.parquet", numEvents: 1304, config: &readerConfig{ @@ -70,7 +75,7 @@ func TestParquetDecoding(t *testing.T) { }, }, { - name: "test decoding of a parquet file and compare the number of events along with the content", + name: "parquet_default_content_check", file: "cloudtrail.parquet", numEvents: 1, assertAgainst: "cloudtrail.json", @@ -86,6 +91,38 @@ func TestParquetDecoding(t *testing.T) { }, }, }, + { + name: "gzip_csv", + file: "txn.csv.gz", + numEvents: 4, + assertAgainst: "txn.json", + config: &readerConfig{ + Decoding: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + }, + { + name: "csv", + file: "txn.csv", + numEvents: 4, + assertAgainst: "txn.json", + config: &readerConfig{ + Decoding: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + }, } for _, tc := range testCases { @@ -97,9 +134,7 @@ func TestParquetDecoding(t *testing.T) { } // uses the s3_objects test method to perform the test events := testProcessS3Object(t, file, tc.contentType, tc.numEvents, sel) - // if assertAgainst is not empty, then compare the events with the target file - // there is a chance for this comparison to become flaky if number of events > 1 as - // the order of events are not guaranteed by beats + // If assertAgainst is not empty, then compare the events with the target file. 
if tc.assertAgainst != "" {
 				targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst))
 				assert.Equal(t, len(targetData), len(events))
@@ -128,3 +163,100 @@ func readJSONFromFile(t *testing.T, filepath string) []string {
 	}
 	return data
 }
+
+var codecConfigTests = []struct {
+	name    string
+	yaml    string
+	want    decoderConfig
+	wantErr error
+}{
+	{
+		name: "handle_rune",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: ' '
+    comment: '#'
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+				Comma:   ptr[configRune](' '),
+				Comment: '#',
+			},
+		}},
+	},
+	{
+		name: "no_comma",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+			},
+		}},
+	},
+	{
+		name: "null_comma",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: "\u0000"
+`,
+		want: decoderConfig{&codecConfig{
+			CSV: &csvCodecConfig{
+				Enabled: true,
+				Comma:   ptr[configRune]('\x00'),
+			},
+		}},
+	},
+	{
+		name: "bad_rune",
+		yaml: `
+codec:
+  csv:
+    enabled: true
+    comma: 'this is too long'
+`,
+		wantErr: errors.New(`single character option given more than one character: "this is too long" accessing 'codec.csv.comma'`),
+	},
+}
+
+func TestCodecConfig(t *testing.T) {
+	for _, test := range codecConfigTests {
+		t.Run(test.name, func(t *testing.T) {
+			c, err := conf.NewConfigWithYAML([]byte(test.yaml), "")
+			if err != nil {
+				t.Fatalf("unexpected error unmarshaling config: %v", err)
+			}
+
+			var got decoderConfig
+			err = c.Unpack(&got)
+			if !sameError(err, test.wantErr) {
+				t.Errorf("unexpected error unpacking config: got:%v want:%v", err, test.wantErr)
+			}
+
+			if !reflect.DeepEqual(got, test.want) {
+				t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(test.want, got))
+			}
+		})
+	}
}
+
+func sameError(a, b error) bool {
+	switch {
+	case a == nil && b == nil:
+		return true
+	case a == nil, b == nil:
+		return false
+	default:
+		return a.Error() == b.Error()
+	}
+}
+
+func ptr[T any](v T) *T { return &v }
diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go
index 943a36ef0632..82a9e817bc68 100644
--- a/x-pack/filebeat/input/awss3/s3_objects.go
+++ b/x-pack/filebeat/input/awss3/s3_objects.go
@@ -168,17 +168,37 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
 		s3Obj.contentType = p.readerConfig.ContentType
 	}
 
-	// try to create a decoder from the using the codec config
-	decoder, err := newDecoder(p.readerConfig.Decoding, reader)
+	// try to create a decoder using the codec config
+	dec, err := newDecoder(p.readerConfig.Decoding, reader)
 	if err != nil {
 		return err
 	}
-	if decoder != nil {
-		defer decoder.close()
+	var evtOffset int64
+	switch dec := dec.(type) {
+	case valueDecoder:
+		defer dec.close()
 
-		var evtOffset int64
-		for decoder.next() {
-			data, err := decoder.decode()
+		for dec.next() {
+			val, err := dec.decodeValue()
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					return nil
+				}
+				break
+			}
+			data, err := json.Marshal(val)
+			if err != nil {
+				return err
+			}
+			evt := p.createEvent(string(data), evtOffset)
+			p.publish(p.acker, &evt)
+		}
+
+	case decoder:
+		defer dec.close()
+
+		for dec.next() {
+			data, err := dec.decode()
 			if err != nil {
 				if errors.Is(err, io.EOF) {
 					return nil
@@ -190,7 +210,8 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
 			break
 		}
 	}
-	} else {
+
+	default:
 		// This is the legacy path. It will be removed in future and clubbed together with the decoder.
 		// Process object content stream.
switch { diff --git a/x-pack/filebeat/input/awss3/testdata/txn.csv b/x-pack/filebeat/input/awss3/testdata/txn.csv new file mode 100644 index 000000000000..80ca65df21ef --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/txn.csv @@ -0,0 +1,5 @@ +date time time-taken cs-bytes sc-bytes bytes c-ip s-ip cs-username cs-method cs-uri-scheme cs-uri-query cs-user-agent cs-content-type sc-status sc-content-type cs-dns cs-host cs-uri cs-uri-port cs-referer x-cs-session-id x-cs-access-method x-cs-app x-s-country x-s-latitude x-s-longitude x-s-location x-s-region x-s-zipcode x-c-country x-c-latitude x-c-longitude x-c-location x-c-region x-c-zipcode x-c-os x-c-browser x-c-browser-version x-c-device x-cs-site x-cs-timestamp x-cs-page-id x-cs-userip x-cs-traffic-type x-cs-tunnel-id x-category x-other-category x-type x-server-ssl-err x-client-ssl-err x-transaction-id x-request-id x-cs-sni x-cs-domain-fronted-sni x-category-id x-other-category-id x-sr-headers-name x-sr-headers-value x-cs-ssl-ja3 x-sr-ssl-ja3s x-ssl-bypass x-ssl-bypass-reason x-r-cert-subject-cn x-r-cert-issuer-cn x-r-cert-startdate x-r-cert-enddate x-r-cert-valid x-r-cert-expired x-r-cert-untrusted-root x-r-cert-incomplete-chain x-r-cert-self-signed x-r-cert-revoked x-r-cert-revocation-check x-r-cert-mismatch x-cs-ssl-fronting-error x-cs-ssl-handshake-error x-sr-ssl-handshake-error x-sr-ssl-client-certificate-error x-sr-ssl-malformed-ssl x-s-custom-signing-ca-error x-cs-ssl-engine-action x-cs-ssl-engine-action-reason x-sr-ssl-engine-action x-sr-ssl-engine-action-reason x-ssl-policy-src-ip x-ssl-policy-dst-ip x-ssl-policy-dst-host x-ssl-policy-dst-host-source x-ssl-policy-categories x-ssl-policy-action x-ssl-policy-name x-cs-ssl-version x-cs-ssl-cipher x-sr-ssl-version x-sr-ssl-cipher x-cs-src-ip-egress x-s-dp-name x-cs-src-ip x-cs-src-port x-cs-dst-ip x-cs-dst-port x-sr-src-ip x-sr-src-port x-sr-dst-ip x-sr-dst-port x-cs-ip-connect-xff x-cs-ip-xff x-cs-connect-host x-cs-connect-port x-cs-connect-user-agent x-cs-url x-cs-uri-path x-cs-http-version rs-status x-cs-app-category x-cs-app-cci x-cs-app-ccl x-cs-app-tags x-cs-app-suite x-cs-app-instance-id x-cs-app-instance-name x-cs-app-instance-tag x-cs-app-activity x-cs-app-from-user x-cs-app-to-user x-cs-app-object-type x-cs-app-object-name x-cs-app-object-id x-rs-file-type x-rs-file-category x-rs-file-language x-rs-file-size x-rs-file-md5 x-rs-file-sha256 x-error x-c-local-time x-policy-action x-policy-name x-policy-src-ip x-policy-dst-ip x-policy-dst-host x-policy-dst-host-source x-policy-justification-type x-policy-justification-reason x-sc-notification-name +2024-08-05 16:24:20 64 2971 2050 5021 10.5.78.159 204.79.197.237 "vikash.ranjan@riverbed.com" GET https cc=US&setlang=en-US "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Cortana 1.14.7.19041; 10.0.0.0.19045.2006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19045" - 200 "application/json; charset=utf-8" www.bing.com www.bing.com /client/config?cc=US&setlang=en-US 443 - 3683772769278232507 "Client" "Microsoft Bing" "US" 47.682899 -122.120903 "Redmond" "Washington" "N/A" "US" 29.775400 -95.598000 "Houston" "Texas" "77079" "Windows 10" "Edge" "18.19045" "Windows Device" "bing" 1722875060 5762388460300455936 10.5.78.159 CloudApp - "Search Engines" - http_transaction - - 2696581500064586450 2901306739654139904 www.bing.com - 551 - - - 28a2c9bd18a11de089ef85a160da29e4 NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked 
"NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.5.78.159 69.192.139.97 www.bing.com Sni "Search Engines" Decrypt - TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 NotChecked NotChecked 208.185.23.18 "US-ATL2" 10.5.78.159 25941 69.192.139.97 443 - - 10.144.54.201 842 - - - - - https://www.bing.com/client/config?cc=US&setlang=en-US /client/config HTTP1.1 200 "Search Engines" 58 low "Consumer,Unsanctioned" - - - - "Browse" - - - - - - - - - - - - "2024-08-05 11:24:00" "allow" "NetskopeAllow" 10.5.78.159 204.79.197.237 www.bing.com HttpHostHeader - - - +2024-08-05 16:24:19 - 18 0 18 10.70.0.19 - "nadav@skyformation.onmicrosoft.com" PRI - - - - - - - us-west1-b-osconfig.googleapis.com * 443 - 0 "Client" - - - - - - - "US" 45.605600 -121.180700 "The Dalles" "Oregon" "97058" - - - - - 1722875059 0 10.70.0.19 - - "Technology" "Cloud Storage" http_transaction - - 2035489204758272484 0 us-west1-b-osconfig.googleapis.com - 564 "7" - - 7a15285d4efc355608b304698cd7f9ab NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.70.0.19 142.250.99.95 us-west1-b-osconfig.googleapis.com Sni "Technology, Cloud Storage" Decrypt - TLSv1.3 TLS_AES_256_GCM_SHA384 NotChecked NotChecked 34.82.190.203 "US-SEA2" 10.70.0.19 32951 142.250.99.95 443 - - - - - - - - - - - HTTP1.1 - - - - - - - - - - - - - - - - - - - - - http-malformed "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:20 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875060 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1350739992944030464 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:23 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875063 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1615432978285898071 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - diff --git 
a/x-pack/filebeat/input/awss3/testdata/txn.csv.gz b/x-pack/filebeat/input/awss3/testdata/txn.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..52e8fb20539a40e3127997a8877e3346ea1c6dfc GIT binary patch literal 2527 zcmV<52_W_#iwFP!00000|8!Mbn(H0bbhgbUzHnwq|6>2XLC4)%VlQk~5hX z8-672pb3p{=C<+y!!HguQuufp>rJT=J&Y^wJt84{jZSZ)s>1!Ywi1GuU1 zNW^-&SfG8sJs|mT1e0f`J({y);=;tRj2xl47&0TgH1dzzDdsaYsG?C7T~yJiin=Pg zsEV3@9}@r55>%t%&Z6zg#CJzrV|G@lR7Ex1_QLMC@R^BJ09OSjIM0qsk_&Q#z`h}5Vg2ZTpxj=E-JsSh9=tisTXHd6SuP)h~ZfLajROK?bu%t3Z>G+Ca(iOk>6x4M7D5x!w?6}#b;=P{O(9cn} zRCM)aCJ(i^<%p6Ggs*OJSX^Kzjr9ZEOIYo^UU~-)m(;@^y`v|na2amuach}27@z-vD9l_V}`#yRRBdnDHW2YPaiJ6tYDfd9TlNM^3p&sZOZ`R-AEh#wip6xKRu^l@ui93>YPn zC{36&xGRVhPGr)pQQ)aoh}0*B5uyi@}YZ z^{Wf#)4K|ioA7trbvBcP!pr;CH(@ux?L3B~=YSi9IEPR1H3|J_y#+=XM#tb(u=Bm% z{)w_AoYLYIl$%ZG?%m>TJ7zLhBro9Q+u8i)`ZaXzgM%wqKZI8>S@+@5Wh$EIGRN8| zhsrK(-+nfmHY=Ageej+@iltLXEc$P2JGQ}RABW)2psJwi zQz5In*7-53gI~6*KmH37A#x0q7dlsptHN-ldCnz8peK9d1Yv9IJ`77E9YaWAdQ0X37LiI6t6mGNf*^V2cloVK5Y9x_H(^%vzJPilU zx%*@XT$Q5AL^sbgF^WP&2$wGy{d;`rwrw5H7W)*L;#@093v4J=@LX$A(43-;G>G(Z ze@(jZ)F6onLsB{0*K3&iRqNdlDTv+lVZ>$|Y#k}UE###zh15)9>V=RR{}k(@ET@Ia z@kp@TV1B1$q#z~Ro1+75d2JRA)0VNuQEl8(OUnw{SZpjXcKwa^vt2Hm*F# z|8rBHur@;gW?>j-&Ni+;g!=ure*h9uW=ReP001A02mk;800003ol{MZ+aM6VSL%Nl zajXgg3^wekRyOKZ>c^^f&la)qht@HXVYAA=FHR24M%gy1#P7RhgVRI`r8yOuR6OHCi3odxR17i|fU*;{%rME7Y=mv=L`H3DlTeXr zRX16y%?7@^knhbfn$2kAV&NJ~tKQa6wn4Wp{^b9gz72PJ{)q(nIJy@=YagPP!S^Si z$sVC!RnLaQ1SeclqBYU-mg_C1;e47O;bNHgww@2W5bv@4TI?A~mT#4>i;By%`n>v9 z6%Tn5hk0E`B8cLV7$wmFiz{)l&iO0K3D=Tc5I?#5T0E5HW3=)cyVqi|Bi@>E`3L_p zn2Wae?|uVN)}`m^0{{RYiwFP!000001MO2yZxb;Py(94-ET7=8mcQ0(=R^rXMW9ro z6vQd=I!?2!v7@y&DF43RO%SwAqzDd3sG7^nc>LbXXdcCs7g!d!1Vl_IpHK`6fJY-3 zVMH+^gybA_c58jIXr=wFYhIZUL)=AV!grXT@0&JjYg*eC+POx1V?Ifhti}7z4^ssW`)9egCbP>rF5k?jtY_y-&Lwaf~soe#ase zrLroqcs$bH)+ucksnb4clbYZM?#uXAH?Hq2cwxIPbQ8j{;+)9I_4NnPg+4Xj#z~Bh zEjp{0i+Y!GoLt-zpq=d!=zZ(M4-brf2?1dxu9#AaW)#dY7d&@!&X#^!ZK%?kQ?W8i z8C+XZtJciWLUy;gJ1rix~Jj5R~yfP0#Er7$S zXT$dMEQ5ck{W7j~Yxmg!-P@6G7E>O(iR*!LzptCa`U~*Jnr^pAhx~C;Nf1#2ky4