From dd361de1fbe4c9c2ac4f3efee6d749269b0220ff Mon Sep 17 00:00:00 2001 From: leorinat <165731848+leorinat@users.noreply.github.com> Date: Mon, 29 Apr 2024 13:39:13 +1200 Subject: [PATCH] [exporter/awskinesisexporter] fix compressor crashing under heavy load due to non-thread-safe execution (#32589) Fixing a bug that made the execution panic when the load was high enough, especially if the payloads were not very tiny. **Testing:** Executed this with and without the fix locally using heavy load, then ran it in cloud servers using heavier load and a variety of payloads. --- ...mpressor-kinesis-exporter-thread-safe.yaml | 27 +++++ exporter/awskinesisexporter/exporter.go | 8 +- .../internal/batch/batch.go | 24 ++-- .../internal/compress/compresser.go | 103 +++++++++--------- .../internal/compress/compresser_test.go | 83 +++++++++++++- .../internal/compress/noop_compression.go | 30 ----- 6 files changed, 177 insertions(+), 98 deletions(-) create mode 100644 .chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml delete mode 100644 exporter/awskinesisexporter/internal/compress/noop_compression.go diff --git a/.chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml b/.chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml new file mode 100644 index 000000000000..1d3b09425bf8 --- /dev/null +++ b/.chloggen/fix_compressor-kinesis-exporter-thread-safe.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awskinesisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: the compressor was crashing under high load due to it not being thread safe. + +# Mandatory: One or more tracking issues related to the change. 
You can use the PR number here if no issue exists. +issues: [32589] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: removed compressor abstraction and each execution has its own buffer (so it's thread safe) + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/awskinesisexporter/exporter.go b/exporter/awskinesisexporter/exporter.go index 12169d813f94..acf83d831176 100644 --- a/exporter/awskinesisexporter/exporter.go +++ b/exporter/awskinesisexporter/exporter.go @@ -19,7 +19,6 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/producer" ) @@ -90,16 +89,11 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op return nil, err } - compressor, err := compress.NewCompressor(conf.Encoding.Compression) - if err != nil { - return nil, err - } - encoder, err := batch.NewEncoder( conf.Encoding.Name, batch.WithMaxRecordSize(conf.MaxRecordSize), batch.WithMaxRecordsPerBatch(conf.MaxRecordsPerBatch), - batch.WithCompression(compressor), + batch.WithCompressionType(conf.Compression), ) if err != nil { diff --git a/exporter/awskinesisexporter/internal/batch/batch.go 
b/exporter/awskinesisexporter/internal/batch/batch.go index 9aab615793cc..8da239ac3a44 100644 --- a/exporter/awskinesisexporter/internal/batch/batch.go +++ b/exporter/awskinesisexporter/internal/batch/batch.go @@ -29,7 +29,7 @@ type Batch struct { maxBatchSize int maxRecordSize int - compression compress.Compressor + compressionType string records []types.PutRecordsRequestEntry } @@ -54,20 +54,18 @@ func WithMaxRecordSize(size int) Option { } } -func WithCompression(compressor compress.Compressor) Option { +func WithCompressionType(compressionType string) Option { return func(bt *Batch) { - if compressor != nil { - bt.compression = compressor - } + bt.compressionType = compressionType } } func New(opts ...Option) *Batch { bt := &Batch{ - maxBatchSize: MaxBatchedRecords, - maxRecordSize: MaxRecordSize, - compression: compress.NewNoopCompressor(), - records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords), + maxBatchSize: MaxBatchedRecords, + maxRecordSize: MaxRecordSize, + compressionType: "none", + records: make([]types.PutRecordsRequestEntry, 0, MaxBatchedRecords), } for _, op := range opts { @@ -78,7 +76,13 @@ func New(opts ...Option) *Batch { } func (b *Batch) AddRecord(raw []byte, key string) error { - record, err := b.compression.Do(raw) + + compressor, err := compress.NewCompressor(b.compressionType) + if err != nil { + return err + } + + record, err := compressor(raw) if err != nil { return err } diff --git a/exporter/awskinesisexporter/internal/compress/compresser.go b/exporter/awskinesisexporter/internal/compress/compresser.go index 9197d19e3556..07a221b0f489 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser.go +++ b/exporter/awskinesisexporter/internal/compress/compresser.go @@ -9,77 +9,82 @@ import ( "compress/gzip" "compress/zlib" "fmt" - "io" ) -type bufferedResetWriter interface { - Write(p []byte) (int, error) - Flush() error - Reset(newWriter io.Writer) - Close() error -} - -type Compressor interface { - Do(in []byte) 
(out []byte, err error) -} - -var _ Compressor = (*compressor)(nil) - -type compressor struct { - compression bufferedResetWriter -} +type Compressor func(in []byte) ([]byte, error) func NewCompressor(format string) (Compressor, error) { - c := &compressor{ - compression: &noop{}, - } switch format { case "flate": - w, err := flate.NewWriter(nil, flate.BestSpeed) - if err != nil { - return nil, err - } - c.compression = w + return flateCompressor, nil case "gzip": - w, err := gzip.NewWriterLevel(nil, gzip.BestSpeed) - if err != nil { - return nil, err - } - c.compression = w - + return gzipCompressor, nil case "zlib": - w, err := zlib.NewWriterLevel(nil, zlib.BestSpeed) - if err != nil { - return nil, err - } - c.compression = w + return zlibCompressor, nil case "noop", "none": - // Already the default case - default: - return nil, fmt.Errorf("unknown compression format: %s", format) + return noopCompressor, nil + } + + return nil, fmt.Errorf("unknown compression format: %s", format) +} + +func flateCompressor(in []byte) ([]byte, error) { + var buf bytes.Buffer + w, _ := flate.NewWriter(&buf, flate.BestSpeed) + defer w.Close() + + _, err := w.Write(in) + + if err != nil { + return nil, err } - return c, nil + err = w.Flush() + if err != nil { + return nil, err + } + + return buf.Bytes(), nil } -func (c *compressor) Do(in []byte) ([]byte, error) { - buf := new(bytes.Buffer) +func gzipCompressor(in []byte) ([]byte, error) { + var buf bytes.Buffer + w, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) + defer w.Close() - c.compression.Reset(buf) + _, err := w.Write(in) - if _, err := c.compression.Write(in); err != nil { + if err != nil { return nil, err } - if err := c.compression.Flush(); err != nil { + err = w.Flush() + if err != nil { return nil, err } - if closer, ok := c.compression.(io.Closer); ok { - if err := closer.Close(); err != nil { - return nil, err - } + return buf.Bytes(), nil +} + +func zlibCompressor(in []byte) ([]byte, error) { + var buf bytes.Buffer 
+ w, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed) + defer w.Close() + + _, err := w.Write(in) + + if err != nil { + return nil, err + } + + err = w.Flush() + if err != nil { + return nil, err } return buf.Bytes(), nil } + +func noopCompressor(in []byte) ([]byte, error) { + return in, nil +} diff --git a/exporter/awskinesisexporter/internal/compress/compresser_test.go b/exporter/awskinesisexporter/internal/compress/compresser_test.go index 93075b845420..324a584ebe90 100644 --- a/exporter/awskinesisexporter/internal/compress/compresser_test.go +++ b/exporter/awskinesisexporter/internal/compress/compresser_test.go @@ -6,6 +6,7 @@ package compress_test import ( "fmt" "math/rand" + "sync" "testing" "time" @@ -35,7 +36,7 @@ func TestCompressorFormats(t *testing.T) { require.NoError(t, err, "Must have a valid compression format") require.NotNil(t, c, "Must have a valid compressor") - out, err := c.Do([]byte(data)) + out, err := c([]byte(data)) assert.NoError(t, err, "Must not error when processing data") assert.NotNil(t, out, "Must have a valid record") }) @@ -94,8 +95,86 @@ func benchmarkCompressor(b *testing.B, format string, length int) { b.ResetTimer() for i := 0; i < b.N; i++ { - out, err := compressor.Do(data) + out, err := compressor(data) assert.NoError(b, err, "Must not error when processing data") assert.NotNil(b, out, "Must have a valid byte array after") } } + +// an issue encountered in the past was a crash due race condition in the compressor, so the +// current implementation creates a new context on each compression request +// this is a test to check no exceptions are raised for executing concurrent compressions +func TestCompressorConcurrent(t *testing.T) { + + timeout := time.After(15 * time.Second) + done := make(chan bool) + go func() { + // do your testing + concurrentCompressFunc(t) + done <- true + }() + + select { + case <-timeout: + t.Fatal("Test didn't finish in time") + case <-done: + } + +} + +func concurrentCompressFunc(t *testing.T) { + // 
this value should be way higher to make this test more valuable, but the make of this project uses + // max 4 workers, so we had to set this value here + numWorkers := 4 + + var wg sync.WaitGroup + wg.Add(numWorkers) + + errCh := make(chan error, numWorkers) + var errMutex sync.Mutex + + // any single format would do it here, since each exporter can be set to use only one at a time + // and the concurrent issue that was present in the past was independent of the format + compressFunc, err := compress.NewCompressor("gzip") + + if err != nil { + errCh <- err + return + } + + // it is important for the data length to be on the higher side of a record + // since it is where the chances of having race conditions are bigger + dataLength := 131072 + + for j := 0; j < numWorkers; j++ { + go func() { + defer wg.Done() + + source := rand.NewSource(time.Now().UnixMilli()) + genRand := rand.New(source) + + data := make([]byte, dataLength) + for i := 0; i < dataLength; i++ { + data[i] = byte(genRand.Int31()) + } + + result, localErr := compressFunc(data) + if localErr != nil { + errMutex.Lock() + errCh <- localErr + errMutex.Unlock() + return + } + + _ = result + }() + } + + wg.Wait() + + close(errCh) + + for err := range errCh { + t.Errorf("Error encountered on concurrent compression: %v", err) + } +} diff --git a/exporter/awskinesisexporter/internal/compress/noop_compression.go b/exporter/awskinesisexporter/internal/compress/noop_compression.go deleted file mode 100644 index be594b6214f1..000000000000 --- a/exporter/awskinesisexporter/internal/compress/noop_compression.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package compress // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress" - -import "io" - -type noop struct { - data io.Writer -} - -func NewNoopCompressor() Compressor { - return &compressor{ - compression: &noop{}, - } -} - -func (n 
*noop) Reset(w io.Writer) { - n.data = w -} - -func (n noop) Write(p []byte) (int, error) { - return n.data.Write(p) -} - -func (n noop) Flush() error { - return nil -} - -func (n *noop) Close() error { return nil }