From afe6b1d9063b4417a68d14dc1cb8125213c04ed9 Mon Sep 17 00:00:00 2001 From: Muir Manders Date: Thu, 4 Oct 2018 16:20:46 -0700 Subject: [PATCH 1/2] Reuse lz4 and gzip readers Use sync.Pool to reuse lz4 and gzip reader objects across decompressions. lz4 in particular makes a large allocation per-reader, so you spend all your time in GC if you make a new reader per-message. Benchmarking reading 500 messages/s with 3 consumers and 32 partitions, lz4 consumer CPU fell from ~120% to ~5%. gzip went from ~20% to ~5%. --- decompress.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ message.go | 42 +++------------------------------ record_batch.go | 29 +++-------------------- 3 files changed, 69 insertions(+), 65 deletions(-) create mode 100644 decompress.go diff --git a/decompress.go b/decompress.go new file mode 100644 index 000000000..5fa750560 --- /dev/null +++ b/decompress.go @@ -0,0 +1,63 @@ +package sarama + +import ( + "bytes" + "compress/gzip" + "fmt" + "io/ioutil" + "sync" + + "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" +) + +var ( + lz4Pool = sync.Pool{ + New: func() interface{} { + return lz4.NewReader(nil) + }, + } + + gzipPool sync.Pool +) + +func decompress(cc CompressionCodec, data []byte) ([]byte, error) { + switch cc { + case CompressionNone: + return data, nil + case CompressionGZIP: + var ( + err error + reader *gzip.Reader + readerIntf = gzipPool.Get() + ) + if readerIntf != nil { + reader = readerIntf.(*gzip.Reader) + } else { + reader, err = gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + } + + defer gzipPool.Put(reader) + + if err := reader.Reset(bytes.NewReader(data)); err != nil { + return nil, err + } + + return ioutil.ReadAll(reader) + case CompressionSnappy: + return snappy.Decode(data) + case CompressionLZ4: + reader := lz4Pool.Get().(*lz4.Reader) + defer lz4Pool.Put(reader) + + reader.Reset(bytes.NewReader(data)) + return ioutil.ReadAll(reader) + case CompressionZSTD: + return 
zstdDecompress(nil, data) + default: + return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)} + } +} diff --git a/message.go b/message.go index 44d5cc91b..9ffb4b146 100644 --- a/message.go +++ b/message.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "fmt" - "io/ioutil" "time" "github.com/eapache/go-xerial-snappy" @@ -179,53 +178,18 @@ func (m *Message) decode(pd packetDecoder) (err error) { switch m.Codec { case CompressionNone: // nothing to do - case CompressionGZIP: + default: if m.Value == nil { break } - reader, err := gzip.NewReader(bytes.NewReader(m.Value)) + + m.Value, err = decompress(m.Codec, m.Value) if err != nil { return err } - if m.Value, err = ioutil.ReadAll(reader); err != nil { - return err - } if err := m.decodeSet(); err != nil { return err } - case CompressionSnappy: - if m.Value == nil { - break - } - if m.Value, err = snappy.Decode(m.Value); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - case CompressionLZ4: - if m.Value == nil { - break - } - reader := lz4.NewReader(bytes.NewReader(m.Value)) - if m.Value, err = ioutil.ReadAll(reader); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - case CompressionZSTD: - if m.Value == nil { - break - } - if m.Value, err = zstdDecompress(nil, m.Value); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - default: - return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)} } return pd.pop() diff --git a/record_batch.go b/record_batch.go index 5444557f1..dfcbdb0f6 100644 --- a/record_batch.go +++ b/record_batch.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "fmt" - "io/ioutil" "time" "github.com/eapache/go-xerial-snappy" @@ -174,31 +173,9 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) { return err } - switch b.Codec { - case CompressionNone: - case CompressionGZIP: - reader, err := 
gzip.NewReader(bytes.NewReader(recBuffer)) - if err != nil { - return err - } - if recBuffer, err = ioutil.ReadAll(reader); err != nil { - return err - } - case CompressionSnappy: - if recBuffer, err = snappy.Decode(recBuffer); err != nil { - return err - } - case CompressionLZ4: - reader := lz4.NewReader(bytes.NewReader(recBuffer)) - if recBuffer, err = ioutil.ReadAll(reader); err != nil { - return err - } - case CompressionZSTD: - if recBuffer, err = zstdDecompress(nil, recBuffer); err != nil { - return err - } - default: - return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)} + recBuffer, err = decompress(b.Codec, recBuffer) + if err != nil { + return err } b.recordsLen = len(recBuffer) From f352e5cc8b4428de4c1e96c22aa784f37674d893 Mon Sep 17 00:00:00 2001 From: Muir Manders Date: Thu, 4 Oct 2018 17:32:11 -0700 Subject: [PATCH 2/2] Reuse compression writer objects Similar to decompressing, you can reuse *lz4.Writer and *gzip.Writer objects by calling Reset() between uses. Add two more sync.Pools so we can reuse writers easily. The gzip writer is only reused when using the default compression. We could maintain a separate sync.Pool for each gzip compression level, but that adds more complexity and gzip wasn't that slow without this optimization. When producing 100 msgs/second across 10 producers, CPU usage dropped from 70% to 4% on my machine (lz4). Gzip dropped from 15% to 5%. 
--- compress.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ decompress.go | 12 ++++---- message.go | 56 ++++-------------------------------- record_batch.go | 51 ++------------------------------- 4 files changed, 88 insertions(+), 106 deletions(-) create mode 100644 compress.go diff --git a/compress.go b/compress.go new file mode 100644 index 000000000..94b716e4b --- /dev/null +++ b/compress.go @@ -0,0 +1,75 @@ +package sarama + +import ( + "bytes" + "compress/gzip" + "fmt" + "sync" + + "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" +) + +var ( + lz4WriterPool = sync.Pool{ + New: func() interface{} { + return lz4.NewWriter(nil) + }, + } + + gzipWriterPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, + } +) + +func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { + switch cc { + case CompressionNone: + return data, nil + case CompressionGZIP: + var ( + err error + buf bytes.Buffer + writer *gzip.Writer + ) + if level != CompressionLevelDefault { + writer, err = gzip.NewWriterLevel(&buf, level) + if err != nil { + return nil, err + } + } else { + writer = gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(writer) + writer.Reset(&buf) + } + if _, err := writer.Write(data); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil + case CompressionSnappy: + return snappy.Encode(data), nil + case CompressionLZ4: + writer := lz4WriterPool.Get().(*lz4.Writer) + defer lz4WriterPool.Put(writer) + + var buf bytes.Buffer + writer.Reset(&buf) + + if _, err := writer.Write(data); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil + case CompressionZSTD: + return zstdCompressLevel(nil, data, level) + default: + return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} + } +} diff --git a/decompress.go b/decompress.go 
index 5fa750560..eaccbfc26 100644 --- a/decompress.go +++ b/decompress.go @@ -12,13 +12,13 @@ import ( ) var ( - lz4Pool = sync.Pool{ + lz4ReaderPool = sync.Pool{ New: func() interface{} { return lz4.NewReader(nil) }, } - gzipPool sync.Pool + gzipReaderPool sync.Pool ) func decompress(cc CompressionCodec, data []byte) ([]byte, error) { @@ -29,7 +29,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { var ( err error reader *gzip.Reader - readerIntf = gzipPool.Get() + readerIntf = gzipReaderPool.Get() ) if readerIntf != nil { reader = readerIntf.(*gzip.Reader) @@ -40,7 +40,7 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { } } - defer gzipPool.Put(reader) + defer gzipReaderPool.Put(reader) if err := reader.Reset(bytes.NewReader(data)); err != nil { return nil, err @@ -50,8 +50,8 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { case CompressionSnappy: return snappy.Decode(data) case CompressionLZ4: - reader := lz4Pool.Get().(*lz4.Reader) - defer lz4Pool.Put(reader) + reader := lz4ReaderPool.Get().(*lz4.Reader) + defer lz4ReaderPool.Put(reader) reader.Reset(bytes.NewReader(data)) return ioutil.ReadAll(reader) diff --git a/message.go b/message.go index 9ffb4b146..51d3309c0 100644 --- a/message.go +++ b/message.go @@ -1,13 +1,8 @@ package sarama import ( - "bytes" - "compress/gzip" "fmt" "time" - - "github.com/eapache/go-xerial-snappy" - "github.com/pierrec/lz4" ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. 
@@ -76,53 +71,12 @@ func (m *Message) encode(pe packetEncoder) error { payload = m.compressedCache m.compressedCache = nil } else if m.Value != nil { - switch m.Codec { - case CompressionNone: - payload = m.Value - case CompressionGZIP: - var buf bytes.Buffer - var writer *gzip.Writer - if m.CompressionLevel != CompressionLevelDefault { - writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel) - if err != nil { - return err - } - } else { - writer = gzip.NewWriter(&buf) - } - if _, err = writer.Write(m.Value); err != nil { - return err - } - if err = writer.Close(); err != nil { - return err - } - m.compressedCache = buf.Bytes() - payload = m.compressedCache - case CompressionSnappy: - tmp := snappy.Encode(m.Value) - m.compressedCache = tmp - payload = m.compressedCache - case CompressionLZ4: - var buf bytes.Buffer - writer := lz4.NewWriter(&buf) - if _, err = writer.Write(m.Value); err != nil { - return err - } - if err = writer.Close(); err != nil { - return err - } - m.compressedCache = buf.Bytes() - payload = m.compressedCache - case CompressionZSTD: - c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel) - if err != nil { - return err - } - m.compressedCache = c - payload = m.compressedCache - default: - return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} + + payload, err = compress(m.Codec, m.CompressionLevel, m.Value) + if err != nil { + return err } + m.compressedCache = payload // Keep in mind the compressed payload size for metric gathering m.compressedSize = len(payload) } diff --git a/record_batch.go b/record_batch.go index dfcbdb0f6..e0f183f7a 100644 --- a/record_batch.go +++ b/record_batch.go @@ -1,13 +1,8 @@ package sarama import ( - "bytes" - "compress/gzip" "fmt" "time" - - "github.com/eapache/go-xerial-snappy" - "github.com/pierrec/lz4" ) const recordBatchOverhead = 49 @@ -196,50 +191,8 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error { } b.recordsLen = len(raw) - switch b.Codec { - case 
CompressionNone: - b.compressedRecords = raw - case CompressionGZIP: - var buf bytes.Buffer - var writer *gzip.Writer - if b.CompressionLevel != CompressionLevelDefault { - writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel) - if err != nil { - return err - } - } else { - writer = gzip.NewWriter(&buf) - } - if _, err := writer.Write(raw); err != nil { - return err - } - if err := writer.Close(); err != nil { - return err - } - b.compressedRecords = buf.Bytes() - case CompressionSnappy: - b.compressedRecords = snappy.Encode(raw) - case CompressionLZ4: - var buf bytes.Buffer - writer := lz4.NewWriter(&buf) - if _, err := writer.Write(raw); err != nil { - return err - } - if err := writer.Close(); err != nil { - return err - } - b.compressedRecords = buf.Bytes() - case CompressionZSTD: - c, err := zstdCompressLevel(nil, raw, b.CompressionLevel) - if err != nil { - return err - } - b.compressedRecords = c - default: - return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} - } - - return nil + b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw) + return err } func (b *RecordBatch) computeAttributes() int16 {