Introduced lifecycle for compression providers #284

Merged · 2 commits · Jun 18, 2020
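In brief: this change removes the package-level shared provider instances and ties each provider's lifetime to its owner. Each partition consumer (and each batch builder) now creates provider instances on demand and closes them when it shuts down. After the diff below, the Provider interface in pulsar/internal/compression/compression.go reads (doc comments trimmed here):

	type Provider interface {
		Compress(data []byte) []byte

		Decompress(compressedData []byte, originalSize int) ([]byte, error)

		// Returns a new instance of the same provider, with the same exact configuration
		Clone() Provider

		// Close the compressor
		io.Closer
	}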
80 changes: 49 additions & 31 deletions pulsar/consumer_partition.go
@@ -32,15 +32,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal/pb"
 )
 
-var (
-	compressionProviders = map[pb.CompressionType]compression.Provider{
-		pb.CompressionType_NONE: compression.NoopProvider,
-		pb.CompressionType_LZ4:  compression.Lz4Provider,
-		pb.CompressionType_ZLIB: compression.ZLibProvider,
-		pb.CompressionType_ZSTD: compression.ZStdProvider,
-	}
-)
-
 type consumerState int
 
 const (
@@ -115,29 +106,32 @@ type partitionConsumer struct {
 	dlq *dlqRouter
 
 	log *log.Entry
+
+	compressionProviders map[pb.CompressionType]compression.Provider
 }
 
 func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
 	messageCh chan ConsumerMessage, dlq *dlqRouter) (*partitionConsumer, error) {
 	pc := &partitionConsumer{
-		state:          consumerInit,
-		parentConsumer: parent,
-		client:         client,
-		options:        options,
-		topic:          options.topic,
-		name:           options.consumerName,
-		consumerID:     client.rpcClient.NewConsumerID(),
-		partitionIdx:   options.partitionIdx,
-		eventsCh:       make(chan interface{}, 3),
-		queueSize:      int32(options.receiverQueueSize),
-		queueCh:        make(chan []*message, options.receiverQueueSize),
-		startMessageID: options.startMessageID,
-		connectedCh:    make(chan struct{}),
-		messageCh:      messageCh,
-		closeCh:        make(chan struct{}),
-		clearQueueCh:   make(chan func(id *messageID)),
-		dlq:            dlq,
-		log:            log.WithField("topic", options.topic),
+		state:                consumerInit,
+		parentConsumer:       parent,
+		client:               client,
+		options:              options,
+		topic:                options.topic,
+		name:                 options.consumerName,
+		consumerID:           client.rpcClient.NewConsumerID(),
+		partitionIdx:         options.partitionIdx,
+		eventsCh:             make(chan interface{}, 3),
+		queueSize:            int32(options.receiverQueueSize),
+		queueCh:              make(chan []*message, options.receiverQueueSize),
+		startMessageID:       options.startMessageID,
+		connectedCh:          make(chan struct{}),
+		messageCh:            messageCh,
+		closeCh:              make(chan struct{}),
+		clearQueueCh:         make(chan func(id *messageID)),
+		compressionProviders: make(map[pb.CompressionType]compression.Provider),
+		dlq:                  dlq,
+		log:                  log.WithField("topic", options.topic),
 	}
 	pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
 	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
@@ -685,6 +679,10 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 		pc.log.Info("Closed consumer")
 	}
 
+	for _, provider := range pc.compressionProviders {
+		provider.Close()
+	}
+
 	pc.state = consumerClosed
 	pc.conn.DeleteConsumeHandler(pc.consumerID)
 	if pc.nackTracker != nil {
@@ -846,11 +844,15 @@ func getPreviousMessage(mid *messageID) *messageID {
 }
 
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
-	provider, ok := compressionProviders[msgMeta.GetCompression()]
+	provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
 	if !ok {
-		err := fmt.Errorf("unsupported compression type: %v", msgMeta.GetCompression())
-		pc.log.WithError(err).Error("Failed to decompress message.")
-		return nil, err
+		var err error
+		if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
+			pc.log.WithError(err).Error("Failed to decompress message.")
+			return nil, err
+		}
+
+		pc.compressionProviders[msgMeta.GetCompression()] = provider
[Review thread on the line above]
Contributor: Do we need to close these providers when the consumer is closed?
Author (Contributor): Yes, it's done at line 683 above
 	}
 
 	uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
@@ -861,6 +863,22 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
 	return internal.NewBufferWrapper(uncompressed), nil
 }
 
+func (pc *partitionConsumer) initializeCompressionProvider(
+	compressionType pb.CompressionType) (compression.Provider, error) {
+	switch compressionType {
+	case pb.CompressionType_NONE:
+		return compression.NewNoopProvider(), nil
+	case pb.CompressionType_ZLIB:
+		return compression.NewZLibProvider(), nil
+	case pb.CompressionType_LZ4:
+		return compression.NewLz4Provider(), nil
+	case pb.CompressionType_ZSTD:
+		return compression.NewZStdProvider(), nil
+	}
+
+	return nil, fmt.Errorf("unsupported compression type: %v", compressionType)
+}
+
 func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 	validationError pb.CommandAck_ValidationError) {
 	pc.log.WithFields(log.Fields{
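Condensed, the consumer-side lifecycle introduced above is: look up a cached provider for the message's compression type, lazily create and cache one on a miss, and close every cached provider when the consumer closes. A minimal standalone sketch of the same pattern (illustrative names only, using this package's constructors):

	providers := make(map[pb.CompressionType]compression.Provider)

	// getProvider lazily creates and caches one provider per compression type.
	getProvider := func(ct pb.CompressionType) (compression.Provider, error) {
		if p, ok := providers[ct]; ok {
			return p, nil
		}
		var p compression.Provider
		switch ct {
		case pb.CompressionType_NONE:
			p = compression.NewNoopProvider()
		case pb.CompressionType_ZLIB:
			p = compression.NewZLibProvider()
		case pb.CompressionType_LZ4:
			p = compression.NewLz4Provider()
		case pb.CompressionType_ZSTD:
			p = compression.NewZStdProvider()
		default:
			return nil, fmt.Errorf("unsupported compression type: %v", ct)
		}
		providers[ct] = p
		return p, nil
	}

	// ... call getProvider(...).Decompress(...) per message ...

	// On shutdown, release any compressor/decompressor state:
	for _, p := range providers {
		p.Close()
	}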
18 changes: 12 additions & 6 deletions pulsar/consumer_partition_test.go
@@ -20,6 +20,9 @@ package pulsar
 import (
 	"testing"
 
+	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+	"github.com/apache/pulsar-client-go/pulsar/internal/pb"
+
 	"github.com/stretchr/testify/assert"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
@@ -28,8 +31,9 @@ import (
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:  make(chan []*message, 1),
-		eventsCh: eventsCh,
+		queueCh:              make(chan []*message, 1),
+		eventsCh:             eventsCh,
+		compressionProviders: make(map[pb.CompressionType]compression.Provider),
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -56,8 +60,9 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:  make(chan []*message, 1),
-		eventsCh: eventsCh,
+		queueCh:              make(chan []*message, 1),
+		eventsCh:             eventsCh,
+		compressionProviders: make(map[pb.CompressionType]compression.Provider),
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -84,8 +89,9 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:  make(chan []*message, 1),
-		eventsCh: eventsCh,
+		queueCh:              make(chan []*message, 1),
+		eventsCh:             eventsCh,
+		compressionProviders: make(map[pb.CompressionType]compression.Provider),
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
17 changes: 8 additions & 9 deletions pulsar/internal/batch_builder.go
@@ -18,7 +18,6 @@
 package internal
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
@@ -92,10 +91,6 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 		bb.msgMetadata.Compression = &compressionType
 	}
 
-	if !bb.compressionProvider.CanCompress() {
-		return nil, fmt.Errorf("compression provider %d can only decompress data", compressionType)
-	}
-
 	return bb, nil
 }

@@ -177,16 +172,20 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
 	return buffer.ReadableSlice(), sequenceID, callbacks
 }
 
+func (bb *BatchBuilder) Close() error {
+	return bb.compressionProvider.Close()
+}
+
 func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
 	switch compressionType {
 	case pb.CompressionType_NONE:
-		return compression.NoopProvider
+		return compression.NewNoopProvider()
 	case pb.CompressionType_LZ4:
-		return compression.Lz4Provider
+		return compression.NewLz4Provider()
 	case pb.CompressionType_ZLIB:
-		return compression.ZLibProvider
+		return compression.NewZLibProvider()
 	case pb.CompressionType_ZSTD:
-		return compression.ZStdProvider
+		return compression.NewZStdProvider()
 	default:
 		log.Panic("unsupported compression type")
 		return nil
18 changes: 8 additions & 10 deletions pulsar/internal/compression/compression.go
@@ -17,11 +17,10 @@
 
 package compression
 
+import "io"
+
 // Provider is a interface of compression providers
 type Provider interface {
-	// CanCompress checks if the compression method is available under the current version.
-	CanCompress() bool
-
 	// Compress a []byte, the param is a []byte with the uncompressed content.
 	// The reader/writer indexes will not be modified. The return is a []byte
 	// with the compressed content.
@@ -31,11 +30,10 @@ type Provider interface {
 	// The compressedData is compressed content, originalSize is the size of the original content.
 	// The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise.
 	Decompress(compressedData []byte, originalSize int) ([]byte, error)
-}
 
-var (
-	NoopProvider = NewNoopProvider()
-	ZLibProvider = NewZLibProvider()
-	Lz4Provider  = NewLz4Provider()
-	ZStdProvider = NewZStdProvider()
-)
+	// Returns a new instance of the same provider, with the same exact configuration
+	Clone() Provider
+
+	// Close the compressor
+	io.Closer
+}
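For reference, a minimal type satisfying the new interface could look like the following pass-through provider (an illustration, not part of this package; the real no-op implementation is the one behind NewNoopProvider):

	type passthroughProvider struct{}

	func (passthroughProvider) Compress(data []byte) []byte { return data }

	func (passthroughProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
		return compressedData, nil
	}

	// Stateless, so Clone can return the receiver itself.
	func (p passthroughProvider) Clone() Provider { return p }

	// Nothing to release.
	func (passthroughProvider) Close() error { return nil }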
40 changes: 33 additions & 7 deletions pulsar/internal/compression/compression_bench_test.go
@@ -22,10 +22,12 @@ import (
 	"testing"
 )
 
-var compressed int
+const (
+	dataSampleFile = "test_data_sample.txt"
+)
 
 func testCompression(b *testing.B, provider Provider) {
-	data, err := ioutil.ReadFile("test_data_sample.txt")
+	data, err := ioutil.ReadFile(dataSampleFile)
 	if err != nil {
 		b.Error(err)
 	}
@@ -35,15 +37,14 @@ func testCompression(b *testing.B, provider Provider) {
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		// Use len() to avoid the compiler optimizing the call away
-		compressed = len(provider.Compress(data))
+		provider.Compress(data)
 		b.SetBytes(dataLen)
 	}
 }
 
 func testDecompression(b *testing.B, provider Provider) {
 	// Read data sample file
-	data, err := ioutil.ReadFile("test_data_sample.txt")
+	data, err := ioutil.ReadFile(dataSampleFile)
 	if err != nil {
 		b.Error(err)
 	}
@@ -61,8 +62,8 @@
 }
 
 var benchmarkProviders = []testProvider{
-	{"zlib", ZLibProvider, nil},
-	{"lz4", Lz4Provider, nil},
+	{"zlib", NewZLibProvider(), nil},
+	{"lz4", NewLz4Provider(), nil},
 	{"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil},
 	{"zstd-pure-go-default", newPureGoZStdProvider(2), nil},
 	{"zstd-pure-go-best", newPureGoZStdProvider(3), nil},
@@ -90,3 +91,28 @@ func BenchmarkDecompression(b *testing.B) {
 		})
 	}
 }
+
+func BenchmarkCompressionParallel(b *testing.B) {
+	b.ReportAllocs()
+
+	data, err := ioutil.ReadFile(dataSampleFile)
+	if err != nil {
+		b.Error(err)
+	}
+
+	dataLen := int64(len(data))
+	b.ResetTimer()
+
+	for _, provider := range benchmarkProviders {
+		p := provider
+		b.Run(p.name, func(b *testing.B) {
+			b.RunParallel(func(pb *testing.PB) {
+				localProvider := p.provider.Clone()
+				for pb.Next() {
+					localProvider.Compress(data)
+					b.SetBytes(dataLen)
+				}
+			})
+		})
+	}
+}
11 changes: 4 additions & 7 deletions pulsar/internal/compression/compression_test.go
@@ -32,19 +32,16 @@ type testProvider struct {
 }
 
 var providers = []testProvider{
-	{"zlib", ZLibProvider, []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
-	{"lz4", Lz4Provider, []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
-	{"zstd", ZStdProvider, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
+	{"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
+	{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
+	{"zstd", NewZStdProvider(),
+		[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
 }
 
 func TestCompression(t *testing.T) {
 	for _, provider := range providers {
 		p := provider
 		t.Run(p.name, func(t *testing.T) {
-			if !p.provider.CanCompress() {
-				return
-			}
-
 			hello := []byte("test compression data")
 			compressed := p.provider.Compress(hello)
 			uncompressed, err := p.provider.Decompress(compressed, len(hello))
27 changes: 17 additions & 10 deletions pulsar/internal/compression/lz4.go
@@ -21,24 +21,23 @@ import (
 	"github.com/pierrec/lz4"
 )
 
-type lz4Provider struct{}
+type lz4Provider struct {
+	hashTable []int
+}
 
 // NewLz4Provider return a interface of Provider.
 func NewLz4Provider() Provider {
-	return &lz4Provider{}
-}
+	const tableSize = 1 << 16
 
-func (lz4Provider) CanCompress() bool {
-	return true
+	return &lz4Provider{
+		hashTable: make([]int, tableSize),
+	}
 }
 
-func (lz4Provider) Compress(data []byte) []byte {
-	const tableSize = 1 << 16
-	hashTable := make([]int, tableSize)
-
+func (l *lz4Provider) Compress(data []byte) []byte {
 	maxSize := lz4.CompressBlockBound(len(data))
 	compressed := make([]byte, maxSize)
-	size, err := lz4.CompressBlock(data, compressed, hashTable)
+	size, err := lz4.CompressBlock(data, compressed, l.hashTable)
 	if err != nil {
 		panic("Failed to compress")
 	}
@@ -75,3 +74,11 @@ func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
 	_, err := lz4.UncompressBlock(compressedData, uncompressed)
 	return uncompressed, err
 }
+
+func (lz4Provider) Close() error {
+	return nil
+}
+
+func (lz4Provider) Clone() Provider {
+	return NewLz4Provider()
+}
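Keeping hashTable on the provider avoids allocating a 65,536-entry table on every Compress call, but it also means one lz4Provider must not run Compress from several goroutines at once. That is what Clone is for, as BenchmarkCompressionParallel above demonstrates. A minimal sketch of the same pattern (assuming a payload []byte and the standard sync package):

	p := compression.NewLz4Provider()
	defer p.Close()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			local := p.Clone() // each goroutine gets its own hash table
			defer local.Close()
			_ = local.Compress(payload)
		}()
	}
	wg.Wait()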