Skip to content

Commit

Permalink
fix(outputs.syslog): Trim field-names belonging to explicit SDIDs cor…
Browse files Browse the repository at this point in the history
…rectly (#16014)
  • Loading branch information
srebhan authored Oct 15, 2024
1 parent c66c2c7 commit 0b1581c
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 12 deletions.
9 changes: 5 additions & 4 deletions plugins/outputs/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) {
}
}
for _, metric := range metrics {
var msg *rfc5424.SyslogMessage
if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil {
msg, err := s.mapper.MapMetricToSyslogMessage(metric)
if err != nil {
s.Log.Errorf("Failed to create syslog message: %v", err)
continue
}
var msgBytesWithFraming []byte
if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil {

msgBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(msg)
if err != nil {
s.Log.Errorf("Failed to convert syslog message with framing: %v", err)
continue
}
Expand Down
19 changes: 11 additions & 8 deletions plugins/outputs/syslog/syslog_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,25 @@ func (sm *SyslogMapper) mapStructuredData(metric telegraf.Metric, msg *rfc5424.S
}

func (sm *SyslogMapper) mapStructuredDataItem(key, value string, msg *rfc5424.SyslogMessage) {
// Do not add already reserved keys
if sm.reservedKeys[key] {
return
}
isExplicitSdid := false

// Add keys matching one of the sd-IDs
for _, sdid := range sm.Sdids {
k := strings.TrimLeft(key, sdid+sm.Separator)
if len(key) > len(k) {
isExplicitSdid = true
if k := strings.TrimPrefix(key, sdid+sm.Separator); key != k {
msg.SetParameter(sdid, k, value)
break
return
}
}
if !isExplicitSdid && len(sm.DefaultSdid) > 0 {
k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator)
msg.SetParameter(sm.DefaultSdid, k, value)

// Add remaining keys with the default sd-ID if configured
if sm.DefaultSdid == "" {
return
}
k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator)
msg.SetParameter(sm.DefaultSdid, k, value)
}

func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
Expand Down
156 changes: 156 additions & 0 deletions plugins/outputs/syslog/syslog_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package syslog

import (
"bytes"
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/leodido/go-syslog/v4/nontransparent"
)
Expand Down Expand Up @@ -428,3 +434,153 @@ func TestStartupErrorBehaviorRetry(t *testing.T) {
wg.Wait()
require.NotEmpty(t, string(buf))
}

func TestCases(t *testing.T) {
// Get all testcase directories
folders, err := os.ReadDir("testcases")
require.NoError(t, err)

// Register the plugin
outputs.Add("syslog", func() telegraf.Output { return newSyslog() })

for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}

t.Run(f.Name(), func(t *testing.T) {
testcasePath := filepath.Join("testcases", f.Name())
configFilename := filepath.Join(testcasePath, "telegraf.conf")
inputFilename := filepath.Join(testcasePath, "input.influx")
expectedFilename := filepath.Join(testcasePath, "expected.out")
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")

// Get parser to parse input and expected output
parser := &influx.Parser{}
require.NoError(t, parser.Init())

// Load the input data
input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
require.NoError(t, err)

// Read the expected output if any
var expected []byte
if _, err := os.Stat(expectedFilename); err == nil {
expected, err = os.ReadFile(expectedFilename)
require.NoError(t, err)
}

// Read the expected output if any
var expectedError string
if _, err := os.Stat(expectedErrorFilename); err == nil {
expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename)
require.NoError(t, err)
require.Len(t, expectedErrors, 1)
expectedError = expectedErrors[0]
}

// Configure the plugin
cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfig(configFilename))
require.Len(t, cfg.Outputs, 1)

// Create a mock-server to receive the data
server, err := newMockServer()
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
server.listen()
}()
defer server.close()

// Setup the plugin
plugin := cfg.Outputs[0].Output.(*Syslog)
plugin.Address = "udp://" + server.address()
plugin.Log = testutil.Logger{}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Write the data and wait for it to arrive
err = plugin.Write(input)
if expectedError != "" {
require.ErrorContains(t, err, expectedError)
return
}
require.NoError(t, err)
require.NoError(t, plugin.Close())

require.Eventuallyf(t, func() bool {
return server.len() >= len(expected)
}, 3*time.Second, 100*time.Millisecond, "received %q", server.message())

// Check the received data
require.Equal(t, string(expected), server.message())
})
}
}

type mockServer struct {
conn *net.UDPConn

data bytes.Buffer
err error

sync.Mutex
}

func newMockServer() (*mockServer, error) {
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
return nil, err
}

conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

return &mockServer{conn: conn}, nil
}

func (s *mockServer) address() string {
return s.conn.LocalAddr().String()
}

func (s *mockServer) listen() {
buf := make([]byte, 2048)
for {
n, err := s.conn.Read(buf)
if err != nil {
s.err = err
return
}
s.Lock()
_, _ = s.data.Write(buf[:n])
s.Unlock()
}
}

func (s *mockServer) close() error {
if s.conn == nil {
return nil
}

return s.conn.Close()
}

func (s *mockServer) message() string {
s.Lock()
defer s.Unlock()
return s.data.String()
}

func (s *mockServer) len() int {
s.Lock()
defer s.Unlock()
return s.data.Len()
}
1 change: 1 addition & 0 deletions plugins/outputs/syslog/testcases/issue_16012/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
342 <13>1 2024-10-11T21:30:04Z draco Telegraf - scc-change-logs [additional entityUid="6b580296-7199-47b5-9736-9b91329c284e" lastEventDate="2024-10-09T19:26:13Z" status="COMPLETED" uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3"][events action="UPDATE" date="2024-10-09T19:26:08Z" description="Changed ASA Config" diff="" username="user@mydomain.com"]
1 change: 1 addition & 0 deletions plugins/outputs/syslog/testcases/issue_16012/input.influx
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
scc-change-logs,host=draco events_description="Changed ASA Config",events_diff="@@ -5,1 +5,1 @@\\n-: Written by lockhart at 18:53:02.210 UTC Tue Oct 8 2024\\n+: Written by lockhart at 19:24:54.048 UTC Wed Oct 9 2024\\n@@ -135,2 +135,0 @@\\n-object network 1.1.1.1\\n-host 1.1.1.1\\n@@ -239,0 +237,2 @@\\n+object network 1.1.1.1\\n+host 1.1.1.1\\n@@ -1108,1 +1108,1 @@\\n-Cryptochecksum:b06f479add1a10f8388a2958d0ee0018\\n+Cryptochecksum:b858dfb10323f3dbc9694a49b8c94168",events_username="user@mydomain.com",events_date="2024-10-09T19:26:08Z",events_action="UPDATE",uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3",status="COMPLETED",lastEventDate="2024-10-09T19:26:13Z",entityUid="6b580296-7199-47b5-9736-9b91329c284e" 1728682204000000000
4 changes: 4 additions & 0 deletions plugins/outputs/syslog/testcases/issue_16012/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[[outputs.syslog]]
address = "udp://127.0.0.1:0"
default_sdid = "additional"
sdids = ["events"]

0 comments on commit 0b1581c

Please sign in to comment.