limit splunkhecexporter request content length for logs
bjsignalfx committed Mar 1, 2021
1 parent e9d82e0 commit 65bc7d3
Showing 6 changed files with 264 additions and 60 deletions.
52 changes: 44 additions & 8 deletions exporter/splunkhecexporter/client.go
@@ -121,7 +121,11 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return consumererror.Permanent(err)
}

req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
return c.postEvents(ctx, body, compressed)
}

func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bool) error {
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), events)
if err != nil {
return consumererror.Permanent(err)
}
@@ -157,14 +161,46 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (numDroppedLogs
c.wg.Add(1)
defer c.wg.Done()

splunkEvents := logDataToSplunk(c.logger, ld, c.config)
if len(splunkEvents) == 0 {
return 0, nil
}
gwriter := c.zippers.Get().(*gzip.Writer)
defer c.zippers.Put(gwriter)

err = c.sendSplunkEvents(ctx, splunkEvents)
if err != nil {
return ld.LogRecordCount(), err
gzipBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLength))
gwriter.Reset(gzipBuf)
defer gwriter.Close()

ldWrapper := logDataWrapper{&ld}
eventsCh, cancel := ldWrapper.eventsInChunks(c.logger, c.config)
defer cancel()

for events := range eventsCh {
if events.err != nil {
return ldWrapper.processErr(events.index, events.err)
}

if events.buf.Len() == 0 {
continue
}

// Skipping compression if compression is disabled or the payload fits into a single Ethernet frame.
if events.buf.Len() <= 1500 || c.config.DisableCompression {
if err = c.postEvents(ctx, events.buf, false); err != nil {
return ldWrapper.processErr(events.index, err)
}
continue
}

if _, err = gwriter.Write(events.buf.Bytes()); err != nil {
return ldWrapper.processErr(events.index, consumererror.Permanent(err))
}

if err = gwriter.Flush(); err != nil {
return ldWrapper.processErr(events.index, consumererror.Permanent(err))
}

if err = c.postEvents(ctx, gzipBuf, true); err != nil {
return ldWrapper.processErr(events.index, err)
}

gzipBuf.Reset()
gwriter.Reset(gzipBuf)
}

return 0, nil
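pushLogData draws its compressor from c.zippers, a pool that is not part of this diff. A minimal sketch, assuming zippers is a sync.Pool handing out *gzip.Writer values:

	package splunkhecexporter

	import (
		"compress/gzip"
		"sync"
	)

	// Sketch only: in the exporter, zippers is a field on the client struct.
	// Pooling lets each request reuse the writer's internal compression
	// buffers instead of reallocating them.
	var zippers = sync.Pool{
		New: func() interface{} {
			// The destination is attached later via Reset(gzipBuf), so a
			// nil writer is a safe placeholder here.
			return gzip.NewWriter(nil)
		},
	}

Reset both rebinds a pooled writer to a fresh output buffer and clears any state left over from its previous use.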
3 changes: 3 additions & 0 deletions exporter/splunkhecexporter/config.go
@@ -60,6 +60,9 @@ type Config struct {

// insecure_skip_verify skips checking the certificate of the HEC endpoint when sending data over HTTPS. Defaults to false.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`

// MaxContentLength is the Splunk HEC endpoint content length limit. Defaults to 1MiB, the current limit.
MaxContentLength int `mapstructure:"max_content_length"`
}

func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) {
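Elsewhere in this commit (see eventsInChunks in logdata_to_splunk.go), a limit of zero or less is interpreted as unbounded. A hypothetical helper, not part of this diff, that captures the convention:

	// Hypothetical helper, not part of this commit: reports whether a
	// payload of n bytes may be sent under the configured limit.
	// A limit of zero or less disables the content length check.
	func withinLimit(n, maxContentLength int) bool {
		return maxContentLength <= 0 || n <= maxContentLength
	}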
13 changes: 7 additions & 6 deletions exporter/splunkhecexporter/config_test.go
@@ -58,12 +58,13 @@ func TestLoadConfig(t *testing.T) {
TypeVal: configmodels.Type(typeStr),
NameVal: expectedName,
},
Token: "00000000-0000-0000-0000-0000000000000",
Endpoint: "https://splunk:8088/services/collector",
Source: "otel",
SourceType: "otel",
Index: "metrics",
MaxConnections: 100,
Token: "00000000-0000-0000-0000-0000000000000",
Endpoint: "https://splunk:8088/services/collector",
Source: "otel",
SourceType: "otel",
Index: "metrics",
MaxConnections: 100,
MaxContentLength: 1024 * 1024,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
8 changes: 5 additions & 3 deletions exporter/splunkhecexporter/factory.go
@@ -26,9 +26,10 @@ import (

const (
// The value of "type" key in configuration.
typeStr = "splunk_hec"
defaultMaxIdleCons = 100
defaultHTTPTimeout = 10 * time.Second
typeStr = "splunk_hec"
defaultMaxIdleCons = 100
defaultHTTPTimeout = 10 * time.Second
defaultMaxContentLength = 1024 * 1024
)

// NewFactory creates a factory for Splunk HEC exporter.
@@ -54,6 +55,7 @@ func createDefaultConfig() configmodels.Exporter {
QueueSettings: exporterhelper.DefaultQueueSettings(),
DisableCompression: false,
MaxConnections: defaultMaxIdleCons,
MaxContentLength: defaultMaxContentLength,
}
}

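A hedged test sketch, not among this commit's actual tests, confirming the factory default matches the 1 MiB limit:

	package splunkhecexporter

	import "testing"

	func TestDefaultMaxContentLength(t *testing.T) {
		// createDefaultConfig returns configmodels.Exporter; the concrete
		// type for this exporter is *Config.
		cfg := createDefaultConfig().(*Config)
		if cfg.MaxContentLength != defaultMaxContentLength {
			t.Fatalf("MaxContentLength = %d, want %d", cfg.MaxContentLength, defaultMaxContentLength)
		}
	}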
147 changes: 137 additions & 10 deletions exporter/splunkhecexporter/logdata_to_splunk.go
@@ -15,29 +15,156 @@
package splunkhecexporter

import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"

"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

func logDataToSplunk(logger *zap.Logger, ld pdata.Logs, config *Config) []*splunk.Event {
var splunkEvents []*splunk.Event
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).Logs()
for k := 0; k < logs.Len(); k++ {
splunkEvents = append(splunkEvents, mapLogRecordToSplunkEvent(logs.At(k), config, logger))
// eventsBuf is a buffer of JSON-encoded Splunk events.
// The events are created from LogRecords, one event per LogRecord.
type eventsBuf struct {
buf *bytes.Buffer
// index is the eventIndex of the 1st event in buf.
index *eventIndex
err error
}

// The index of an event, composed of the indices locating the event's LogRecord.
type eventIndex struct {
// Index of the LogRecord slice element from which the event is created.
log int
// Index of the InstrumentationLibraryLogs slice element parent of the LogRecord.
lib int
// Index of the ResourceLogs slice element parent of the InstrumentationLibraryLogs.
src int
}

type logDataWrapper struct {
*pdata.Logs
}

func (ld *logDataWrapper) eventsInChunks(logger *zap.Logger, config *Config) (chan *eventsBuf, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
eventsCh := make(chan *eventsBuf)

go func() {
defer close(eventsCh)

// event buffers a single event.
event := new(bytes.Buffer)
encoder := json.NewEncoder(event)

// events buffers events up to the max content length.
events := &eventsBuf{buf: new(bytes.Buffer)}

rl := ld.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
ill := rl.At(i).InstrumentationLibraryLogs()
for j := 0; j < ill.Len(); j++ {
l := ill.At(j).Logs()
for k := 0; k < l.Len(); k++ {
select {
case <-ctx.Done():
return
default:
if err := encoder.Encode(mapLogRecordToSplunkEvent(l.At(k), config, logger)); err != nil {
eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
return
}
event.WriteString("\r\n\r\n")

// The size of an event must be less than or equal to max content length.
if config.MaxContentLength > 0 && event.Len() > config.MaxContentLength {
err := fmt.Errorf("found a log event bigger than max content length (event: %d bytes, max: %d bytes)", event.Len(), config.MaxContentLength)
eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
return
}

// Moving the event into events.buf if the combined length stays within the max content length.
// A max content length <= 0 is interpreted as unbounded.
if events.buf.Len()+event.Len() <= config.MaxContentLength || config.MaxContentLength <= 0 {
// WriteTo() empties and resets buffer event.
if _, err := event.WriteTo(events.buf); err != nil {
eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
return
}

// Setting events index using the log record indices of the 1st event.
if events.index == nil {
events.index = &eventIndex{src: i, lib: j, log: k}
}

continue
}

eventsCh <- events

// Creating a new events buffer.
events = &eventsBuf{buf: new(bytes.Buffer)}
// Setting events index using the log record indices of any current leftover event.
if event.Len() != 0 {
events.index = &eventIndex{src: i, lib: j, log: k}
}
}
}
}
}

// Writing any leftover event to the events buffer `events.buf`.
if _, err := event.WriteTo(events.buf); err != nil {
eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
return
}

eventsCh <- events

}()
return eventsCh, cancel
}
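Each event is serialized with json.Encoder, which appends a trailing newline, and is then followed by a blank-line separator. A standalone illustration of the resulting payload framing, with a simplified map standing in for the *splunk.Event produced by mapLogRecordToSplunkEvent:

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
	)

	func main() {
		var buf bytes.Buffer
		enc := json.NewEncoder(&buf)
		for _, msg := range []string{"first", "second"} {
			// Stand-in for a *splunk.Event; only the framing matters here.
			enc.Encode(map[string]string{"event": msg})
			buf.WriteString("\r\n\r\n")
		}
		fmt.Printf("%q\n", buf.String())
		// Prints: "{\"event\":\"first\"}\n\r\n\r\n{\"event\":\"second\"}\n\r\n\r\n"
	}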

func (ld *logDataWrapper) countLogs(start *eventIndex) int {
count, orig := 0, *ld.InternalRep().Orig
for i := start.src; i < len(orig); i++ {
for j, iLLogs := range orig[i].InstrumentationLibraryLogs {
switch {
case i == start.src && j < start.lib:
continue
default:
count += len(iLLogs.Logs)
}
}
}
return count - start.log
}

func (ld *logDataWrapper) trimLeft(end *eventIndex) *pdata.Logs {
clone := ld.Clone()
orig := *clone.InternalRep().Orig
// After reslicing, the ResourceLogs previously at end.src is at index 0.
orig = orig[end.src:]
orig[0].InstrumentationLibraryLogs = orig[0].InstrumentationLibraryLogs[end.lib:]
orig[0].InstrumentationLibraryLogs[0].Logs = orig[0].InstrumentationLibraryLogs[0].Logs[end.log:]
*clone.InternalRep().Orig = orig
return &clone
}

func (ld *logDataWrapper) processErr(index *eventIndex, err error) (int, error) {
if consumererror.IsPermanent(err) {
return ld.countLogs(index), err
}

if _, ok := err.(consumererror.PartialError); ok {
failedLogs := ld.trimLeft(index)
return failedLogs.LogRecordCount(), consumererror.PartialLogsError(err, *failedLogs)
}
return ld.LogRecordCount(), err
}

func mapLogRecordToSplunkEvent(lr pdata.LogRecord, config *Config, logger *zap.Logger) *splunk.Event {