Skip to content

Commit

Permalink
[exporter/syslog] send syslog messages in batches (#27799)
Browse files Browse the repository at this point in the history
**Description:**

This changes the behavior of the Syslog exporter to send each batch of
Syslog messages in a single request (with messages separated by
newlines), instead of sending each message in a separate request and
closing the connection after each message.

The batching only happens when using TCP. For UDP, each syslog message
is still sent in a separate request, as defined by [the
spec](https://datatracker.ietf.org/doc/html/rfc5426#section-3.1).

This also significantly refactors (and hopefully simplifies) the
exporter's code, extracting the code that formats the syslog messages
from the `sender` type into separate `formatter` types. Hopefully this
will make the development of this component easier.

**Link to tracking Issue:**

-
#21244

**Testing:**

The unit tests have been updated to reflect the refactored codebase. The
integration tests introduced in
#27464
are unchanged, as the format of the output messages hasn't changed.

**Documentation:**

No documentation updates.
  • Loading branch information
andrzej-stencel authored Oct 30, 2023
1 parent 3d94380 commit f2ec166
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 359 deletions.
27 changes: 27 additions & 0 deletions .chloggen/syslog-exporter-single-request.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/syslog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: send syslog messages in batches

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21244]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This changes the behavior of the Syslog exporter to send each batch of Syslog messages in a single request (with messages separated by newlines), instead of sending each message in a separate request and closing the connection after each message.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
129 changes: 64 additions & 65 deletions exporter/syslogexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"crypto/tls"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -23,6 +21,7 @@ type syslogexporter struct {
config *Config
logger *zap.Logger
tlsConfig *tls.Config
formatter formatter
}

func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*syslogexporter, error) {
Expand All @@ -37,11 +36,12 @@ func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*sysloge
config: cfg,
logger: createSettings.Logger,
tlsConfig: tlsConfig,
formatter: createFormatter(cfg.Protocol),
}

s.logger.Info("Syslog Exporter configured",
zap.String("endpoint", cfg.Endpoint),
zap.String("Protocol", cfg.Protocol),
zap.String("protocol", cfg.Protocol),
zap.Int("port", cfg.Port),
)

Expand Down Expand Up @@ -69,78 +69,77 @@ func newLogsExporter(
)
}

func (se *syslogexporter) logsToMap(record plog.LogRecord) map[string]any {
attributes := record.Attributes().AsRaw()
return attributes
}

