Skip to content

Commit

Permalink
Fix/followup fix kinesis compression (#32860)
Browse files Browse the repository at this point in the history
**Description:** 
Follow up fix on kinesis exporter compression. The streams were not
closed properly so the compressed data was not generating compression
footers


**Testing:** 
* tested compressed and uncompressed data locally
* modified compressor test to validate uncompressed data by content and
not just size
  • Loading branch information
leorinat authored May 7, 2024
1 parent aaeff91 commit 391429f
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_followup-fix-kinesis-compression.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awskinesisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fixed compressed data not generating the compression footers

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32860]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
19 changes: 16 additions & 3 deletions exporter/awskinesisexporter/internal/compress/compresser.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func NewCompressor(format string) (Compressor, error) {
func flateCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := flate.NewWriter(&buf, flate.BestSpeed)
defer w.Close()

_, err := w.Write(in)

Expand All @@ -44,13 +43,17 @@ func flateCompressor(in []byte) ([]byte, error) {
return nil, err
}

err = w.Close()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func gzipCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
defer w.Close()

_, err := w.Write(in)

Expand All @@ -59,6 +62,12 @@ func gzipCompressor(in []byte) ([]byte, error) {
}

err = w.Flush()
if err != nil {

return nil, err
}

err = w.Close()
if err != nil {
return nil, err
}
Expand All @@ -69,7 +78,6 @@ func gzipCompressor(in []byte) ([]byte, error) {
func zlibCompressor(in []byte) ([]byte, error) {
var buf bytes.Buffer
w, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed)
defer w.Close()

_, err := w.Write(in)

Expand All @@ -82,6 +90,11 @@ func zlibCompressor(in []byte) ([]byte, error) {
return nil, err
}

err = w.Close()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

Expand Down
85 changes: 84 additions & 1 deletion exporter/awskinesisexporter/internal/compress/compresser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
package compress_test

import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
"math/rand"
"sync"
"testing"
Expand All @@ -29,7 +34,8 @@ func TestCompressorFormats(t *testing.T) {
{format: "flate"},
}

const data = "You know nothing Jon Snow"
data := createRandomString(1024)

for _, tc := range testCases {
t.Run(fmt.Sprintf("format_%s", tc.format), func(t *testing.T) {
c, err := compress.NewCompressor(tc.format)
Expand All @@ -39,12 +45,44 @@ func TestCompressorFormats(t *testing.T) {
out, err := c([]byte(data))
assert.NoError(t, err, "Must not error when processing data")
assert.NotNil(t, out, "Must have a valid record")

// now data gets decompressed and the original string gets compared with the decompressed one
var dc []byte
var err2 error

switch tc.format {
case "gzip":
dc, err2 = decompressGzip(out)
case "zlib":
dc, err2 = decompressZlib(out)
case "flate":
dc, err2 = decompressFlate(out)
case "noop", "none":
dc = out
default:
dc = out
}

assert.NoError(t, err2)
assert.Equal(t, data, string(dc))
})
}
_, err := compress.NewCompressor("invalid-format")
assert.Error(t, err, "Must error when an invalid compression format is given")
}

func createRandomString(length int) string {
// some characters for the random generation
const letterBytes = " ,.;:*-+/[]{}<>abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

b := make([]byte, length)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}

return string(b)
}

func BenchmarkNoopCompressor_1000Bytes(b *testing.B) {
benchmarkCompressor(b, "none", 1000)
}
Expand Down Expand Up @@ -178,3 +216,48 @@ func concurrentCompressFunc(t *testing.T) {
t.Errorf("Error encountered on concurrent compression: %v", err)
}
}

func decompressGzip(input []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(input))
if err != nil {
return nil, err
}

defer r.Close()

decompressedData, err := io.ReadAll(r)
if err != nil {
return nil, err
}

return decompressedData, nil
}

func decompressZlib(input []byte) ([]byte, error) {
r, err := zlib.NewReader(bytes.NewReader(input))
if err != nil {
return nil, err
}

defer r.Close()

decompressedData, err := io.ReadAll(r)
if err != nil {
return nil, err
}

return decompressedData, nil
}

func decompressFlate(input []byte) ([]byte, error) {

r := flate.NewReader(bytes.NewReader(input))
defer r.Close()

decompressedData, err := io.ReadAll(r)
if err != nil {
return nil, err
}

return decompressedData, nil
}

0 comments on commit 391429f

Please sign in to comment.