Skip to content

Commit

Permalink
[exporter/awskinesisexporter] fix in compressor crashing under heady …
Browse files Browse the repository at this point in the history
…load due non-safe thread execution (#32589)

Fixing a bug that made the execution panic when the load was high
enough, specially if the payloads were not very tiny.


**Testing:** 
Executed this with and without the fix locally using heavy load, then
ran it in cloud servers using heavier load and a variety of payloads.
  • Loading branch information
leorinat authored Apr 29, 2024
1 parent 9501bb6 commit dd361de
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 98 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awskinesisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: the compressor was crashing under high load due it not being thread safe.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32589]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: removed compressor abstraction and each execution has its own buffer (so it's thread safe)

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
8 changes: 1 addition & 7 deletions exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/producer"
)

Expand Down Expand Up @@ -90,16 +89,11 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op
return nil, err
}

compressor, err := compress.NewCompressor(conf.Encoding.Compression)
if err != nil {
return nil, err
}

encoder, err := batch.NewEncoder(
conf.Encoding.Name,
batch.WithMaxRecordSize(conf.MaxRecordSize),
batch.WithMaxRecordsPerBatch(conf.MaxRecordsPerBatch),
batch.WithCompression(compressor),
batch.WithCompressionType(conf.Compression),
)

if err != nil {
Expand Down
24 changes: 14 additions & 10 deletions exporter/awskinesisexporter/internal/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Batch struct {
maxBatchSize int
maxRecordSize int

compression compress.Compressor
compressionType string

records []types.PutRecordsRequestEntry
}
Expand All @@ -54,20 +54,18 @@ func WithMaxRecordSize(size int) Option {
}
}

func WithCompression(compressor compress.Compressor) Option {
func WithCompressionType(compressionType string) Option {
return func(bt *Batch) {
if compressor != nil {
bt.compression = compressor
}
bt.compressionType = compressionType
}
}

func New(opts ...Option) *Batch {
bt := &Batch{
maxBatchSize: MaxBatchedRecords,
maxRecordSize: MaxRecordSize,
compression: compress.NewNoopCompressor(),
records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords),
maxBatchSize: MaxBatchedRecords,
maxRecordSize: MaxRecordSize,
compressionType: "none",
records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords),
}

for _, op := range opts {
Expand All @@ -78,7 +76,13 @@ func New(opts ...Option) *Batch {
}

func (b *Batch) AddRecord(raw []byte, key string) error {
record, err := b.compression.Do(raw)

compressor, err := compress.NewCompressor(b.compressionType)
if err != nil {
return err
}

record, err := compressor(raw)
if err != nil {
return err
}
Expand Down
103 changes: 54 additions & 49 deletions exporter/awskinesisexporter/internal/compress/compresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,82 @@ import (
"compress/gzip"
"compress/zlib"
"fmt"
"io"
)

type bufferedResetWriter interface {
Write(p []byte) (int, error)
Flush() error
Reset(newWriter io.Writer)
Close() error
}

type Compressor interface {
Do(in []byte) (out []byte, err error)
}

var _ Compressor = (*compressor)(nil)

type compressor struct {
compression bufferedResetWriter
}
type Compressor func(in []byte) ([]byte, error)

func NewCompressor(format string) (Compressor, error) {
c := &compressor{
compression: &noop{},
}
switch format {
case "flate":
w, err := flate.NewWriter(nil, flate.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w
return flateCompressor, nil
case "gzip":
w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w

return gzipCompressor, nil
case "zlib":
w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed)
if err != nil {
return nil, err
}
c.compression = w
return zlibCompressor, nil
case "noop", "none":
// Already the default case
default:
return nil, fmt.Errorf("unknown compression format: %s", format)
return noopCompressor, nil
}

return nil, fmt.Errorf("unknown compression format: %s", format)
}

func flateCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := flate.NewWriter(&buf, flate.BestSpeed)
defer w.Close()

_, err := w.Write(in)

if err != nil {
return nil, err
}

return c, nil
err = w.Flush()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func (c *compressor) Do(in []byte) ([]byte, error) {
buf := new(bytes.Buffer)
func gzipCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
defer w.Close()

c.compression.Reset(buf)
_, err := w.Write(in)

if _, err := c.compression.Write(in); err != nil {
if err != nil {
return nil, err
}

if err := c.compression.Flush(); err != nil {
err = w.Flush()
if err != nil {
return nil, err
}

if closer, ok := c.compression.(io.Closer); ok {
if err := closer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func zlibCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed)
defer w.Close()

_, err := w.Write(in)

if err != nil {
return nil, err
}

err = w.Flush()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func noopCompressor(in []byte) ([]byte, error) {
return in, nil
}
83 changes: 81 additions & 2 deletions exporter/awskinesisexporter/internal/compress/compresser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package compress_test
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -35,7 +36,7 @@ func TestCompressorFormats(t *testing.T) {
require.NoError(t, err, "Must have a valid compression format")
require.NotNil(t, c, "Must have a valid compressor")

out, err := c.Do([]byte(data))
out, err := c([]byte(data))
assert.NoError(t, err, "Must not error when processing data")
assert.NotNil(t, out, "Must have a valid record")
})
Expand Down Expand Up @@ -94,8 +95,86 @@ func benchmarkCompressor(b *testing.B, format string, length int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
out, err := compressor.Do(data)
out, err := compressor(data)
assert.NoError(b, err, "Must not error when processing data")
assert.NotNil(b, out, "Must have a valid byte array after")
}
}

// an issue encountered in the past was a crash due race condition in the compressor, so the
// current implementation creates a new context on each compression request
// this is a test to check no exceptions are raised for executing concurrent compressions
func TestCompressorConcurrent(t *testing.T) {

timeout := time.After(15 * time.Second)
done := make(chan bool)
go func() {
// do your testing
concurrentCompressFunc(t)
done <- true
}()

select {
case <-timeout:
t.Fatal("Test didn't finish in time")
case <-done:
}

}

func concurrentCompressFunc(t *testing.T) {
// this value should be way higher to make this test more valuable, but the make of this project uses
// max 4 workers, so we had to set this value here
numWorkers := 4

var wg sync.WaitGroup
wg.Add(numWorkers)

errCh := make(chan error, numWorkers)
var errMutex sync.Mutex

// any single format would do it here, since each exporter can be set to use only one at a time
// and the concurrent issue that was present in the past was independent of the format
compressFunc, err := compress.NewCompressor("gzip")

if err != nil {
errCh <- err
return
}

// it is important for the data length to be on the higher side of a record
// since it is where the chances of having race conditions are bigger
dataLength := 131072

for j := 0; j < numWorkers; j++ {
go func() {
defer wg.Done()

source := rand.NewSource(time.Now().UnixMilli())
genRand := rand.New(source)

data := make([]byte, dataLength)
for i := 0; i < dataLength; i++ {
data[i] = byte(genRand.Int31())
}

result, localErr := compressFunc(data)
if localErr != nil {
errMutex.Lock()
errCh <- localErr
errMutex.Unlock()
return
}

_ = result
}()
}

wg.Wait()

close(errCh)

for err := range errCh {
t.Errorf("Error encountered on concurrent compression: %v", err)
}
}
30 changes: 0 additions & 30 deletions exporter/awskinesisexporter/internal/compress/noop_compression.go

This file was deleted.

0 comments on commit dd361de

Please sign in to comment.