Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle multiline processors #2063

Merged
merged 10 commits into from
Sep 20, 2024
91 changes: 80 additions & 11 deletions internal/elasticsearch/ingest/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package ingest

import (
"bufio"
"bytes"
"fmt"

"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -50,14 +52,15 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
return procs, nil
}

// extract a list of processors from a pipeline definition in YAML format.
// processorsFromYAML extracts a list of processors from a pipeline definition in YAML format.
func processorsFromYAML(content []byte) (procs []Processor, err error) {
var p struct {
Processors []yaml.Node
}
if err = yaml.Unmarshal(content, &p); err != nil {
return nil, err
}

for idx, entry := range p.Processors {
if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 {
return nil, fmt.Errorf("processor#%d is not a single-key map (kind:%v content:%d)", idx, entry.Kind, len(entry.Content))
Expand All @@ -70,22 +73,88 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) {
return nil, fmt.Errorf("error decoding processor#%d type: %w", idx, err)
}
proc.FirstLine = entry.Line
proc.LastLine = lastLine(&entry)
lastLine, err := getProcessorLastLine(idx, p.Processors, proc, content)
if err != nil {
return nil, err
}
proc.LastLine = lastLine

procs = append(procs, proc)
}
return procs, nil
return procs, err
}

// returns the last (greater) line number used by a yaml.Node.
func lastLine(node *yaml.Node) int {
// getProcessorLastLine determines the last line number for the given processor.
func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Processor, content []byte) (int, error) {
if idx < len(processors)-1 {
var endProcessor = processors[idx+1].Line - 1
if endProcessor < currentProcessor.FirstLine {
return currentProcessor.FirstLine, nil
} else {
return processors[idx+1].Line - 1, nil
}
}

return nextProcessorOrEndOfPipeline(content)
mrodm marked this conversation as resolved.
Show resolved Hide resolved
}

// nextProcessorOrEndOfPipeline get the line before the node after the processors node. If there is none, it returns the end of file line
func nextProcessorOrEndOfPipeline(content []byte) (int, error) {
var root yaml.Node
if err := yaml.Unmarshal(content, &root); err != nil {
return 0, fmt.Errorf("error unmarshaling YAML: %v", err)
}

var nodes []*yaml.Node
extractNodesFromMapping(&root, &nodes)
for i, node := range nodes {

if node.Value == "processors" {
if i < len(nodes)-1 {

return nodes[i+1].Line - 1, nil
}
}

}
return countLinesInBytes(content)
mrodm marked this conversation as resolved.
Show resolved Hide resolved
}

// extractNodesFromMapping recursively extracts all nodes from MappingNodes within DocumentNodes.
func extractNodesFromMapping(node *yaml.Node, nodes *[]*yaml.Node) {
if node == nil {
return 0
return
}
last := node.Line
for _, inner := range node.Content {
if line := lastLine(inner); line > last {
last = line

if node.Kind == yaml.DocumentNode {
for _, child := range node.Content {
extractNodesFromMapping(child, nodes)
}
return
}

if node.Kind == yaml.MappingNode {
for _, child := range node.Content {
if child.Kind == yaml.MappingNode || child.Kind == yaml.ScalarNode {
*nodes = append(*nodes, child)
}
extractNodesFromMapping(child, nodes)
}
}
return last
}

// countLinesInBytes counts the number of lines in the given byte slice.
func countLinesInBytes(data []byte) (int, error) {
scanner := bufio.NewScanner(bytes.NewReader(data))
lineCount := 0

for scanner.Scan() {
lineCount++
}

if err := scanner.Err(); err != nil {
return 0, fmt.Errorf("error reading data: %w", err)
}

return lineCount, nil
}
Loading