
Commit

Send log data in payloads less than max content length in splunkhecexporter (#2524)

Partition log data into payloads smaller than the configured max content length, then send each payload. The default max content length is 2 MiB.
bjsignalfx authored Mar 30, 2021
1 parent acea466 commit e9fa7d8
Showing 10 changed files with 620 additions and 174 deletions.
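
As a rough mental model for the diffs below, here is a minimal, self-contained sketch of the partition-then-send idea described above. It is not the exporter's actual code: the function name `partitionAndSend`, the map-based event shape, and the `send` callback are placeholders invented for this sketch, and the real `pushLogDataInBatches` in client.go additionally handles gzip compression, records permanent errors for dropped events, and returns the not-yet-sent remainder (via `subLogs`) when a post fails.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Assumed default for this sketch, matching the documented 2 MiB limit.
const defaultMaxContentLengthLogs = 2 * 1024 * 1024 // 2097152 bytes

// partitionAndSend JSON-encodes events into a buffer and calls send with a
// payload whenever adding another event would exceed maxLen bytes.
func partitionAndSend(events []map[string]interface{}, maxLen int, send func([]byte) error) error {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	fitLen := 0 // length of the bytes in buf known to fit within maxLen

	for _, ev := range events {
		if err := enc.Encode(ev); err != nil {
			return err
		}
		// A maxLen of 0 means "no limit" in this sketch, mirroring the
		// http.Request.ContentLength convention used by the real code.
		if maxLen == 0 || buf.Len() <= maxLen {
			fitLen = buf.Len()
			continue
		}
		// The buffer grew past the limit: send what fit, then carry the
		// overflowing event into the next payload, or drop it when it alone
		// exceeds the limit (the real code records a permanent error).
		overflow := append([]byte(nil), buf.Bytes()[fitLen:]...)
		buf.Truncate(fitLen)
		if buf.Len() > 0 {
			if err := send(buf.Bytes()); err != nil {
				return err
			}
		}
		buf.Reset()
		if len(overflow) <= maxLen {
			buf.Write(overflow)
		}
		fitLen = buf.Len()
	}

	// Flush whatever is left after the last event.
	if buf.Len() > 0 {
		return send(buf.Bytes())
	}
	return nil
}

func main() {
	events := []map[string]interface{}{
		{"event": "first log line"},
		{"event": "second log line"},
	}
	_ = partitionAndSend(events, defaultMaxContentLengthLogs, func(payload []byte) error {
		fmt.Printf("POST %d bytes\n", len(payload))
		return nil
	})
}
```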
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/README.md
@@ -22,6 +22,7 @@ The following configuration options can also be configured:
- `disable_compression` (default: false): Whether to disable gzip compression over HTTP.
- `timeout` (default: 10s): HTTP timeout when sending data.
- `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS.
- `max_content_length_logs` (default: 2097152): Maximum log data size in bytes per HTTP POST, limited to 2097152 bytes (2 MiB). See the sketch just below.
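
A minimal illustration of what this option bounds: the byte size of each individual HTTP POST body. The constant and helper names here are assumptions for this sketch, not the exporter's API; the zero-means-unbounded behavior mirrors how the batching code further down interprets a zero buffer capacity, consistent with `http.Request.ContentLength`.

```go
package main

import "fmt"

// Assumed names for this sketch only.
const defaultMaxContentLengthLogs = 2 * 1024 * 1024 // 2097152 bytes (2 MiB)

// fitsInOnePost reports whether a payload could be sent as a single HTTP POST
// under the given limit. A limit of 0 is treated as "no limit".
func fitsInOnePost(payload []byte, maxContentLength int) bool {
	return maxContentLength == 0 || len(payload) <= maxContentLength
}

func main() {
	fmt.Println(defaultMaxContentLengthLogs) // 2097152
	// A 3 MiB batch does not fit and must be partitioned into smaller payloads.
	fmt.Println(fitsInOnePost(make([]byte, 3*1024*1024), defaultMaxContentLengthLogs)) // false
}
```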

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
183 changes: 174 additions & 9 deletions exporter/splunkhecexporter/client.go
@@ -45,6 +45,9 @@ type client struct {
headers map[string]string
}

// Minimum number of bytes to compress. 1500 is the MTU of an ethernet frame.
const minCompressionLen = 1500

func (c *client) pushMetricsData(
ctx context.Context,
md pdata.Metrics,
@@ -116,7 +119,136 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return consumererror.Permanent(err)
}

req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
return c.postEvents(ctx, body, compressed)
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) {
c.wg.Add(1)
defer c.wg.Done()

gzipWriter := c.zippers.Get().(*gzip.Writer)
defer c.zippers.Put(gzipWriter)

gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs))
gzipWriter.Reset(gzipBuffer)

defer gzipWriter.Close()

// Callback when each batch is to be sent.
send := func(ctx context.Context, buf *bytes.Buffer) (err error) {
shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression

if shouldCompress {
gzipBuffer.Reset()
gzipWriter.Reset(gzipBuffer)

if _, err = io.Copy(gzipWriter, buf); err != nil {
return fmt.Errorf("failed copying buffer to gzip writer: %v", err)
}

if err = gzipWriter.Flush(); err != nil {
return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
}

return c.postEvents(ctx, gzipBuffer, shouldCompress)
}

return c.postEvents(ctx, buf, shouldCompress)
}

return c.pushLogDataInBatches(ctx, ld, send)
}

// pushLogDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send func(context.Context, *bytes.Buffer) error) (err error) {
// Length of retained bytes in buffer after truncation.
var bufLen int
// Buffer capacity.
var bufCap = c.config.MaxContentLengthLogs
// A guesstimated value > length of bytes of a single event.
// Added to buffer capacity so that buffer is likely to grow by reslicing when buf.Len() > bufCap.
const bufCapPadding = uint(4096)

// Buffer of JSON encoded Splunk events.
// Expected to grow more than bufCap then truncated to bufLen.
var buf = bytes.NewBuffer(make([]byte, 0, bufCap+bufCapPadding))
var encoder = json.NewEncoder(buf)

var tmpBuf = bytes.NewBuffer(make([]byte, 0, bufCapPadding))

// Index of the log record of the first event in buffer.
var bufFront *logIndex

var permanentErrors []error

var rls = ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).Logs()
for k := 0; k < logs.Len(); k++ {
if bufFront == nil {
bufFront = &logIndex{resource: i, library: j, record: k}
}

// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(logs.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
if err = encoder.Encode(event); err != nil {
permanentErrors = append(permanentErrors, consumererror.Permanent(fmt.Errorf("dropped log event: %v, error: %v", event, err)))
continue
}
buf.WriteString("\r\n\r\n")

// Continue adding events to buffer up to capacity.
// 0 capacity is interpreted as unknown/unbound consistent with ContentLength in http.Request.
if buf.Len() <= int(bufCap) || bufCap == 0 {
// Tracking length of event bytes below capacity in buffer.
bufLen = buf.Len()
continue
}

tmpBuf.Reset()
// Storing event bytes over capacity in buffer before truncating.
if bufCap > 0 {
if over := buf.Len() - bufLen; over <= int(bufCap) {
tmpBuf.Write(buf.Bytes()[bufLen:buf.Len()])
} else {
permanentErrors = append(permanentErrors, consumererror.Permanent(
fmt.Errorf("dropped log event: %s, error: event size %d bytes larger than configured max content length %d bytes", string(buf.Bytes()[bufLen:buf.Len()]), over, bufCap)))
}
}

// Truncating buffer at tracked length below capacity and sending.
buf.Truncate(bufLen)
if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}
buf.Reset()

// Writing truncated bytes back to buffer.
tmpBuf.WriteTo(buf)

bufFront, bufLen = nil, buf.Len()
}
}
}

if buf.Len() > 0 {
if err = send(ctx, buf); err != nil {
return consumererror.NewLogs(err, *subLogs(&ld, bufFront))
}
}

return consumererror.Combine(permanentErrors)
}

func (c *client) postEvents(ctx context.Context, events io.Reader, compressed bool) error {
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), events)
if err != nil {
return consumererror.Permanent(err)
}
@@ -148,16 +280,49 @@ func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Ev
return nil
}

func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) error {
c.wg.Add(1)
defer c.wg.Done()
// subLogs returns a subset of `ld` starting from index `from` to the end.
func subLogs(ld *pdata.Logs, from *logIndex) *pdata.Logs {
if ld == nil || from == nil || from.zero() {
return ld
}

splunkEvents := logDataToSplunk(c.logger, ld, c.config)
if len(splunkEvents) == 0 {
return nil
subset := pdata.NewLogs()

resources := ld.ResourceLogs()
resourcesSub := subset.ResourceLogs()

for i := from.resource; i < resources.Len(); i++ {
resourcesSub.Append(pdata.NewResourceLogs())
resources.At(i).Resource().CopyTo(resourcesSub.At(i - from.resource).Resource())

libraries := resources.At(i).InstrumentationLibraryLogs()
librariesSub := resourcesSub.At(i - from.resource).InstrumentationLibraryLogs()

j := 0
if i == from.resource {
j = from.library
}
for jSub := 0; j < libraries.Len(); j++ {
librariesSub.Append(pdata.NewInstrumentationLibraryLogs())
libraries.At(j).InstrumentationLibrary().CopyTo(librariesSub.At(jSub).InstrumentationLibrary())

logs := libraries.At(j).Logs()
logsSub := librariesSub.At(jSub).Logs()
jSub++

k := 0
if i == from.resource && j == from.library {
k = from.record
}
for kSub := 0; k < logs.Len(); k++ {
logsSub.Append(pdata.NewLogRecord())
logs.At(k).CopyTo(logsSub.At(kSub))
kSub++
}
}
}

return c.sendSplunkEvents(ctx, splunkEvents)
return &subset
}

func encodeBodyEvents(zippers *sync.Pool, evs []*splunk.Event, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
@@ -189,7 +354,7 @@ func encodeBody(zippers *sync.Pool, dps []*splunk.Event, disableCompression bool
// avoid attempting to compress things that fit into a single ethernet frame
func getReader(zippers *sync.Pool, b *bytes.Buffer, disableCompression bool) (io.Reader, bool, error) {
var err error
if !disableCompression && b.Len() > 1500 {
if !disableCompression && b.Len() > minCompressionLen {
buf := new(bytes.Buffer)
w := zippers.Get().(*gzip.Writer)
defer zippers.Put(w)