Skip to content

Commit

Permalink
Introduced lifecycle for compression providers (#284)
Browse files Browse the repository at this point in the history
* Introduced lifecycle for compression providers

* Fixed mocked test
  • Loading branch information
merlimat authored Jun 18, 2020
1 parent f6198df commit d4c5dcc
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 102 deletions.
80 changes: 49 additions & 31 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
)

var (
compressionProviders = map[pb.CompressionType]compression.Provider{
pb.CompressionType_NONE: compression.NoopProvider,
pb.CompressionType_LZ4: compression.Lz4Provider,
pb.CompressionType_ZLIB: compression.ZLibProvider,
pb.CompressionType_ZSTD: compression.ZStdProvider,
}
)

type consumerState int

const (
Expand Down Expand Up @@ -115,29 +106,32 @@ type partitionConsumer struct {
dlq *dlqRouter

log *log.Entry

compressionProviders map[pb.CompressionType]compression.Provider
}

func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter) (*partitionConsumer, error) {
pc := &partitionConsumer{
state: consumerInit,
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: options.partitionIdx,
eventsCh: make(chan interface{}, 3),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: options.startMessageID,
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *messageID)),
dlq: dlq,
log: log.WithField("topic", options.topic),
state: consumerInit,
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: options.partitionIdx,
eventsCh: make(chan interface{}, 3),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: options.startMessageID,
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *messageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
}
pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
Expand Down Expand Up @@ -689,6 +683,10 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
pc.log.Info("Closed consumer")
}

for _, provider := range pc.compressionProviders {
provider.Close()
}

pc.state = consumerClosed
pc.conn.DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
Expand Down Expand Up @@ -850,11 +848,15 @@ func getPreviousMessage(mid *messageID) *messageID {
}

func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
provider, ok := compressionProviders[msgMeta.GetCompression()]
provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
if !ok {
err := fmt.Errorf("unsupported compression type: %v", msgMeta.GetCompression())
pc.log.WithError(err).Error("Failed to decompress message.")
return nil, err
var err error
if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
pc.log.WithError(err).Error("Failed to decompress message.")
return nil, err
}

pc.compressionProviders[msgMeta.GetCompression()] = provider
}

uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
Expand All @@ -865,6 +867,22 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
return internal.NewBufferWrapper(uncompressed), nil
}

// initializeCompressionProvider builds a fresh provider instance for the
// requested compression type. An error is returned for types this client
// does not support; the caller caches successful results per consumer.
func (pc *partitionConsumer) initializeCompressionProvider(
	compressionType pb.CompressionType) (compression.Provider, error) {
	switch compressionType {
	case pb.CompressionType_NONE:
		return compression.NewNoopProvider(), nil
	case pb.CompressionType_LZ4:
		return compression.NewLz4Provider(), nil
	case pb.CompressionType_ZLIB:
		return compression.NewZLibProvider(), nil
	case pb.CompressionType_ZSTD:
		return compression.NewZStdProvider(), nil
	default:
		return nil, fmt.Errorf("unsupported compression type: %v", compressionType)
	}
}

func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
validationError pb.CommandAck_ValidationError) {
pc.log.WithFields(log.Fields{
Expand Down
18 changes: 12 additions & 6 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package pulsar
import (
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"

"github.com/stretchr/testify/assert"

"github.com/apache/pulsar-client-go/pulsar/internal"
Expand All @@ -28,8 +31,9 @@ import (
func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
}

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
Expand All @@ -56,8 +60,9 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
Expand All @@ -84,8 +89,9 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
queueCh: make(chan []*message, 1),
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
Expand Down
17 changes: 8 additions & 9 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package internal

import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
Expand Down Expand Up @@ -92,10 +91,6 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
bb.msgMetadata.Compression = &compressionType
}

if !bb.compressionProvider.CanCompress() {
return nil, fmt.Errorf("compression provider %d can only decompress data", compressionType)
}

return bb, nil
}

Expand Down Expand Up @@ -177,16 +172,20 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
return buffer.ReadableSlice(), sequenceID, callbacks
}

// Close releases the resources held by the batch builder's compression
// provider, as part of the per-instance provider lifecycle.
func (bb *BatchBuilder) Close() error {
	return bb.compressionProvider.Close()
}

func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
switch compressionType {
case pb.CompressionType_NONE:
return compression.NoopProvider
return compression.NewNoopProvider()
case pb.CompressionType_LZ4:
return compression.Lz4Provider
return compression.NewLz4Provider()
case pb.CompressionType_ZLIB:
return compression.ZLibProvider
return compression.NewZLibProvider()
case pb.CompressionType_ZSTD:
return compression.ZStdProvider
return compression.NewZStdProvider()
default:
log.Panic("unsupported compression type")
return nil
Expand Down
18 changes: 8 additions & 10 deletions pulsar/internal/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

package compression

import "io"

// Provider is an interface for compression providers.
type Provider interface {
// CanCompress checks if the compression method is available under the current version.
CanCompress() bool

// Compress a []byte, the param is a []byte with the uncompressed content.
// The reader/writer indexes will not be modified. The return is a []byte
// with the compressed content.
Expand All @@ -31,11 +30,10 @@ type Provider interface {
// The compressedData is compressed content, originalSize is the size of the original content.
// The returned slice holds the decompressed result; if err is nil, decompression succeeded, otherwise err is non-nil.
Decompress(compressedData []byte, originalSize int) ([]byte, error)
}

var (
NoopProvider = NewNoopProvider()
ZLibProvider = NewZLibProvider()
Lz4Provider = NewLz4Provider()
ZStdProvider = NewZStdProvider()
)
// Clone returns a new instance of the same provider, with the exact same configuration.
Clone() Provider

// Close the compressor
io.Closer
}
40 changes: 33 additions & 7 deletions pulsar/internal/compression/compression_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"testing"
)

var compressed int
const (
dataSampleFile = "test_data_sample.txt"
)

func testCompression(b *testing.B, provider Provider) {
data, err := ioutil.ReadFile("test_data_sample.txt")
data, err := ioutil.ReadFile(dataSampleFile)
if err != nil {
b.Error(err)
}
Expand All @@ -35,15 +37,14 @@ func testCompression(b *testing.B, provider Provider) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
// Use len() to avoid the compiler optimizing the call away
compressed = len(provider.Compress(data))
provider.Compress(data)
b.SetBytes(dataLen)
}
}

