Reuse compression writer objects
As with decompression, *lz4.Writer and *gzip.Writer objects can be reused
by calling Reset() between uses. Add two more sync.Pools so we can reuse
writers easily.
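
In isolation the pattern looks like this (a minimal standalone sketch;
writerPool and lz4Compress are illustrative names, not part of this commit):

package main

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/pierrec/lz4"
)

// A single pool hands out *lz4.Writer values; Reset() points a pooled
// writer at a fresh buffer instead of allocating a new writer each time.
var writerPool = sync.Pool{
	New: func() interface{} { return lz4.NewWriter(nil) },
}

func lz4Compress(data []byte) ([]byte, error) {
	writer := writerPool.Get().(*lz4.Writer)
	defer writerPool.Put(writer)

	var buf bytes.Buffer
	writer.Reset(&buf) // discard old state, write to buf

	if _, err := writer.Write(data); err != nil {
		return nil, err
	}
	if err := writer.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := lz4Compress([]byte("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(out), "compressed bytes")
}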

The gzip writer is only reused when using the default compression level:
Reset() keeps the level a writer was created with, so a single pool can
only serve writers of one level. We could maintain a separate sync.Pool
for each gzip compression level, but that adds more complexity, and gzip
wasn't that slow without this optimization.
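
For comparison, the rejected per-level approach would look roughly like the
following sketch (hypothetical; gzipPools and gzipWriterFor are invented
names, and nothing like this exists in the commit):

package sarama

import (
	"compress/gzip"
	"sync"
)

// Hypothetical alternative: one pool per gzip level, created lazily.
var gzipPools sync.Map // maps int (level) -> *sync.Pool

func gzipWriterFor(level int) *gzip.Writer {
	pool, _ := gzipPools.LoadOrStore(level, &sync.Pool{
		New: func() interface{} {
			// Assumes the caller already validated level.
			w, _ := gzip.NewWriterLevel(nil, level)
			return w
		},
	})
	return pool.(*sync.Pool).Get().(*gzip.Writer)
}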

When producing 100 msgs/second across 10 producers, CPU usage on my machine
dropped from 70% to 4% with lz4; gzip dropped from 15% to 5%.
Muir Manders committed Oct 5, 2018
1 parent afe6b1d commit f352e5c
Showing 4 changed files with 88 additions and 106 deletions.
75 changes: 75 additions & 0 deletions compress.go
@@ -0,0 +1,75 @@
+package sarama
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"sync"
+
+	"github.com/eapache/go-xerial-snappy"
+	"github.com/pierrec/lz4"
+)
+
+var (
+	lz4WriterPool = sync.Pool{
+		New: func() interface{} {
+			return lz4.NewWriter(nil)
+		},
+	}
+
+	gzipWriterPool = sync.Pool{
+		New: func() interface{} {
+			return gzip.NewWriter(nil)
+		},
+	}
+)
+
+func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
+	switch cc {
+	case CompressionNone:
+		return data, nil
+	case CompressionGZIP:
+		var (
+			err    error
+			buf    bytes.Buffer
+			writer *gzip.Writer
+		)
+		if level != CompressionLevelDefault {
+			writer, err = gzip.NewWriterLevel(&buf, level)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			writer = gzipWriterPool.Get().(*gzip.Writer)
+			defer gzipWriterPool.Put(writer)
+			writer.Reset(&buf)
+		}
+		if _, err := writer.Write(data); err != nil {
+			return nil, err
+		}
+		if err := writer.Close(); err != nil {
+			return nil, err
+		}
+		return buf.Bytes(), nil
+	case CompressionSnappy:
+		return snappy.Encode(data), nil
+	case CompressionLZ4:
+		writer := lz4WriterPool.Get().(*lz4.Writer)
+		defer lz4WriterPool.Put(writer)
+
+		var buf bytes.Buffer
+		writer.Reset(&buf)
+
+		if _, err := writer.Write(data); err != nil {
+			return nil, err
+		}
+		if err := writer.Close(); err != nil {
+			return nil, err
+		}
+		return buf.Bytes(), nil
+	case CompressionZSTD:
+		return zstdCompressLevel(nil, data, level)
+	default:
+		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
+	}
+}
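
Call sites inside the package then shrink to a single call. For example, a
round trip through the new helper and the existing decompress (illustrative
only; roundTrip is an invented name, and since compress and decompress are
unexported this only compiles inside package sarama):

func roundTrip(data []byte) ([]byte, error) {
	// Compress with a pooled writer, then decompress with a pooled reader.
	compressed, err := compress(CompressionLZ4, CompressionLevelDefault, data)
	if err != nil {
		return nil, err
	}
	return decompress(CompressionLZ4, compressed)
}
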
12 changes: 6 additions & 6 deletions decompress.go
@@ -12,13 +12,13 @@ import (
 )
 
 var (
-	lz4Pool = sync.Pool{
+	lz4ReaderPool = sync.Pool{
 		New: func() interface{} {
 			return lz4.NewReader(nil)
 		},
 	}
 
-	gzipPool sync.Pool
+	gzipReaderPool sync.Pool
 )
 
 func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
@@ -29,7 +29,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 	var (
 		err        error
 		reader     *gzip.Reader
-		readerIntf = gzipPool.Get()
+		readerIntf = gzipReaderPool.Get()
 	)
 	if readerIntf != nil {
 		reader = readerIntf.(*gzip.Reader)
@@ -40,7 +40,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 		}
 	}
 
-	defer gzipPool.Put(reader)
+	defer gzipReaderPool.Put(reader)
 
 	if err := reader.Reset(bytes.NewReader(data)); err != nil {
 		return nil, err
@@ -50,8 +50,8 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 	case CompressionSnappy:
 		return snappy.Decode(data)
 	case CompressionLZ4:
-		reader := lz4Pool.Get().(*lz4.Reader)
-		defer lz4Pool.Put(reader)
+		reader := lz4ReaderPool.Get().(*lz4.Reader)
+		defer lz4ReaderPool.Put(reader)
 
 		reader.Reset(bytes.NewReader(data))
 		return ioutil.ReadAll(reader)
56 changes: 5 additions & 51 deletions message.go
@@ -1,13 +1,8 @@
 package sarama
 
 import (
-	"bytes"
-	"compress/gzip"
 	"fmt"
 	"time"
-
-	"github.com/eapache/go-xerial-snappy"
-	"github.com/pierrec/lz4"
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
@@ -76,53 +71,12 @@ func (m *Message) encode(pe packetEncoder) error {
 		payload = m.compressedCache
 		m.compressedCache = nil
 	} else if m.Value != nil {
-		switch m.Codec {
-		case CompressionNone:
-			payload = m.Value
-		case CompressionGZIP:
-			var buf bytes.Buffer
-			var writer *gzip.Writer
-			if m.CompressionLevel != CompressionLevelDefault {
-				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
-				if err != nil {
-					return err
-				}
-			} else {
-				writer = gzip.NewWriter(&buf)
-			}
-			if _, err = writer.Write(m.Value); err != nil {
-				return err
-			}
-			if err = writer.Close(); err != nil {
-				return err
-			}
-			m.compressedCache = buf.Bytes()
-			payload = m.compressedCache
-		case CompressionSnappy:
-			tmp := snappy.Encode(m.Value)
-			m.compressedCache = tmp
-			payload = m.compressedCache
-		case CompressionLZ4:
-			var buf bytes.Buffer
-			writer := lz4.NewWriter(&buf)
-			if _, err = writer.Write(m.Value); err != nil {
-				return err
-			}
-			if err = writer.Close(); err != nil {
-				return err
-			}
-			m.compressedCache = buf.Bytes()
-			payload = m.compressedCache
-		case CompressionZSTD:
-			c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel)
-			if err != nil {
-				return err
-			}
-			m.compressedCache = c
-			payload = m.compressedCache
-		default:
-			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
-		}
+
+		payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
+		if err != nil {
+			return err
+		}
+		m.compressedCache = payload
 		// Keep in mind the compressed payload size for metric gathering
 		m.compressedSize = len(payload)
 	}
51 changes: 2 additions & 49 deletions record_batch.go
@@ -1,13 +1,8 @@
 package sarama
 
 import (
-	"bytes"
-	"compress/gzip"
 	"fmt"
 	"time"
-
-	"github.com/eapache/go-xerial-snappy"
-	"github.com/pierrec/lz4"
 )
 
 const recordBatchOverhead = 49
@@ -196,50 +191,8 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 	}
 	b.recordsLen = len(raw)
 
-	switch b.Codec {
-	case CompressionNone:
-		b.compressedRecords = raw
-	case CompressionGZIP:
-		var buf bytes.Buffer
-		var writer *gzip.Writer
-		if b.CompressionLevel != CompressionLevelDefault {
-			writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel)
-			if err != nil {
-				return err
-			}
-		} else {
-			writer = gzip.NewWriter(&buf)
-		}
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	case CompressionSnappy:
-		b.compressedRecords = snappy.Encode(raw)
-	case CompressionLZ4:
-		var buf bytes.Buffer
-		writer := lz4.NewWriter(&buf)
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	case CompressionZSTD:
-		c, err := zstdCompressLevel(nil, raw, b.CompressionLevel)
-		if err != nil {
-			return err
-		}
-		b.compressedRecords = c
-	default:
-		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
-	}
-
-	return nil
+	b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
+	return err
 }
 
 func (b *RecordBatch) computeAttributes() int16 {