func (se *syslogexporter) getTimestamp(record plog.LogRecord) time.Time {
timestamp := record.Timestamp().AsTime()
return timestamp
func (se *syslogexporter) pushLogsData(_ context.Context, logs plog.Logs) error {
batchMessages := strings.ToLower(se.config.Network) == "tcp"
var err error
if batchMessages {
err = se.exportBatch(logs)
} else {
err = se.exportNonBatch(logs)
}
return err
}

func (se *syslogexporter) pushLogsData(_ context.Context, ld plog.Logs) error {
type droppedResourceRecords struct {
resource pcommon.Resource
records []plog.LogRecord
}
var (
errs []error
dropped []droppedResourceRecords
)
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
if droppedRecords, err := se.sendSyslogs(rl); err != nil {
dropped = append(dropped, droppedResourceRecords{
resource: rl.Resource(),
records: droppedRecords,
})
errs = append(errs, err)
func (se *syslogexporter) exportBatch(logs plog.Logs) error {
var payload strings.Builder
for i := 0; i < logs.ResourceLogs().Len(); i++ {
resourceLogs := logs.ResourceLogs().At(i)
for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
scopeLogs := resourceLogs.ScopeLogs().At(j)
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)
formatted := se.formatter.format(logRecord)
payload.WriteString(formatted)
}
}
}
if len(dropped) > 0 {
ld = plog.NewLogs()
for i := range dropped {
rls := ld.ResourceLogs().AppendEmpty()
logRecords := rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
dropped[i].resource.MoveTo(rls.Resource())
for j := 0; j < len(dropped[i].records); j++ {
dropped[i].records[j].MoveTo(logRecords)
}

if payload.Len() > 0 {
sender, err := connect(se.logger, se.config, se.tlsConfig)
if err != nil {
return consumererror.NewLogs(err, logs)
}
defer sender.close()
err = sender.Write(payload.String())
if err != nil {
return consumererror.NewLogs(err, logs)
}
errs = deduplicateErrors(errs)
return consumererror.NewLogs(multierr.Combine(errs...), ld)
}
se.logger.Info("Connected successfully, exporting logs....")
return nil
}

func (se *syslogexporter) sendSyslogs(rl plog.ResourceLogs) ([]plog.LogRecord, error) {
var (
errs []error
droppedRecords []plog.LogRecord
)
slgs := rl.ScopeLogs()
for i := 0; i < slgs.Len(); i++ {
slg := slgs.At(i)
for j := 0; j < slg.LogRecords().Len(); j++ {
lr := slg.LogRecords().At(j)
formattedLine := se.logsToMap(lr)
timestamp := se.getTimestamp(lr)
s, errConn := connect(se.logger, se.config, se.tlsConfig)
if errConn != nil {
droppedRecords = append(droppedRecords, lr)
errs = append(errs, errConn)
continue
}
defer s.close()
err := s.Write(formattedLine, timestamp)
if err != nil {
droppedRecords = append(droppedRecords, lr)
errs = append(errs, err)
func (se *syslogexporter) exportNonBatch(logs plog.Logs) error {
sender, err := connect(se.logger, se.config, se.tlsConfig)
if err != nil {
return consumererror.NewLogs(err, logs)
}
defer sender.close()

errs := []error{}
droppedLogs := plog.NewLogs()
for i := 0; i < logs.ResourceLogs().Len(); i++ {
resourceLogs := logs.ResourceLogs().At(i)
droppedResourceLogs := droppedLogs.ResourceLogs().AppendEmpty()
for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
scopeLogs := resourceLogs.ScopeLogs().At(j)
droppedScopeLogs := droppedResourceLogs.ScopeLogs().AppendEmpty()
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)
formatted := se.formatter.format(logRecord)
err = sender.Write(formatted)
if err != nil {
errs = append(errs, err)
droppedLogRecord := droppedScopeLogs.LogRecords().AppendEmpty()
logRecord.CopyTo(droppedLogRecord)
}
}
}
}
return droppedRecords, multierr.Combine(errs...)

if len(errs) > 0 {
errs = deduplicateErrors(errs)
return consumererror.NewLogs(multierr.Combine(errs...), droppedLogs)
}

return nil
}
29 changes: 29 additions & 0 deletions exporter/syslogexporter/formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter"

import (
"go.opentelemetry.io/collector/pdata/plog"
)

func createFormatter(protocol string) formatter {
if protocol == protocolRFC5424Str {
return newRFC5424Formatter()
}
return newRFC3164Formatter()
}

type formatter interface {
format(plog.LogRecord) string
}

// getAttributeValueOrDefault returns the value of the requested log record's attribute as a string.
// If the attribute was not found, it returns the provided default value.
func getAttributeValueOrDefault(logRecord plog.LogRecord, attributeName string, defaultValue string) string {
value := defaultValue
if attributeValue, found := logRecord.Attributes().Get(attributeName); found {
value = attributeValue.AsString()
}
return value
}
56 changes: 56 additions & 0 deletions exporter/syslogexporter/rfc3164_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package syslogexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter"

import (
"fmt"
"strconv"

"go.opentelemetry.io/collector/pdata/plog"
)

type rfc3164Formatter struct {
}

func newRFC3164Formatter() *rfc3164Formatter {
return &rfc3164Formatter{}
}

func (f *rfc3164Formatter) format(logRecord plog.LogRecord) string {
priorityString := f.formatPriority(logRecord)
timestampString := f.formatTimestamp(logRecord)
hostnameString := f.formatHostname(logRecord)
appnameString := f.formatAppname(logRecord)
messageString := f.formatMessage(logRecord)
appnameMessageDelimiter := ""
if len(appnameString) > 0 && messageString != emptyMessage {
appnameMessageDelimiter = " "
}
formatted := fmt.Sprintf("<%s>%s %s %s%s%s\n", priorityString, timestampString, hostnameString, appnameString, appnameMessageDelimiter, messageString)
return formatted
}

func (f *rfc3164Formatter) formatPriority(logRecord plog.LogRecord) string {
return getAttributeValueOrDefault(logRecord, priority, strconv.Itoa(defaultPriority))
}

func (f *rfc3164Formatter) formatTimestamp(logRecord plog.LogRecord) string {
return logRecord.Timestamp().AsTime().Format("Jan 02 15:04:05")
}

func (f *rfc3164Formatter) formatHostname(logRecord plog.LogRecord) string {
return getAttributeValueOrDefault(logRecord, hostname, emptyValue)
}

func (f *rfc3164Formatter) formatAppname(logRecord plog.LogRecord) string {
value := getAttributeValueOrDefault(logRecord, app, "")
if value != "" {
value += ":"
}
return value
}

func (f *rfc3164Formatter) formatMessage(logRecord plog.LogRecord) string {
return getAttributeValueOrDefault(logRecord, message, emptyMessage)
}
41 changes: 41 additions & 0 deletions exporter/syslogexporter/rfc3164_formatter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package syslogexporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestRFC3164Formatter(t *testing.T) {
expected := "<34>Aug 24 05:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8\n"
logRecord := plog.NewLogRecord()
logRecord.Attributes().PutStr("appname", "su")
logRecord.Attributes().PutStr("hostname", "mymachine")
logRecord.Attributes().PutStr("message", "'su root' failed for lonvick on /dev/pts/8")
logRecord.Attributes().PutInt("priority", 34)
timestamp, err := time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003Z")
require.NoError(t, err)
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))

actual := newRFC3164Formatter().format(logRecord)
assert.NoError(t, err)
assert.Equal(t, expected, actual)

expected = "<165>Aug 24 05:14:15 - -\n"
logRecord = plog.NewLogRecord()
logRecord.Attributes().PutStr("message", "-")
timestamp, err = time.Parse(time.RFC3339Nano, "2003-08-24T05:14:15.000003Z")
require.NoError(t, err)
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))

actual = newRFC3164Formatter().format(logRecord)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}
Loading

0 comments on commit f2ec166

Please sign in to comment.