func testDecompression(b *testing.B, provider Provider) {
// Read data sample file
data, err := ioutil.ReadFile("test_data_sample.txt")
data, err := ioutil.ReadFile(dataSampleFile)
if err != nil {
b.Error(err)
}
Expand All @@ -61,8 +62,8 @@ func testDecompression(b *testing.B, provider Provider) {
}

var benchmarkProviders = []testProvider{
{"zlib", ZLibProvider, nil},
{"lz4", Lz4Provider, nil},
{"zlib", NewZLibProvider(), nil},
{"lz4", NewLz4Provider(), nil},
{"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil},
{"zstd-pure-go-default", newPureGoZStdProvider(2), nil},
{"zstd-pure-go-best", newPureGoZStdProvider(3), nil},
Expand Down Expand Up @@ -90,3 +91,28 @@ func BenchmarkDecompression(b *testing.B) {
})
}
}

// BenchmarkCompressionParallel measures compression throughput with many
// goroutines compressing concurrently. Each worker clones the provider so
// no internal scratch state (e.g. the lz4 hash table) is shared.
func BenchmarkCompressionParallel(b *testing.B) {
	data, err := ioutil.ReadFile(dataSampleFile)
	if err != nil {
		// Fatal, not Error: continuing with nil data would benchmark nothing.
		b.Fatal(err)
	}

	dataLen := int64(len(data))

	for _, provider := range benchmarkProviders {
		p := provider // capture loop variable for the closure
		b.Run(p.name, func(b *testing.B) {
			// ReportAllocs/SetBytes must be set on the sub-benchmark:
			// settings on the parent b do not propagate into b.Run.
			b.ReportAllocs()
			// SetBytes is not safe for concurrent use; call it once here
			// rather than inside the RunParallel worker loop.
			b.SetBytes(dataLen)
			b.RunParallel(func(pb *testing.PB) {
				// One provider instance per goroutine.
				localProvider := p.provider.Clone()
				for pb.Next() {
					localProvider.Compress(data)
				}
			})
		})
	}
}
11 changes: 4 additions & 7 deletions pulsar/internal/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,16 @@ type testProvider struct {
}

var providers = []testProvider{
{"zlib", ZLibProvider, []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
{"lz4", Lz4Provider, []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
{"zstd", ZStdProvider, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
{"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
{"zstd", NewZStdProvider(),
[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
}

func TestCompression(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
if !p.provider.CanCompress() {
return
}

hello := []byte("test compression data")
compressed := p.provider.Compress(hello)
uncompressed, err := p.provider.Decompress(compressed, len(hello))
Expand Down
27 changes: 17 additions & 10 deletions pulsar/internal/compression/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ import (
"github.com/pierrec/lz4"
)

type lz4Provider struct{}
type lz4Provider struct {
hashTable []int
}

// NewLz4Provider returns a new lz4-based compression Provider.
func NewLz4Provider() Provider {
return &lz4Provider{}
}
const tableSize = 1 << 16

func (lz4Provider) CanCompress() bool {
return true
return &lz4Provider{
hashTable: make([]int, tableSize),
}
}

func (lz4Provider) Compress(data []byte) []byte {
const tableSize = 1 << 16
hashTable := make([]int, tableSize)

func (l *lz4Provider) Compress(data []byte) []byte {
maxSize := lz4.CompressBlockBound(len(data))
compressed := make([]byte, maxSize)
size, err := lz4.CompressBlock(data, compressed, hashTable)
size, err := lz4.CompressBlock(data, compressed, l.hashTable)
if err != nil {
panic("Failed to compress")
}
Expand Down Expand Up @@ -75,3 +74,11 @@ func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte,
_, err := lz4.UncompressBlock(compressedData, uncompressed)
return uncompressed, err
}

// Close is a no-op: the lz4 provider holds no resources beyond its
// garbage-collected hash table.
func (lz4Provider) Close() error {
	return nil
}

// Clone returns a new, independent lz4 provider with its own hash table.
func (lz4Provider) Clone() Provider {
	return NewLz4Provider()
}
Loading

0 comments on commit d4c5dcc

Please sign in to comment.