Hi, I am using zstd to compress and decompress small messages received by a service. I expect each message to be roughly 500 bytes to 1 KB in size, and a huge number of them are processed concurrently. I would like to reduce unnecessary memory allocations during compression and decompression, and I am wondering what the recommended way to use this package is. In particular, I'm unsure which objects should be global and which should be put into a sync.Pool. Here are the three approaches I am considering:
1. A sync.Pool of writers and readers:

```go
package compression

import (
	"bytes"
	"errors"
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

type zstdPoolCompressor struct {
	writerPool sync.Pool
	readerPool sync.Pool
}

var (
	ZSTDPoolCompressor Compressor = &zstdPoolCompressor{
		writerPool: sync.Pool{
			New: func() interface{} {
				ret := &zstdWriter{}
				ret.buf = bytes.NewBuffer(make([]byte, 0, 1024))
				ret.enc, _ = zstd.NewWriter(ret.buf)
				return ret
			},
		},
		readerPool: sync.Pool{
			New: func() interface{} {
				ret := &zstdReader{}
				ret.br = bytes.NewReader(nil)
				ret.buf = bytes.NewBuffer(make([]byte, 0, 1024))
				ret.tmp = make([]byte, 32<<10)
				return ret
			},
		},
	}
)

func (z *zstdPoolCompressor) Compress(src []byte) ([]byte, error) {
	w := z.writerPool.Get().(*zstdWriter)
	defer z.writerPool.Put(w)
	ret, err := w.Compress(src)
	if err != nil {
		return nil, errors.New("zstd error")
	}
	return ret, nil
}

func (z *zstdPoolCompressor) Decompress(src []byte) ([]byte, error) {
	r := z.readerPool.Get().(*zstdReader)
	defer z.readerPool.Put(r)
	ret, err := r.Decompress(src)
	if err != nil {
		return nil, errors.New("zstd error")
	}
	return ret, nil
}

type zstdWriter struct {
	buf *bytes.Buffer
	enc *zstd.Encoder
}

type zstdReader struct {
	br  *bytes.Reader
	buf *bytes.Buffer
	dec *zstd.Decoder
	tmp []byte // for io.CopyBuffer
}

func (w *zstdWriter) Compress(src []byte) ([]byte, error) {
	w.buf.Reset()
	w.enc.Reset(w.buf)
	_, err := w.enc.Write(src)
	if err != nil {
		return nil, err
	}
	err = w.enc.Close()
	if err != nil {
		return nil, err
	}
	return append([]byte{}, w.buf.Bytes()...), nil // copy, since w.buf will be reused later
}

func (r *zstdReader) Decompress(src []byte) ([]byte, error) {
	r.br.Reset(src)
	var err error
	if r.dec == nil {
		r.dec, err = zstd.NewReader(r.br)
	} else {
		err = r.dec.Reset(r.br)
	}
	if err != nil {
		return nil, err
	}
	r.buf.Reset()
	_, err = io.CopyBuffer(r.buf, r.dec, r.tmp)
	if err != nil {
		return nil, err
	}
	return append([]byte{}, r.buf.Bytes()...), nil // copy, since r.buf will be reused later
}
```
2. A single global encoder/decoder plus a pool of bytes.Buffers:

```go
package compression

import (
	"bytes"
	"sync"

	"github.com/klauspost/compress/zstd"
)

type zstdBufCompressor struct{}

var (
	ZSTDBufCompressor = &zstdBufCompressor{}
	// Global decoder
	dec, _ = zstd.NewReader(nil)
	// Global encoder
	enc, _ = zstd.NewWriter(nil)
	// Buffer pool
	bufPool = &sync.Pool{
		New: func() interface{} {
			return bytes.NewBuffer(make([]byte, 0, 4096))
		},
	}
)

func (z *zstdBufCompressor) Compress(src []byte) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)
	encodedBytes := enc.EncodeAll(src, buf.Bytes())
	return append([]byte{}, encodedBytes...), nil
}

func (z *zstdBufCompressor) Decompress(src []byte) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)
	decodedBytes, err := dec.DecodeAll(src, buf.Bytes())
	if err != nil {
		return nil, err
	}
	return append([]byte{}, decodedBytes...), nil
}
```
3. A single global encoder/decoder using EncodeAll/DecodeAll:

```go
package compression

import (
	"github.com/klauspost/compress/zstd"
)

type zstdCompressor struct{}

var (
	ZSTDCompressor = &zstdCompressor{}
	// Global decoder
	decoder, _ = zstd.NewReader(nil)
	// Global encoder
	encoder, _ = zstd.NewWriter(nil)
)

func (z *zstdCompressor) Compress(src []byte) ([]byte, error) {
	return encoder.EncodeAll(src, nil), nil
}

func (z *zstdCompressor) Decompress(src []byte) ([]byte, error) {
	return decoder.DecodeAll(src, nil)
}
```

Would appreciate any suggestions or advice 🙏
@teikjun Option 3 is designed for your use case. A single instance will allow concurrent compression/decompression operations while not creating extra allocations.

If you then re-use destination buffers for encodes/decodes, you generate very little garbage. Your API doesn't allow for that, but it will make a big difference if you do.
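As a rough sketch of that last point (not part of the thread or the library: the CompressTo/DecompressTo names and the worker loop are made up for illustration), option 3 could expose methods that append into a caller-owned buffer, since EncodeAll and DecodeAll both append to the dst slice they are given:

```go
// Sketch only: extends the zstdCompressor from option 3 (re-using its
// package-level encoder/decoder). Method names are hypothetical.

// CompressTo appends the compressed form of src to dst and returns the
// grown slice, so the caller can keep re-using the same backing array.
func (z *zstdCompressor) CompressTo(dst, src []byte) []byte {
	return encoder.EncodeAll(src, dst)
}

// DecompressTo appends the decompressed form of src to dst.
func (z *zstdCompressor) DecompressTo(dst, src []byte) ([]byte, error) {
	return decoder.DecodeAll(src, dst)
}

// Example caller: one reusable buffer per worker goroutine. After the first
// few messages the buffer is usually large enough, so no further allocations
// happen unless a message exceeds the existing capacity.
func worker(messages <-chan []byte, send func([]byte)) {
	var out []byte
	for msg := range messages {
		out = ZSTDCompressor.CompressTo(out[:0], msg)
		send(out) // out is overwritten on the next iteration
	}
}
```

Whether the destination buffer is per-goroutine or drawn from a sync.Pool is up to the caller; the point is that the API hands dst back to the caller instead of allocating a fresh slice for every message.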