Send log data in payloads less than max content length in splunkhecexporter #2524

Merged
29 commits
65bc7d3  limit splunkhecexporter request content length for logs (bjsignalfx, Mar 1, 2021)
c66dee4  add unit tests (bjsignalfx, Mar 2, 2021)
cec865c  Add unit tests (bjsignalfx, Mar 4, 2021)
0fe0b4d  Update comment (bjsignalfx, Mar 4, 2021)
4333f91  Add test case to cover context cancellation (bjsignalfx, Mar 4, 2021)
592ea05  Add test covering pushing invalid logs (bjsignalfx, Mar 4, 2021)
7006372  Add unit tests (bjsignalfx, Mar 5, 2021)
0b6169b  Fix failing tests (bjsignalfx, Mar 5, 2021)
8fce7e2  Fix impi verification error (bjsignalfx, Mar 5, 2021)
076d467  Add unit tests (bjsignalfx, Mar 8, 2021)
4d3f4f4  Update docs (bjsignalfx, Mar 9, 2021)
6eb3576  wip (jrcamp, Mar 11, 2021)
cb107d8  Merge pull request #1 from jrcamp/hec-max-length (bjsignalfx, Mar 16, 2021)
748eec7  Handle events buffer dropping last event when buffer len >= max conte… (bjsignalfx, Mar 16, 2021)
1ac7a74  Refactor pushLogData (bjsignalfx, Mar 22, 2021)
80038fe  Update error message (bjsignalfx, Mar 22, 2021)
ca24187  Merge branch 'main' into splunkhecexporter-content-size2 (bjsignalfx, Mar 23, 2021)
f63e9de  Reimplement function subLogs (bjsignalfx, Mar 23, 2021)
abdf997  Add unit tests (bjsignalfx, Mar 24, 2021)
3d31393  Update readme (bjsignalfx, Mar 24, 2021)
72f60e9  Fix comment (bjsignalfx, Mar 24, 2021)
d92b6d7  Rename local variable (bjsignalfx, Mar 25, 2021)
13ab6a3  Refactor and add comments (bjsignalfx, Mar 26, 2021)
a6b83f8  Add comment (bjsignalfx, Mar 26, 2021)
36f279a  Hard code minimum content length to compress (bjsignalfx, Mar 26, 2021)
ad98b10  Add comment (bjsignalfx, Mar 26, 2021)
326b079  Refactor and add comments (bjsignalfx, Mar 30, 2021)
dcc5e0f  Merge branch 'main' into splunkhecexporter-content-size2 (bjsignalfx, Mar 30, 2021)
1f55e52  Fix broken tests (bjsignalfx, Mar 30, 2021)
52 changes: 44 additions & 8 deletions exporter/splunkhecexporter/client.go
@@ -121,7 +121,11 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
 		return consumererror.Permanent(err)
 	}
 
-	req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
+	return c.postEvents(ctx, body, compressed)
+}
+
+func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bool) error {
+	req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), events)
 	if err != nil {
 		return consumererror.Permanent(err)
 	}
@@ -157,14 +161,46 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (numDroppedLogs
 	c.wg.Add(1)
 	defer c.wg.Done()
 
-	splunkEvents := logDataToSplunk(c.logger, ld, c.config)
-	if len(splunkEvents) == 0 {
-		return 0, nil
-	}
+	gwriter := c.zippers.Get().(*gzip.Writer)
+	defer c.zippers.Put(gwriter)
 
-	err = c.sendSplunkEvents(ctx, splunkEvents)
-	if err != nil {
-		return ld.LogRecordCount(), err
+	gzipBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLength))
+	gwriter.Reset(gzipBuf)
+	defer gwriter.Close()
+
+	ldWrapper := logDataWrapper{&ld}
+	eventsCh, cancel := ldWrapper.eventsInChunks(c.logger, c.config)
+	defer cancel()
+
+	for events := range eventsCh {
+		if events.err != nil {
+			return ldWrapper.processErr(events.index, events.err)
+		}
+
+		if events.buf.Len() == 0 {
+			continue
+		}
+
+		// Not compressing if compression is disabled or the payload fits into a single Ethernet frame.
+		if events.buf.Len() <= 1500 || c.config.DisableCompression {
+			if err = c.postEvents(ctx, events.buf, false); err != nil {
+				return ldWrapper.processErr(events.index, err)
+			}
+			continue
+		}
+
+		if _, err = gwriter.Write(events.buf.Bytes()); err != nil {
+			return ldWrapper.processErr(events.index, consumererror.Permanent(err))
+		}
+
+		gwriter.Flush()
+
+		if err = c.postEvents(ctx, gzipBuf, true); err != nil {
+			return ldWrapper.processErr(events.index, err)
+		}
+
+		gzipBuf.Reset()
+		gwriter.Reset(gzipBuf)
 	}
 
 	return 0, nil
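The c.zippers pool that pushLogData draws its *gzip.Writer from is defined outside this hunk. A minimal sketch of how such a pool is typically constructed, assuming a sync.Pool of *gzip.Writer (an illustration, not the PR's exact code):

package splunkhecexporter

import (
	"compress/gzip"
	"io/ioutil"
	"sync"
)

// zippers hands out reusable gzip writers; Reset later rebinds a pooled
// writer to the per-request buffer, avoiding a new compressor per batch.
var zippers = sync.Pool{
	New: func() interface{} {
		return gzip.NewWriter(ioutil.Discard)
	},
}

Note the design choice in the loop above: payloads of 1500 bytes or less already fit in a single Ethernet frame, so gzip overhead would outweigh any savings and compression is skipped.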
3 changes: 3 additions & 0 deletions exporter/splunkhecexporter/config.go
@@ -60,6 +60,9 @@ type Config struct {
 
 	// insecure_skip_verify skips checking the certificate of the HEC endpoint when sending data over HTTPS. Defaults to false.
 	InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
+
+	// MaxContentLength is the Splunk HEC endpoint content length limit. Defaults to 1 MiB, the current limit.
+	MaxContentLength int `mapstructure:"max_content_length"`
 }
 
 func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) {
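For reference, a minimal collector configuration exercising the new option might look like the following (token and endpoint are placeholders; 1048576 bytes equals the 1 MiB default):

exporters:
  splunk_hec:
    token: "YOUR-HEC-TOKEN"
    endpoint: "https://splunk:8088/services/collector"
    # Cap each HEC request body at 1 MiB; a value <= 0 is treated as unbounded.
    max_content_length: 1048576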
13 changes: 7 additions & 6 deletions exporter/splunkhecexporter/config_test.go
@@ -58,12 +58,13 @@ func TestLoadConfig(t *testing.T) {
 			TypeVal: configmodels.Type(typeStr),
 			NameVal: expectedName,
 		},
-		Token:          "00000000-0000-0000-0000-0000000000000",
-		Endpoint:       "https://splunk:8088/services/collector",
-		Source:         "otel",
-		SourceType:     "otel",
-		Index:          "metrics",
-		MaxConnections: 100,
+		Token:            "00000000-0000-0000-0000-0000000000000",
+		Endpoint:         "https://splunk:8088/services/collector",
+		Source:           "otel",
+		SourceType:       "otel",
+		Index:            "metrics",
+		MaxConnections:   100,
+		MaxContentLength: 1024 * 1024,
 		TimeoutSettings: exporterhelper.TimeoutSettings{
 			Timeout: 10 * time.Second,
 		},
8 changes: 5 additions & 3 deletions exporter/splunkhecexporter/factory.go
@@ -26,9 +26,10 @@ import (
 
 const (
 	// The value of "type" key in configuration.
-	typeStr            = "splunk_hec"
-	defaultMaxIdleCons = 100
-	defaultHTTPTimeout = 10 * time.Second
+	typeStr                 = "splunk_hec"
+	defaultMaxIdleCons      = 100
+	defaultHTTPTimeout      = 10 * time.Second
+	defaultMaxContentLength = 1024 * 1024
 )
 
 // NewFactory creates a factory for Splunk HEC exporter.
@@ -54,6 +55,7 @@ func createDefaultConfig() configmodels.Exporter {
 		QueueSettings:      exporterhelper.DefaultQueueSettings(),
 		DisableCompression: false,
 		MaxConnections:     defaultMaxIdleCons,
+		MaxContentLength:   defaultMaxContentLength,
 	}
 }

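A quick test-style sanity check of the wiring above (hypothetical, not part of this PR):

package splunkhecexporter

import "testing"

// Hypothetical check: the factory default should match the 1 MiB HEC limit.
func TestDefaultMaxContentLength(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	if cfg.MaxContentLength != 1024*1024 {
		t.Fatalf("MaxContentLength = %d, want %d", cfg.MaxContentLength, 1024*1024)
	}
}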
147 changes: 137 additions & 10 deletions exporter/splunkhecexporter/logdata_to_splunk.go
@@ -15,29 +15,156 @@
 package splunkhecexporter
 
 import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
 	"time"
 
+	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/consumer/pdata"
 	"go.opentelemetry.io/collector/translator/conventions"
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
 )
 
-func logDataToSplunk(logger *zap.Logger, ld pdata.Logs, config *Config) []*splunk.Event {
-	var splunkEvents []*splunk.Event
-	rls := ld.ResourceLogs()
-	for i := 0; i < rls.Len(); i++ {
-		ills := rls.At(i).InstrumentationLibraryLogs()
-		for j := 0; j < ills.Len(); j++ {
-			logs := ills.At(j).Logs()
-			for k := 0; k < logs.Len(); k++ {
-				splunkEvents = append(splunkEvents, mapLogRecordToSplunkEvent(logs.At(k), config, logger))
+// eventsBuf is a buffer of JSON encoded Splunk events.
+// The events are created from LogRecord(s) where one event maps to one LogRecord.
+type eventsBuf struct {
+	buf *bytes.Buffer
+	// index is the eventIndex of the 1st event in buf.
+	index *eventIndex
+	err   error
+}
+
+// The index of an event composed of indices of the event's LogRecord.
+type eventIndex struct {
+	// Index of the LogRecord slice element from which the event is created.
+	log int
+	// Index of the InstrumentationLibraryLogs slice element parent of the LogRecord.
+	lib int
+	// Index of the ResourceLogs slice element parent of the InstrumentationLibraryLogs.
+	src int
+}
+
+type logDataWrapper struct {
+	*pdata.Logs
+}
+
+func (ld *logDataWrapper) eventsInChunks(logger *zap.Logger, config *Config) (chan *eventsBuf, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(context.Background())
+	eventsCh := make(chan *eventsBuf)
+
+	go func() {
+		defer close(eventsCh)
+
+		// event buffers a single event.
+		event := new(bytes.Buffer)
+		encoder := json.NewEncoder(event)
+
+		// events buffers events up to the max content length.
+		events := &eventsBuf{buf: new(bytes.Buffer)}
+
+		rl := ld.ResourceLogs()
+		for i := 0; i < rl.Len(); i++ {
+			ill := rl.At(i).InstrumentationLibraryLogs()
+			for j := 0; j < ill.Len(); j++ {
+				l := ill.At(j).Logs()
+				for k := 0; k < l.Len(); k++ {
+					select {
+					case <-ctx.Done():
+						return
+					default:
+						if err := encoder.Encode(mapLogRecordToSplunkEvent(l.At(k), config, logger)); err != nil {
+							eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
+							return
+						}
+						event.WriteString("\r\n\r\n")
+
+						// The size of an event must be less than or equal to max content length.
+						if config.MaxContentLength > 0 && event.Len() > config.MaxContentLength {
+							err := fmt.Errorf("found a log event bigger than max content length (event: %d bytes, max: %d bytes)", event.Len(), config.MaxContentLength)
+							eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
+							return
+						}
+
+						// Moving the event to events.buf if the result stays within max content length.
+						// Max content length <= 0 is interpreted as unbound.
+						if events.buf.Len()+event.Len() <= config.MaxContentLength || config.MaxContentLength <= 0 {
+							// WriteTo() empties and resets buffer event.
+							if _, err := event.WriteTo(events.buf); err != nil {
+								eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
+								return
+							}
+
+							// Setting events index using the log record indices of the 1st event.
+							if events.index == nil {
+								events.index = &eventIndex{src: i, lib: j, log: k}
+							}
+
+							continue
+						}
+
+						eventsCh <- events
+
+						// Creating a new events buffer.
+						events = &eventsBuf{buf: new(bytes.Buffer)}
+						// Setting events index using the log record indices of any current leftover event.
+						if event.Len() != 0 {
+							events.index = &eventIndex{src: i, lib: j, log: k}
+						}
+					}
+				}
+			}
+		}
+
+		// Writing any leftover event to eventsBuf buffer `events.buf`.
+		if _, err := event.WriteTo(events.buf); err != nil {
+			eventsCh <- &eventsBuf{buf: nil, index: nil, err: consumererror.Permanent(err)}
+			return
+		}
+
+		eventsCh <- events
+	}()
+	return eventsCh, cancel
+}
+
+func (ld *logDataWrapper) countLogs(start *eventIndex) int {
+	count, orig := 0, *ld.InternalRep().Orig
+	for i := start.src; i < len(orig); i++ {
+		for j, iLLogs := range orig[i].InstrumentationLibraryLogs {
+			switch {
+			case i == start.src && j < start.lib:
+				continue
+			default:
+				count += len(iLLogs.Logs)
+			}
+		}
+	}
+	return count - start.log
+}
+
+func (ld *logDataWrapper) trimLeft(end *eventIndex) *pdata.Logs {
+	clone := ld.Clone()
+	orig := *clone.InternalRep().Orig
+	// Drop everything before the failed resource, then trim within it.
+	orig = orig[end.src:]
+	orig[0].InstrumentationLibraryLogs = orig[0].InstrumentationLibraryLogs[end.lib:]
+	orig[0].InstrumentationLibraryLogs[0].Logs = orig[0].InstrumentationLibraryLogs[0].Logs[end.log:]
+	// Write the trimmed slice back into the clone.
+	*clone.InternalRep().Orig = orig
+	return &clone
+}
 
-	return splunkEvents
+func (ld *logDataWrapper) processErr(index *eventIndex, err error) (int, error) {
+	if consumererror.IsPermanent(err) {
+		return ld.countLogs(index), err
+	}
+
+	if _, ok := err.(consumererror.PartialError); ok {
+		failedLogs := ld.trimLeft(index)
+		return failedLogs.LogRecordCount(), consumererror.PartialLogsError(err, *failedLogs)
+	}
+	return ld.LogRecordCount(), err
+}
 
 func mapLogRecordToSplunkEvent(lr pdata.LogRecord, config *Config, logger *zap.Logger) *splunk.Event {
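To make the index arithmetic in countLogs concrete, here is a self-contained sketch of the same traversal over plain nested slices (hypothetical types, not the PR's pdata structures):

package main

import "fmt"

// index mirrors eventIndex: the position of a record in nested slices.
type index struct{ src, lib, log int }

// countFrom mirrors countLogs: records at or after start, in traversal order.
func countFrom(resources [][][]string, start index) int {
	count := 0
	for i := start.src; i < len(resources); i++ {
		for j, lib := range resources[i] {
			if i == start.src && j < start.lib {
				continue
			}
			count += len(lib)
		}
	}
	return count - start.log
}

func main() {
	// Two resources, each holding one library with three records.
	rs := [][][]string{{{"a", "b", "c"}}, {{"d", "e", "f"}}}
	fmt.Println(countFrom(rs, index{src: 0, lib: 0, log: 2})) // prints 4
}

Starting at the third record of the first library leaves 6 - 2 = 4 records, which is the count processErr reports as dropped when a permanent error occurs at that index.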