From a852c864b14cdaee319c2540927c007ef2a27de5 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 16 Feb 2024 09:28:34 -0500 Subject: [PATCH] [pkg/ottl]: Add ParseCSV converter (#31081) **Description:** * Adds a new ParseCSV converter that can parse CSV row strings. **Link to tracking Issue:** Closes #30921 **Testing:** * Unit tests * Manually tested the examples with a local build of the collector **Documentation:** * Adds documentation for using the ParseCSV converter. --- .chloggen/feat_ottl_csv-parse-function.yaml | 13 + internal/coreinternal/parseutils/csv.go | 85 +++ internal/coreinternal/parseutils/csv_test.go | 175 ++++++ pkg/ottl/e2e/e2e_test.go | 18 + pkg/ottl/ottlfuncs/README.md | 33 ++ pkg/ottl/ottlfuncs/func_parse_csv.go | 145 +++++ pkg/ottl/ottlfuncs/func_parse_csv_test.go | 560 +++++++++++++++++++ pkg/ottl/ottlfuncs/functions.go | 1 + 8 files changed, 1030 insertions(+) create mode 100755 .chloggen/feat_ottl_csv-parse-function.yaml create mode 100644 internal/coreinternal/parseutils/csv.go create mode 100644 internal/coreinternal/parseutils/csv_test.go create mode 100644 pkg/ottl/ottlfuncs/func_parse_csv.go create mode 100644 pkg/ottl/ottlfuncs/func_parse_csv_test.go diff --git a/.chloggen/feat_ottl_csv-parse-function.yaml b/.chloggen/feat_ottl_csv-parse-function.yaml new file mode 100755 index 000000000000..0cf88fb8455a --- /dev/null +++ b/.chloggen/feat_ottl_csv-parse-function.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds a new ParseCSV converter that can be used to parse CSV strings. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30921] diff --git a/internal/coreinternal/parseutils/csv.go b/internal/coreinternal/parseutils/csv.go new file mode 100644 index 000000000000..5354213f2dde --- /dev/null +++ b/internal/coreinternal/parseutils/csv.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parseutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" + +import ( + "encoding/csv" + "errors" + "fmt" + "io" + "strings" +) + +// ReadCSVRow reads a CSV row from the csv reader, returning the fields parsed from the line. +// We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields. +// However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated), +// so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls. 
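+// For example, with delimiter ',' the single row "aa,b\nb,cc" is read by the csv package as +// ["aa","b"] and then ["b","cc"]; ReadCSVRow stitches these back together into ["aa","b\nb","cc"].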
+func ReadCSVRow(row string, delimiter rune, lazyQuotes bool) ([]string, error) { + reader := csv.NewReader(strings.NewReader(row)) + reader.Comma = delimiter + // -1 indicates a variable length of fields + reader.FieldsPerRecord = -1 + reader.LazyQuotes = lazyQuotes + + lines := make([][]string, 0, 1) + for { + line, err := reader.Read() + if errors.Is(err, io.EOF) { + break + } + + if err != nil && len(line) == 0 { + return nil, fmt.Errorf("read csv line: %w", err) + } + + lines = append(lines, line) + } + + // If the input is empty, we might not get any lines + if len(lines) == 0 { + return nil, errors.New("no csv lines found") + } + + /* + This parser is parsing a single value, which came from a single log entry. + Therefore, if there are multiple lines here, it should be assumed that each + subsequent line contains a continuation of the last field in the previous line. + + Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee", + expect reader.Read() to return bodies: + - ["aa","b"] + - ["b","cc","d"] + - ["d","ee"] + */ + + joinedLine := lines[0] + for i := 1; i < len(lines); i++ { + nextLine := lines[i] + + // The first element of the next line is a continuation of the previous line's last element + joinedLine[len(joinedLine)-1] += "\n" + nextLine[0] + + // The remainder are separate elements + for n := 1; n < len(nextLine); n++ { + joinedLine = append(joinedLine, nextLine[n]) + } + } + + return joinedLine, nil +} + +// MapCSVHeaders creates a map of headers[i] -> fields[i]. +func MapCSVHeaders(headers []string, fields []string) (map[string]any, error) { + if len(fields) != len(headers) { + return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields)) + } + + parsedValues := make(map[string]any, len(headers)) + + for i, val := range fields { + parsedValues[headers[i]] = val + } + + return parsedValues, nil +} diff --git a/internal/coreinternal/parseutils/csv_test.go b/internal/coreinternal/parseutils/csv_test.go new file mode 100644 index 000000000000..1d93b89bcf3e --- /dev/null +++ b/internal/coreinternal/parseutils/csv_test.go @@ -0,0 +1,175 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parseutils + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_ParseCSV(t *testing.T) { + testCases := []struct { + name string + row string + delimiter rune + lazyQuotes bool + expectedRow []string + expectedErr string + }{ + { + name: "Typical CSV row", + row: "field1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Quoted CSV row", + row: `field1,"field2,contains delimiter",field3`, + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2,contains delimiter", "field3"}, + }, + { + name: "Bare quote in field (strict)", + row: `field1,field"2,field3`, + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1"}, + }, + { + name: "Bare quote in field (lazy quotes)", + row: `field1,field"2,field3`, + delimiter: ',', + lazyQuotes: true, + expectedRow: []string{"field1", `field"2`, "field3"}, + }, + { + name: "Empty row", + row: "", + delimiter: ',', + lazyQuotes: false, + expectedErr: "no csv lines found", + }, + { + name: "Newlines in field", + row: "field1,fie\nld2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "fie\nld2", "field3"}, + }, + { + name: "Newlines prefix field", + row: "field1,\nfield2,field3", + delimiter: 
',', + lazyQuotes: false, + expectedRow: []string{"field1", "\nfield2", "field3"}, + }, + { + name: "Newlines suffix field", + row: "field1,field2\n,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2\n", "field3"}, + }, + { + name: "Newlines prefix row", + row: "\nfield1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Newlines suffix row", + row: "field1,field2,field3\n", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"field1", "field2", "field3"}, + }, + { + name: "Newlines in first row", + row: "fiel\nd1,field2,field3", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"fiel\nd1", "field2", "field3"}, + }, + { + name: "Newlines in all rows", + row: "\nfiel\nd1,fie\nld2,fie\nld3\n", + delimiter: ',', + lazyQuotes: false, + expectedRow: []string{"fiel\nd1", "fie\nld2", "fie\nld3"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + s, err := ReadCSVRow(tc.row, tc.delimiter, tc.lazyQuotes) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.Equal(t, tc.expectedRow, s) + } + }) + } +} + +func Test_MapCSVHeaders(t *testing.T) { + testCases := []struct { + name string + headers []string + fields []string + expectedMap map[string]any + expectedErr string + }{ + { + name: "Map headers to fields", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2", "Val3"}, + expectedMap: map[string]any{ + "Col1": "Val1", + "Col2": "Val2", + "Col3": "Val3", + }, + }, + { + name: "Missing field", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2"}, + expectedErr: "wrong number of fields: expected 3, found 2", + }, + { + name: "Too many fields", + headers: []string{"Col1", "Col2", "Col3"}, + fields: []string{"Val1", "Val2", "Val3", "Val4"}, + expectedErr: "wrong number of fields: expected 3, found 4", + }, + { + name: "Single field", + headers: []string{"Col1"}, + fields: []string{"Val1"}, + expectedMap: map[string]any{ + "Col1": "Val1", + }, + }, + { + name: "No fields", + headers: []string{}, + fields: []string{}, + expectedMap: map[string]any{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m, err := MapCSVHeaders(tc.headers, tc.fields) + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + } else { + require.Equal(t, tc.expectedMap, m) + } + }) + } +} diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index ceab9dcc306a..26dbaf85be18 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -430,6 +430,24 @@ func Test_e2e_converters(t *testing.T) { tCtx.GetLogRecord().Attributes().PutStr("test", "pass") }, }, + { + statement: `set(attributes["test"], ParseCSV("val1;val2;val3","header1|header2|header3",";","|","strict"))`, + want: func(tCtx ottllog.TransformContext) { + m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test") + m.PutStr("header1", "val1") + m.PutStr("header2", "val2") + m.PutStr("header3", "val3") + }, + }, + { + statement: `set(attributes["test"], ParseCSV("val1,val2,val3","header1|header2|header3",headerDelimiter="|",mode="strict"))`, + want: func(tCtx ottllog.TransformContext) { + m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test") + m.PutStr("header1", "val1") + m.PutStr("header2", "val2") + m.PutStr("header3", "val3") + }, + }, { statement: `set(attributes["test"], ParseJSON("{\"id\":1}"))`, want: func(tCtx 
ottllog.TransformContext) { diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 32a2d801ba4d..8a4ffafd3050 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -401,6 +401,7 @@ Available Converters: - [Minutes](#minutes) - [Nanoseconds](#nanoseconds) - [Now](#now) +- [ParseCSV](#parsecsv) - [ParseJSON](#parsejson) - [ParseKeyValue](#parsekeyvalue) - [Seconds](#seconds) @@ -799,6 +800,38 @@ Examples: - `UnixSeconds(Now())` - `set(start_time, Now())` +### ParseCSV + +`ParseCSV(target, headers, Optional[delimiter], Optional[headerDelimiter], Optional[mode])` + +The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the `target` string as CSV. The resulting map is a mapping of field name -> field value. + +`target` is a Getter that returns a string. This string should be a CSV row. If `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` will return an error. Leading and trailing newlines in `target` will be stripped. Newlines elsewhere in `target` are not treated as row delimiters during parsing, and will be treated as though they are part of the field they are placed in. + +`headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields. + +`delimiter` is an optional string parameter that specifies the delimiter used to split `target` into fields. By default, it is set to `,`. + +`headerDelimiter` is an optional string parameter that specifies the delimiter used to split `headers` into fields. By default, it is set to the value of `delimiter`. + +`mode` is an optional string parameter that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`. +- The `strict` mode provides typical CSV parsing. +- The `lazyQuotes` mode provides a relaxed version of CSV parsing where a quote may appear in the middle of an unquoted field. +- The `ignoreQuotes` mode completely ignores any quoting rules for CSV and just splits the row on the delimiter.
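+ +For example, given the row `"555-555-5556,Joe Smith",joe.smith@example.com`, the `strict` and `lazyQuotes` modes parse the quoted section as a single field (two fields total), while `ignoreQuotes` splits on every delimiter and produces three fields.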
+ +Examples: + +- `ParseCSV("999-999-9999,Joe Smith,joe.smith@example.com", "phone,name,email")` + + +- `ParseCSV(body, "phone|name|email", delimiter="|")` + + +- `ParseCSV(attributes["csv_line"], attributes["csv_headers"], delimiter="|", headerDelimiter=",", mode="lazyQuotes")` + + +- `ParseCSV("\"555-555-5556,Joe Smith\",joe.smith@example.com", "phone,name,email", mode="ignoreQuotes")` + ### ParseJSON + `ParseJSON(target)` diff --git a/pkg/ottl/ottlfuncs/func_parse_csv.go b/pkg/ottl/ottlfuncs/func_parse_csv.go new file mode 100644 index 000000000000..dd0a88dc2343 --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_parse_csv.go @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" + +import ( + "context" + "errors" + "fmt" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +const ( + parseCSVModeStrict = "strict" + parseCSVModeLazyQuotes = "lazyQuotes" + parseCSVModeIgnoreQuotes = "ignoreQuotes" +) + +const ( + parseCSVDefaultDelimiter = ',' + parseCSVDefaultMode = parseCSVModeStrict +) + +type ParseCSVArguments[K any] struct { + Target ottl.StringGetter[K] + Header ottl.StringGetter[K] + Delimiter ottl.Optional[string] + HeaderDelimiter ottl.Optional[string] + Mode ottl.Optional[string] +} + +func (p ParseCSVArguments[K]) validate() error { + if !p.Delimiter.IsEmpty() { + if len([]rune(p.Delimiter.Get())) != 1 { + return errors.New("delimiter must be a single character") + } + } + + if !p.HeaderDelimiter.IsEmpty() { + if len([]rune(p.HeaderDelimiter.Get())) != 1 { + return errors.New("header_delimiter must be a single character") + } + } + + return nil +} + +func NewParseCSVFactory[K any]() ottl.Factory[K] { + return ottl.NewFactory("ParseCSV", &ParseCSVArguments[K]{}, createParseCSVFunction[K]) +} + +func createParseCSVFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { + args, ok := oArgs.(*ParseCSVArguments[K]) + if !ok { + return nil, fmt.Errorf("ParseCSVFactory args must be of type *ParseCSVArguments[K]") + } + + if err := args.validate(); err != nil { + return nil, fmt.Errorf("invalid arguments: %w", err) + } + + delimiter := parseCSVDefaultDelimiter + if !args.Delimiter.IsEmpty() { + delimiter = []rune(args.Delimiter.Get())[0] + } + + // headerDelimiter defaults to the chosen delimiter, + // since in most cases headerDelimiter == delimiter.
+ headerDelimiter := string(delimiter) + if !args.HeaderDelimiter.IsEmpty() { + headerDelimiter = args.HeaderDelimiter.Get() + } + + mode := parseCSVDefaultMode + if !args.Mode.IsEmpty() { + mode = args.Mode.Get() + } + + var parseRow parseCSVRowFunc + switch mode { + case parseCSVModeStrict: + parseRow = parseCSVRow(false) + case parseCSVModeLazyQuotes: + parseRow = parseCSVRow(true) + case parseCSVModeIgnoreQuotes: + parseRow = parseCSVRowIgnoreQuotes() + default: + return nil, fmt.Errorf("unknown mode: %s", mode) + } + + return parseCSV(args.Target, args.Header, delimiter, headerDelimiter, parseRow), nil +} + +type parseCSVRowFunc func(row string, delimiter rune) ([]string, error) + +func parseCSV[K any](target, header ottl.StringGetter[K], delimiter rune, headerDelimiter string, parseRow parseCSVRowFunc) ottl.ExprFunc[K] { + return func(ctx context.Context, tCtx K) (any, error) { + targetStr, err := target.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("error getting value for target in ParseCSV: %w", err) + } + + headerStr, err := header.Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("error getting value for header in ParseCSV: %w", err) + } + + if headerStr == "" { + return nil, errors.New("headers must not be an empty string") + } + + headers := strings.Split(headerStr, headerDelimiter) + + fields, err := parseRow(targetStr, delimiter) + if err != nil { + return nil, err + } + + headersToFields, err := parseutils.MapCSVHeaders(headers, fields) + if err != nil { + return nil, fmt.Errorf("map csv headers: %w", err) + } + + pMap := pcommon.NewMap() + err = pMap.FromRaw(headersToFields) + return pMap, err + } +} + +func parseCSVRow(lazyQuotes bool) parseCSVRowFunc { + return func(row string, delimiter rune) ([]string, error) { + return parseutils.ReadCSVRow(row, delimiter, lazyQuotes) + } +} + +func parseCSVRowIgnoreQuotes() parseCSVRowFunc { + return func(row string, delimiter rune) ([]string, error) { + return strings.Split(row, string([]rune{delimiter})), nil + } +} diff --git a/pkg/ottl/ottlfuncs/func_parse_csv_test.go b/pkg/ottl/ottlfuncs/func_parse_csv_test.go new file mode 100644 index 000000000000..859df2d82cb0 --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_parse_csv_test.go @@ -0,0 +1,560 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +func Test_ParseCSV(t *testing.T) { + tests := []struct { + name string + oArgs ottl.Arguments + want map[string]any + createError string + parseError string + }{ + /* Test default mode */ + { + name: "Parse comma separated values", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline in first field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\nnewline,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: 
func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1\nnewline", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline in middle field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2\nnewline,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2\nnewline", + "col3": "val3", + }, + }, + { + name: "Parse with newline in last field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3\nnewline", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3\nnewline", + }, + }, + { + name: "Parse with newline in multiple fields", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\nnewline1,val2\nnewline2,val3\nnewline3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1\nnewline1", + "col2": "val2\nnewline2", + "col3": "val3\nnewline3", + }, + }, + { + name: "Parse with leading newline", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "\nval1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with trailing newline", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3\n", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse with newline at end of field", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\n,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + want: map[string]any{ + "col1": "val1\n", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse comma separated values with explicit mode", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("strict"), + }, + want: map[string]any{ + "col1": "val1", + 
"col2": "val2", + "col3": "val3", + }, + }, + { + name: "Parse tab separated values", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\tval2\tval3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1\tcol2\tcol3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("\t"), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Header delimiter is different from row delimiter", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1\tval2\tval3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1 col2 col3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("\t"), + HeaderDelimiter: ottl.NewTestingOptional(" "), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": "val3", + }, + }, + { + name: "Invalid target (strict mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "error getting value for target in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid header (strict mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + }, + parseError: "error getting value for header in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid args", + oArgs: nil, + createError: "ParseCSVFactory args must be of type *ParseCSVArguments[K]", + }, + { + name: "Parse fails due to header/row column mismatch", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,val3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "wrong number of fields: expected 2, found 3", + }, + { + name: "Parse fails due to header/row column mismatch", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,val3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + }, + parseError: "wrong number of fields: expected 2, found 3", + }, + { + name: "Empty header string (strict)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + }, + parseError: "headers must not 
be an empty string", + }, + { + name: "Parse fails due to empty row", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + parseError: "no csv lines found", + }, + { + name: "Parse fails for row with bare quotes", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,v"al3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + }, + parseError: "wrong number of fields: expected 3, found 2", + }, + + /* Test parsing with lazy quotes */ + { + name: "Parse lazyQuotes with quote in row", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2,v"al3`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("lazyQuotes"), + }, + want: map[string]any{ + "col1": "val1", + "col2": "val2", + "col3": `v"al3`, + }, + }, + { + name: "Parse lazyQuotes invalid csv", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,"val2,"val3,val4"`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3,col4", nil + }, + }, + Mode: ottl.NewTestingOptional("lazyQuotes"), + }, + parseError: "wrong number of fields: expected 4, found 2", + }, + /* Test parsing ignoring quotes */ + { + name: "Parse quotes invalid csv", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,"val2,"val3,val4"`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3,col4", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + want: map[string]any{ + "col1": "val1", + "col2": `"val2`, + "col3": `"val3`, + "col4": `val4"`, + }, + }, + { + name: "Invalid target (ignoreQuotes mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "error getting value for target in ParseCSV: error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Invalid header (ignoreQuotes mode)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1,val2`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return nil, errors.New("cannot get") + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "error getting value for header in ParseCSV: 
error getting value in ottl.StandardStringGetter[interface {}]: cannot get", + }, + { + name: "Empty header string (ignoreQuotes)", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return `val1`, nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "", nil + }, + }, + Mode: ottl.NewTestingOptional("ignoreQuotes"), + }, + parseError: "headers must not be an empty string", + }, + /* Validation tests */ + { + name: "Delimiter is greater than one character", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Delimiter: ottl.NewTestingOptional("bad_delim"), + }, + createError: "invalid arguments: delimiter must be a single character", + }, + { + name: "HeaderDelimiter is greater than one character", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + HeaderDelimiter: ottl.NewTestingOptional("bad_delim"), + }, + createError: "invalid arguments: header_delimiter must be a single character", + }, + { + name: "Bad mode", + oArgs: &ParseCSVArguments[any]{ + Target: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "val1,val2,val3", nil + }, + }, + Header: ottl.StandardStringGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return "col1,col2,col3", nil + }, + }, + Mode: ottl.NewTestingOptional("fake-mode"), + }, + createError: "unknown mode: fake-mode", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exprFunc, err := createParseCSVFunction[any](ottl.FunctionContext{}, tt.oArgs) + if tt.createError != "" { + require.ErrorContains(t, err, tt.createError) + return + } + + require.NoError(t, err) + + result, err := exprFunc(context.Background(), nil) + if tt.parseError != "" { + require.ErrorContains(t, err, tt.parseError) + return + } + + assert.NoError(t, err) + + resultMap, ok := result.(pcommon.Map) + require.True(t, ok) + + require.Equal(t, tt.want, resultMap.AsRaw()) + }) + } +} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 657b88280367..786e270bae36 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -56,6 +56,7 @@ func converters[K any]() []ottl.Factory[K] { NewMinutesFactory[K](), NewNanosecondsFactory[K](), NewNowFactory[K](), + NewParseCSVFactory[K](), NewParseJSONFactory[K](), NewParseKeyValueFactory[K](), NewSecondsFactory[K](),