[pkg/ottl]: Add ParseCSV converter (#31081)
**Description:**
* Adds a new ParseCSV converter that can parse CSV row strings.

**Link to tracking Issue:** Closes #30921

**Testing:**
* Unit tests
* Manually tested the examples with a local build of the collector

**Documentation:**
* Adds documentation for using the ParseCSV converter.
BinaryFissionGames authored Feb 16, 2024
1 parent d0c0e97 commit a852c86
Showing 8 changed files with 1,030 additions and 0 deletions.
13 changes: 13 additions & 0 deletions .chloggen/feat_ottl_csv-parse-function.yaml
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a new ParseCSV converter that can be used to parse CSV strings.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30921]
85 changes: 85 additions & 0 deletions internal/coreinternal/parseutils/csv.go
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package parseutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"strings"
)

// ReadCSVRow reads a CSV row from the csv reader, returning the fields parsed from the line.
// We make the assumption that the payload we are reading is a single row, so we allow newline characters in fields.
// However, the csv package does not support newlines in a CSV field (it assumes rows are newline separated),
// so in order to support parsing newlines in a field, we need to stitch together the results of multiple Read calls.
func ReadCSVRow(row string, delimiter rune, lazyQuotes bool) ([]string, error) {
	reader := csv.NewReader(strings.NewReader(row))
	reader.Comma = delimiter
	// -1 indicates a variable length of fields
	reader.FieldsPerRecord = -1
	reader.LazyQuotes = lazyQuotes

	lines := make([][]string, 0, 1)
	for {
		line, err := reader.Read()
		if errors.Is(err, io.EOF) {
			break
		}

		if err != nil && len(line) == 0 {
			return nil, fmt.Errorf("read csv line: %w", err)
		}

		lines = append(lines, line)
	}

	// If the input is empty, we might not get any lines
	if len(lines) == 0 {
		return nil, errors.New("no csv lines found")
	}

	/*
		This parser is parsing a single value, which came from a single log entry.
		Therefore, if there are multiple lines here, it should be assumed that each
		subsequent line contains a continuation of the last field in the previous line.

		Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
		expect reader.Read() to return bodies:
		- ["aa","b"]
		- ["b","cc","d"]
		- ["d","ee"]
	*/

	joinedLine := lines[0]
	for i := 1; i < len(lines); i++ {
		nextLine := lines[i]

		// The first element of the next line is a continuation of the previous line's last element
		joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]

		// The remainder are separate elements
		for n := 1; n < len(nextLine); n++ {
			joinedLine = append(joinedLine, nextLine[n])
		}
	}

	return joinedLine, nil
}

// MapCSVHeaders creates a map of headers[i] -> fields[i].
func MapCSVHeaders(headers []string, fields []string) (map[string]any, error) {
	if len(fields) != len(headers) {
		return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields))
	}

	parsedValues := make(map[string]any, len(headers))
	for i, val := range fields {
		parsedValues[headers[i]] = val
	}

	return parsedValues, nil
}
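A minimal sketch (not part of this diff) of how these two helpers compose — roughly what the `ParseCSV` converter does under the hood. The `main` wrapper and literal inputs are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
)

func main() {
	// Split one CSV row into fields, then zip the fields with their headers.
	fields, err := parseutils.ReadCSVRow("999-999-9999,Joe Smith,joe.smith@example.com", ',', false)
	if err != nil {
		panic(err)
	}
	row, err := parseutils.MapCSVHeaders([]string{"phone", "name", "email"}, fields)
	if err != nil {
		panic(err)
	}
	fmt.Println(row) // map[email:joe.smith@example.com name:Joe Smith phone:999-999-9999]

	// Fields may contain newlines: the csv package reports them as separate
	// records, and ReadCSVRow stitches those records back into a single row.
	stitched, err := parseutils.ReadCSVRow("aa,b\nb,cc", ',', false)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", stitched) // ["aa" "b\nb" "cc"]
}
```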
175 changes: 175 additions & 0 deletions internal/coreinternal/parseutils/csv_test.go
@@ -0,0 +1,175 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package parseutils

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_ParseCSV(t *testing.T) {
	testCases := []struct {
		name        string
		row         string
		delimiter   rune
		lazyQuotes  bool
		expectedRow []string
		expectedErr string
	}{
		{
			name:        "Typical CSV row",
			row:         "field1,field2,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "field2", "field3"},
		},
		{
			name:        "Quoted CSV row",
			row:         `field1,"field2,contains delimiter",field3`,
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "field2,contains delimiter", "field3"},
		},
		{
			name:        "Bare quote in field (strict)",
			row:         `field1,field"2,field3`,
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1"},
		},
		{
			name:        "Bare quote in field (lazy quotes)",
			row:         `field1,field"2,field3`,
			delimiter:   ',',
			lazyQuotes:  true,
			expectedRow: []string{"field1", `field"2`, "field3"},
		},
		{
			name:        "Empty row",
			row:         "",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedErr: "no csv lines found",
		},
		{
			name:        "Newlines in field",
			row:         "field1,fie\nld2,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "fie\nld2", "field3"},
		},
		{
			name:        "Newlines prefix field",
			row:         "field1,\nfield2,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "\nfield2", "field3"},
		},
		{
			name:        "Newlines suffix field",
			row:         "field1,field2\n,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "field2\n", "field3"},
		},
		{
			name:        "Newlines prefix row",
			row:         "\nfield1,field2,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "field2", "field3"},
		},
		{
			name:        "Newlines suffix row",
			row:         "field1,field2,field3\n",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"field1", "field2", "field3"},
		},
		{
			name:        "Newlines in first row",
			row:         "fiel\nd1,field2,field3",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"fiel\nd1", "field2", "field3"},
		},
		{
			name:        "Newlines in all rows",
			row:         "\nfiel\nd1,fie\nld2,fie\nld3\n",
			delimiter:   ',',
			lazyQuotes:  false,
			expectedRow: []string{"fiel\nd1", "fie\nld2", "fie\nld3"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			s, err := ReadCSVRow(tc.row, tc.delimiter, tc.lazyQuotes)
			if tc.expectedErr != "" {
				require.ErrorContains(t, err, tc.expectedErr)
			} else {
				require.Equal(t, tc.expectedRow, s)
			}
		})
	}
}

func Test_MapCSVHeaders(t *testing.T) {
	testCases := []struct {
		name        string
		headers     []string
		fields      []string
		expectedMap map[string]any
		expectedErr string
	}{
		{
			name:    "Map headers to fields",
			headers: []string{"Col1", "Col2", "Col3"},
			fields:  []string{"Val1", "Val2", "Val3"},
			expectedMap: map[string]any{
				"Col1": "Val1",
				"Col2": "Val2",
				"Col3": "Val3",
			},
		},
		{
			name:        "Missing field",
			headers:     []string{"Col1", "Col2", "Col3"},
			fields:      []string{"Val1", "Val2"},
			expectedErr: "wrong number of fields: expected 3, found 2",
		},
		{
			name:        "Too many fields",
			headers:     []string{"Col1", "Col2", "Col3"},
			fields:      []string{"Val1", "Val2", "Val3", "Val4"},
			expectedErr: "wrong number of fields: expected 3, found 4",
		},
		{
			name:    "Single field",
			headers: []string{"Col1"},
			fields:  []string{"Val1"},
			expectedMap: map[string]any{
				"Col1": "Val1",
			},
		},
		{
			name:        "No fields",
			headers:     []string{},
			fields:      []string{},
			expectedMap: map[string]any{},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			m, err := MapCSVHeaders(tc.headers, tc.fields)
			if tc.expectedErr != "" {
				require.ErrorContains(t, err, tc.expectedErr)
			} else {
				require.Equal(t, tc.expectedMap, m)
			}
		})
	}
}
18 changes: 18 additions & 0 deletions pkg/ottl/e2e/e2e_test.go
@@ -430,6 +430,24 @@ func Test_e2e_converters(t *testing.T) {
				tCtx.GetLogRecord().Attributes().PutStr("test", "pass")
			},
		},
		{
			statement: `set(attributes["test"], ParseCSV("val1;val2;val3","header1|header2|header3",";","|","strict"))`,
			want: func(tCtx ottllog.TransformContext) {
				m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
				m.PutStr("header1", "val1")
				m.PutStr("header2", "val2")
				m.PutStr("header3", "val3")
			},
		},
		{
			statement: `set(attributes["test"], ParseCSV("val1,val2,val3","header1|header2|header3",headerDelimiter="|",mode="strict"))`,
			want: func(tCtx ottllog.TransformContext) {
				m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
				m.PutStr("header1", "val1")
				m.PutStr("header2", "val2")
				m.PutStr("header3", "val3")
			},
		},
		{
			statement: `set(attributes["test"], ParseJSON("{\"id\":1}"))`,
			want: func(tCtx ottllog.TransformContext) {
33 changes: 33 additions & 0 deletions pkg/ottl/ottlfuncs/README.md
@@ -401,6 +401,7 @@ Available Converters:
- [Minutes](#minutes)
- [Nanoseconds](#nanoseconds)
- [Now](#now)
- [ParseCSV](#parsecsv)
- [ParseJSON](#parsejson)
- [ParseKeyValue](#parsekeyvalue)
- [Seconds](#seconds)
@@ -799,6 +800,38 @@
- `UnixSeconds(Now())`
- `set(start_time, Now())`

### ParseCSV

`ParseCSV(target, headers, Optional[delimiter], Optional[headerDelimiter], Optional[mode])`

The `ParseCSV` Converter returns a `pcommon.Map` struct that contains the result of parsing the `target` string as CSV. The resulting map is a mapping of field name -> field value.

`target` is a Getter that returns a string. This string should be a CSV row. If `target` is not a properly formatted CSV row, or if the number of fields in `target` does not match the number of fields in `headers`, `ParseCSV` returns an error. Leading and trailing newlines in `target` are stripped. Newlines elsewhere in `target` are not treated as row delimiters during parsing; they are treated as part of the field in which they appear.

`headers` is a Getter that returns a string. This string should be a CSV header, specifying the names of the CSV fields.

`delimiter` is an optional string parameter that specifies the delimiter used to split `target` into fields. By default, it is set to `,`.

`headerDelimiter` is an optional string parameter that specifies the delimiter used to split `headers` into fields. By default, it is set to the value of `delimiter`.

`mode` is an optional string parameter that specifies the parsing mode. Valid values are `strict`, `lazyQuotes`, and `ignoreQuotes`. By default, it is set to `strict`.
- The `strict` mode provides typical CSV parsing.
- The `lazyQuotes` mode provides a relaxed version of CSV parsing in which a quote may appear in the middle of an unquoted field.
- The `ignoreQuotes` mode completely ignores any quoting rules for CSV and simply splits the row on the delimiter.
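As the unit tests in this commit show, the modes handle a bare quote inside an unquoted field differently: with `strict`, parsing stops at the offending field (typically surfacing as a wrong-number-of-fields error from `ParseCSV`), while `lazyQuotes` keeps the quote as literal field content.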

Examples:

- `ParseCSV("999-999-9999,Joe Smith,joe.smith@example.com", "phone,name,email")`
- `ParseCSV(body, "phone|name|email", delimiter="|")`
- `ParseCSV(attributes["csv_line"], attributes["csv_headers"], delimiter="|", headerDelimiter=",", mode="lazyQuotes")`
- `ParseCSV("\"555-555-5556,Joe Smith\",joe.smith@example.com", "phone,name,email", mode="ignoreQuotes")`
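Since `ignoreQuotes` performs a plain split on the delimiter, the last example should produce `phone` -> `"555-555-5556`, `name` -> `Joe Smith"`, and `email` -> `joe.smith@example.com`, with the quote characters kept as part of their fields.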

### ParseJSON

`ParseJSON(target)`