Send log data in payloads less than max content length in splunkhecexporter #2524

Merged

Changes from all commits (29 commits):
65bc7d3  limit splunkhecexporter request content length for logs (bjsignalfx, Mar 1, 2021)
c66dee4  add unit tests (bjsignalfx, Mar 2, 2021)
cec865c  Add unit tests (bjsignalfx, Mar 4, 2021)
0fe0b4d  Update comment (bjsignalfx, Mar 4, 2021)
4333f91  Add test case to cover context cancellation (bjsignalfx, Mar 4, 2021)
592ea05  Add test covering pushing invalid logs (bjsignalfx, Mar 4, 2021)
7006372  Add unit tests (bjsignalfx, Mar 5, 2021)
0b6169b  Fix failing tests (bjsignalfx, Mar 5, 2021)
8fce7e2  Fix impi verification error (bjsignalfx, Mar 5, 2021)
076d467  Add unit tests (bjsignalfx, Mar 8, 2021)
4d3f4f4  Update docs (bjsignalfx, Mar 9, 2021)
6eb3576  wip (jrcamp, Mar 11, 2021)
cb107d8  Merge pull request #1 from jrcamp/hec-max-length (bjsignalfx, Mar 16, 2021)
748eec7  Handle events buffer dropping last event when buffer len >= max conte… (bjsignalfx, Mar 16, 2021)
1ac7a74  Refactor pushLogData (bjsignalfx, Mar 22, 2021)
80038fe  Update error message (bjsignalfx, Mar 22, 2021)
ca24187  Merge branch 'main' into splunkhecexporter-content-size2 (bjsignalfx, Mar 23, 2021)
f63e9de  Reimplement function subLogs (bjsignalfx, Mar 23, 2021)
abdf997  Add unit tests (bjsignalfx, Mar 24, 2021)
3d31393  Update readme (bjsignalfx, Mar 24, 2021)
72f60e9  Fix comment (bjsignalfx, Mar 24, 2021)
d92b6d7  Rename local variable (bjsignalfx, Mar 25, 2021)
13ab6a3  Refactor and add comments (bjsignalfx, Mar 26, 2021)
a6b83f8  Add comment (bjsignalfx, Mar 26, 2021)
36f279a  Hard code minimum content length to compress (bjsignalfx, Mar 26, 2021)
ad98b10  Add comment (bjsignalfx, Mar 26, 2021)
326b079  Refactor and add comments (bjsignalfx, Mar 30, 2021)
dcc5e0f  Merge branch 'main' into splunkhecexporter-content-size2 (bjsignalfx, Mar 30, 2021)
1f55e52  Fix broken tests (bjsignalfx, Mar 30, 2021)
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/README.md
@@ -22,6 +22,7 @@ The following configuration options can also be configured:
- `disable_compression` (default: false): Whether to disable gzip compression over HTTP.
- `timeout` (default: 10s): HTTP timeout when sending data.
- `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS.
- `max_content_length_logs` (default: 2097152): Maximum log data size in bytes per HTTP POST, limited to at most 2097152 bytes (2 MiB).

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
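For reference, a minimal collector configuration exercising the new option might look like the sketch below; the `token` and `endpoint` values are placeholders, and the 1 MiB cap is an arbitrary example, not a recommendation:

```yaml
exporters:
  splunk_hec:
    # Placeholder token and endpoint; substitute your own HEC values.
    token: "00000000-0000-0000-0000-000000000000"
    endpoint: "https://splunk:8088/services/collector"
    # Cap each HTTP POST body for log data at 1 MiB (default is 2 MiB).
    max_content_length_logs: 1048576
```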
183 changes: 174 additions & 9 deletions exporter/splunkhecexporter/client.go
@@ -45,6 +45,9 @@ type client struct {
headers map[string]string
}

// Minimum number of bytes to compress. 1500 is the MTU of an ethernet frame.
const minCompressionLen = 1500

func (c *client) pushMetricsData(
ctx context.Context,
md pdata.Metrics,
@@ -116,7 +119,136 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return consumererror.Permanent(err)
}

req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
return c.postEvents(ctx, body, compressed)
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
c.wg.Add(1)
defer c.wg.Done()

gzipWriter := c.zippers.Get().(*gzip.Writer)
defer c.zippers.Put(gzipWriter)

gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs))
gzipWriter.Reset(gzipBuffer)

defer gzipWriter.Close()

// Callback when each batch is to be sent.
send := func(ctx context.Context, buf *bytes.Buffer) (err error) {
shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression

if shouldCompress {
gzipBuffer.Reset()
gzipWriter.Reset(gzipBuffer)

if _, err = io.Copy(gzipWriter, buf); err != nil {
return fmt.Errorf("failed copying buffer to gzip writer: %v", err)
}

if err = gzipWriter.Flush(); err != nil {
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
}

return c.postEvents(ctx, gzipBuffer, shouldCompress)
}

return c.postEvents(ctx, buf, shouldCompress)
}

return c.pushLogDataInBatches(ctx, ld, send)
}

// pushLogDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) (err error) {
// Length of retained bytes in buffer after truncation.
var bufLen int
// Buffer capacity.
var bufCap = c.config.MaxContentLengthLogs
// A guesstimated value > length of bytes of a single event.
// Added to buffer capacity so that buffer is likely to grow by reslicing when buf.Len() > bufCap.
const bufCapPadding = uint(4096)

// Buffer of JSON encoded Splunk events.
// Expected to grow more than bufCap then truncated to bufLen.
var buf = bytes.NewBuffer(make([]byte, 0, bufCap+bufCapPadding))
var encoder = json.NewEncoder(buf)

var tmpBuf = bytes.NewBuffer(make([]byte, 0, bufCapPadding))

// Index of the log record of the first event in buffer.
var bufFront *logIndex

var permanentErrors []error

var rls = ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).Logs()
for k := 0; k < logs.Len(); k++ {
if bufFront == nil {
bufFront = &logIndex{resource: i, library: j, record: k}
}

// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(logs.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
if err = encoder.Encode(event); err != nil {
permanentErrors = append(permanentErrors, consumererror.Permanent(fmt.Errorf("dropped log event: %v, error: %v", event, err)))
continue
}
buf.WriteString("\r\n\r\n")

// Continue adding events to buffer up to capacity.
// 0 capacity is interpreted as unknown/unbound consistent with ContentLength in http.Request.
if buf.Len() <= int(bufCap) || bufCap == 0 {
// Tracking length of event bytes below capacity in buffer.
bufLen = buf.Len()
continue
}

tmpBuf.Reset()
// Storing event bytes over capacity in buffer before truncating.
if bufCap > 0 {
if over := buf.Len() - bufLen; over <= int(bufCap) {
tmpBuf.Write(buf.Bytes()[bufLen:buf.Len()])
} else {
permanentErrors = append(permanentErrors, consumererror.Permanent(
fmt.Errorf("dropped log event: %s, error: event size %d bytes larger than configured max content length %d bytes", string(buf.Bytes()[bufLen:buf.Len()]), over, bufCap)))
}
}

// Truncating buffer at tracked length below capacity and sending.
buf.Truncate(bufLen)
if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
buf.Reset()

// Writing truncated bytes back to buffer.
tmpBuf.WriteTo(buf)

bufFront, bufLen = nil, buf.Len()
}
}
}

if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}

return consumererror.Combine(permanentErrors)
}

func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bool) error {
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), events)
if err != nil {
return consumererror.Permanent(err)
}
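The batching loop in `pushLogDataInBatches` accumulates JSON-encoded events until the buffer passes the configured cap, truncates back to the last event that fit, sends that batch, and carries the overflow event into the next one. A standalone sketch of that truncate-and-carry idea, using a tiny hypothetical cap in place of `MaxContentLengthLogs`:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	const maxLen = 10 // hypothetical tiny cap standing in for max_content_length_logs
	events := []string{"aaaa", "bbbb", "cccc"}

	buf := new(bytes.Buffer)
	tmp := new(bytes.Buffer)
	retained := 0 // like bufLen: length of the events known to fit under the cap

	send := func(b *bytes.Buffer) { fmt.Printf("send %q\n", b.String()) }

	for _, e := range events {
		buf.WriteString(e)
		if buf.Len() <= maxLen {
			retained = buf.Len()
			continue
		}
		// Stash the overflowing event, truncate to the last fitting event, send.
		tmp.Reset()
		tmp.Write(buf.Bytes()[retained:])
		buf.Truncate(retained)
		send(buf)
		buf.Reset()
		tmp.WriteTo(buf) // carry the overflow into the next batch
		retained = buf.Len()
	}
	if buf.Len() > 0 {
		send(buf)
	}
}
```

Run as-is, this prints `send "aaaabbbb"` followed by `send "cccc"`: the third event did not fit under the cap, so it was held back and sent as the start of the next batch.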
@@ -148,16 +280,49 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return nil
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
c.wg.Add(1)
defer c.wg.Done()
// subLogs returns a subset of `ld` starting from index `from` to the end.
func subLogs(ld *pdata.Logs, from *logIndex) *pdata.Logs {
if ld == nil || from == nil || from.zero() {
return ld
}

splunkEvents := logDataToSplunk(c.logger, ld, c.config)
if len(splunkEvents) == 0 {
return nil
subset := pdata.NewLogs()

resources := ld.ResourceLogs()
resourcesSub := subset.ResourceLogs()

for i := from.resource; i < resources.Len(); i++ {
resourcesSub.Append(pdata.NewResourceLogs())
resources.At(i).Resource().CopyTo(resourcesSub.At(i - from.resource).Resource())

libraries := resources.At(i).InstrumentationLibraryLogs()
librariesSub := resourcesSub.At(i - from.resource).InstrumentationLibraryLogs()

j := 0
if i == from.resource {
j = from.library
}
for jSub := 0; j < libraries.Len(); j++ {
librariesSub.Append(pdata.NewInstrumentationLibraryLogs())
libraries.At(j).InstrumentationLibrary().CopyTo(librariesSub.At(jSub).InstrumentationLibrary())

logs := libraries.At(j).Logs()
logsSub := librariesSub.At(jSub).Logs()
jSub++

k := 0
if i == from.resource && j == from.library {
k = from.record
}
for kSub := 0; k < logs.Len(); k++ {
logsSub.Append(pdata.NewLogRecord())
logs.At(k).CopyTo(logsSub.At(kSub))
kSub++
}
}
}

return c.sendSplunkEvents(ctx, splunkEvents)
return &subset
}

func encodeBodyEvents(zippers *sync.Pool, evs []*splunk.Event, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
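The `subLogs` helper above resumes iteration from a three-part index: the `library` and `record` offsets apply only while still inside the first partially-consumed resource and library, after which the inner loops restart at zero. The same resume logic on plain nested slices, as a rough sketch with hypothetical names:

```go
package main

import "fmt"

// resumeFrom mirrors the subLogs index logic on nested slices: copy everything
// from position (ri, li, ki) to the end, resetting the inner start offsets
// once past the first partially-included outer element.
func resumeFrom(logs [][][]string, ri, li, ki int) [][][]string {
	var out [][][]string
	for i := ri; i < len(logs); i++ {
		j := 0
		if i == ri {
			j = li
		}
		var libs [][]string
		for ; j < len(logs[i]); j++ {
			k := 0
			if i == ri && j == li {
				k = ki
			}
			libs = append(libs, append([]string(nil), logs[i][j][k:]...))
		}
		out = append(out, libs)
	}
	return out
}

func main() {
	logs := [][][]string{
		{{"r0l0k0", "r0l0k1"}, {"r0l1k0"}},
		{{"r1l0k0"}},
	}
	// Resume from resource 0, library 0, record 1: drops only r0l0k0.
	fmt.Println(resumeFrom(logs, 0, 0, 1))
	// Output: [[[r0l0k1] [r0l1k0]] [[r1l0k0]]]
}
```

This matches how the exporter retries: on a send failure, every record from `bufFront` onward is retained while everything already delivered is dropped.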
@@ -189,7 +354,7 @@ func encodeBody(zippers *sync.Pool, dps []*splunk.Event, disableCompression bool
// avoid attempting to compress things that fit into a single ethernet frame
func getReader(zippers *sync.Pool, b *bytes.Buffer, disableCompression bool) (io.Reader, bool, error) {
var err error
if !disableCompression && b.Len() > 1500 {
if !disableCompression && b.Len() > minCompressionLen {
buf := new(bytes.Buffer)
w := zippers.Get().(*gzip.Writer)
defer zippers.Put(w)