diff --git a/decompress.go b/decompress.go index aa7fb7498..a01cefaa5 100644 --- a/decompress.go +++ b/decompress.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "fmt" - "io" "sync" snappy "github.com/eapache/go-xerial-snappy" @@ -19,6 +18,19 @@ var ( } gzipReaderPool sync.Pool + + bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + } + + bytesPool = sync.Pool{ + New: func() interface{} { + res := make([]byte, 0, 4096) + return &res + }, + } ) func decompress(cc CompressionCodec, data []byte) ([]byte, error) { @@ -38,9 +50,17 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { return nil, err } - defer gzipReaderPool.Put(reader) + buffer := bufferPool.Get().(*bytes.Buffer) + _, err = buffer.ReadFrom(reader) + // copy the buffer to a new slice with the correct length + // reuse gzipReader and buffer + gzipReaderPool.Put(reader) + res := make([]byte, buffer.Len()) + copy(res, buffer.Bytes()) + buffer.Reset() + bufferPool.Put(buffer) - return io.ReadAll(reader) + return res, err case CompressionSnappy: return snappy.Decode(data) case CompressionLZ4: @@ -50,11 +70,28 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { } else { reader.Reset(bytes.NewReader(data)) } - defer lz4ReaderPool.Put(reader) + buffer := bufferPool.Get().(*bytes.Buffer) + _, err := buffer.ReadFrom(reader) + // copy the buffer to a new slice with the correct length + // reuse lz4Reader and buffer + lz4ReaderPool.Put(reader) + res := make([]byte, buffer.Len()) + copy(res, buffer.Bytes()) + buffer.Reset() + bufferPool.Put(buffer) - return io.ReadAll(reader) + return res, err case CompressionZSTD: - return zstdDecompress(ZstdDecoderParams{}, nil, data) + buffer := *bytesPool.Get().(*[]byte) + var err error + buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data) + // copy the buffer to a new slice with the correct length and reuse buffer + res := make([]byte, len(buffer)) + copy(res, buffer) + buffer = buffer[:0] + bytesPool.Put(&buffer) + + return res, err default: return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)} }