Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Used pooled buffering for compression and batch serialization #292

Merged
merged 1 commit into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -34,7 +35,8 @@ func TestClient(t *testing.T) {

func TestTLSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURLTLS,
URL: serviceURLTLS,
OperationTimeout: 5 * time.Second,
})
assert.NoError(t, err)

Expand Down Expand Up @@ -105,6 +107,7 @@ func TestTLSConnectionHostNameVerification(t *testing.T) {
func TestTLSConnectionHostNameVerificationError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar+ssl://127.0.0.1:6651",
OperationTimeout: 5 * time.Second,
TLSTrustCertsFilePath: caCertsPath,
TLSValidateHostname: true,
})
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
pc.compressionProviders[msgMeta.GetCompression()] = provider
}

uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have a method definition that does not require nil as a parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s an established convention in Go APIs. For example all the compression implementations are using that.

if err != nil {
return nil, err
}
Expand Down
25 changes: 19 additions & 6 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
DefaultMaxMessagesPerBatch = 1000
)

type ConnectionHolder interface {
GetConnection() Connection
}

// BatchBuilder wraps the objects needed to build a batch.
type BatchBuilder struct {
buffer Buffer
Expand All @@ -58,11 +62,13 @@ type BatchBuilder struct {
callbacks []interface{}

compressionProvider compression.Provider
cnxHolder ConnectionHolder
}

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
compressionType pb.CompressionType, level compression.Level,
cnxHolder ConnectionHolder) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
Expand All @@ -85,6 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
},
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType, level),
cnxHolder: cnxHolder,
}

if compressionType != pb.CompressionType_NONE {
Expand Down Expand Up @@ -149,7 +156,7 @@ func (bb *BatchBuilder) reset() {
}

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) {
if bb.numMessages == 0 {
// No-Op for empty batch
return nil, 0, nil
Expand All @@ -160,16 +167,22 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))

uncompressedSize := bb.buffer.ReadableBytes()
compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
bb.msgMetadata.UncompressedSize = &uncompressedSize

buffer := NewBuffer(4096)
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
cnx := bb.cnxHolder.GetConnection()
var buffer Buffer
if cnx == nil {
buffer = NewBuffer(int(uncompressedSize))
} else {
buffer = cnx.GetBufferFromPool()
}

serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)

callbacks = bb.callbacks
sequenceID = bb.cmdSend.Send.GetSequenceId()
bb.reset()
return buffer.ReadableSlice(), sequenceID, callbacks
return buffer, sequenceID, callbacks
}

func (bb *BatchBuilder) Close() error {
Expand Down
13 changes: 7 additions & 6 deletions pulsar/internal/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Buffer interface {
PutUint32(n uint32, writerIdx uint32)

Resize(newSize uint32)
ResizeIfNeeded(spaceNeeded uint32)

// Clear will clear the current buffer data.
Clear()
Expand Down Expand Up @@ -154,9 +155,9 @@ func (b *buffer) Resize(newSize uint32) {
b.writerIdx = size
}

func (b *buffer) resizeIfNeeded(spaceNeeded int) {
if b.WritableBytes() < uint32(spaceNeeded) {
capacityNeeded := uint32(cap(b.data) + spaceNeeded)
func (b *buffer) ResizeIfNeeded(spaceNeeded uint32) {
if b.WritableBytes() < spaceNeeded {
capacityNeeded := uint32(cap(b.data)) + spaceNeeded
minCapacityIncrease := uint32(cap(b.data) * 3 / 2)
if capacityNeeded < minCapacityIncrease {
capacityNeeded = minCapacityIncrease
Expand All @@ -174,7 +175,7 @@ func (b *buffer) ReadUint16() uint16 {
}

func (b *buffer) WriteUint32(n uint32) {
b.resizeIfNeeded(4)
b.ResizeIfNeeded(4)
binary.BigEndian.PutUint32(b.WritableSlice(), n)
b.writerIdx += 4
}
Expand All @@ -184,13 +185,13 @@ func (b *buffer) PutUint32(n uint32, idx uint32) {
}

func (b *buffer) WriteUint16(n uint16) {
b.resizeIfNeeded(2)
b.ResizeIfNeeded(2)
binary.BigEndian.PutUint16(b.WritableSlice(), n)
b.writerIdx += 2
}

func (b *buffer) Write(s []byte) {
b.resizeIfNeeded(len(s))
b.ResizeIfNeeded(uint32(len(s)))
copy(b.WritableSlice(), s)
b.writerIdx += uint32(len(s))
}
Expand Down
31 changes: 19 additions & 12 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"

"github.com/golang/protobuf/proto"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -211,19 +213,17 @@ func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
wb.Write(payload)
}

func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) {
func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
msgMetadataSize := proto.Size(msgMetadata)
payloadSize := len(payload)

magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
// cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
totalSize := headerContentSize + payloadSize

wb.WriteUint32(uint32(totalSize)) // External frame
frameSizeIdx := wb.WriterIndex()
wb.WriteUint32(0) // Skip frame size until we now the size
frameStartIdx := wb.WriterIndex()

// Write cmd
wb.WriteUint32(uint32(cmdSize))
Expand All @@ -248,13 +248,20 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
}

wb.Write(serialized)
wb.Write(payload)

// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
wb.ResizeIfNeeded(maxSize)
b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
wb.WrittenBytes(uint32(len(b)))

// Write checksum at created checksum-placeholder
endIdx := wb.WriterIndex()
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
frameEndIdx := wb.WriterIndex()
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))

// set computed checksum
// Set Sizes and checksum in the fixed-size header
wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
wb.PutUint32(checksum, checksumIdx)
}

Expand Down
9 changes: 6 additions & 3 deletions pulsar/internal/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ const (

// Provider is a interface of compression providers
type Provider interface {
// Return the max possible size for a compressed buffer given the uncompressed data size
CompressMaxSize(originalSize int) int

// Compress a []byte, the param is a []byte with the uncompressed content.
// The reader/writer indexes will not be modified. The return is a []byte
// with the compressed content.
Compress(data []byte) []byte
Compress(dst, src []byte) []byte

// Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
// The compressedData is compressed content, originalSize is the size of the original content.
// The src is compressed content. If dst is passed, the decompressed data will be written there
// The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise.
Decompress(compressedData []byte, originalSize int) ([]byte, error)
Decompress(dst, src []byte, originalSize int) ([]byte, error)

// Returns a new instance of the same provider, with the same exact configuration
Clone() Provider
Expand Down
12 changes: 8 additions & 4 deletions pulsar/internal/compression/compression_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ func testCompression(b *testing.B, provider Provider) {
}

dataLen := int64(len(data))
compressed := make([]byte, 1024*1024)

b.ResetTimer()

for i := 0; i < b.N; i++ {
provider.Compress(data)
provider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
}
Expand All @@ -49,14 +50,15 @@ func testDecompression(b *testing.B, provider Provider) {
b.Error(err)
}

dataCompressed := provider.Compress(data)
dataCompressed := provider.Compress(nil, data)
dataDecompressed := make([]byte, 1024*1024)

dataLen := int64(len(data))

b.ResetTimer()

for i := 0; i < b.N; i++ {
provider.Decompress(dataCompressed, int(dataLen))
provider.Decompress(dataDecompressed[:0], dataCompressed, int(dataLen))
b.SetBytes(dataLen)
}
}
Expand Down Expand Up @@ -108,8 +110,10 @@ func BenchmarkCompressionParallel(b *testing.B) {
b.Run(p.name, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
localProvider := p.provider.Clone()
compressed := make([]byte, 1024*1024)

for pb.Next() {
localProvider.Compress(data)
localProvider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
})
Expand Down
24 changes: 20 additions & 4 deletions pulsar/internal/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,24 @@ func TestCompression(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("test compression data")
compressed := p.provider.Compress(hello)
uncompressed, err := p.provider.Decompress(compressed, len(hello))
compressed := make([]byte, 1024)
compressed = p.provider.Compress(compressed, hello)

uncompressed := make([]byte, 1024)
uncompressed, err := p.provider.Decompress(uncompressed, compressed, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
}
}

func TestCompressionNoBuffers(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("test compression data")
compressed := p.provider.Compress(nil, hello)
uncompressed, err := p.provider.Decompress(nil, compressed, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
Expand All @@ -56,7 +72,7 @@ func TestJavaCompatibility(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("hello")
uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
uncompressed, err := p.provider.Decompress(nil, p.compressedHello, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
Expand All @@ -67,7 +83,7 @@ func TestDecompressionError(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
_, err := p.provider.Decompress([]byte{0x05}, 10)
_, err := p.provider.Decompress(nil, []byte{0x05}, 10)
assert.NotNil(t, err)
})
}
Expand Down
43 changes: 32 additions & 11 deletions pulsar/internal/compression/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/pierrec/lz4"
)

const (
minLz4DestinationBufferSize = 1024 * 1024
)

type lz4Provider struct {
hashTable []int
}
Expand All @@ -34,22 +38,35 @@ func NewLz4Provider() Provider {
}
}

func (l *lz4Provider) Compress(data []byte) []byte {
func (l *lz4Provider) CompressMaxSize(originalSize int) int {
s := lz4.CompressBlockBound(originalSize)
if s < minLz4DestinationBufferSize {
return minLz4DestinationBufferSize
}

return s
}

func (l *lz4Provider) Compress(dst, data []byte) []byte {
maxSize := lz4.CompressBlockBound(len(data))
compressed := make([]byte, maxSize)
size, err := lz4.CompressBlock(data, compressed, l.hashTable)
if cap(dst) >= maxSize {
dst = dst[0:maxSize] // Reuse dst buffer
} else {
dst = make([]byte, maxSize)
}
size, err := lz4.CompressBlock(data, dst, l.hashTable)
if err != nil {
panic("Failed to compress")
}

if size == 0 {
// The data block was not compressed. Just repeat it with
// the block header flag to signal it's not compressed
headerSize := writeSize(len(data), compressed)
copy(compressed[headerSize:], data)
return compressed[:len(data)+headerSize]
headerSize := writeSize(len(data), dst)
copy(dst[headerSize:], data)
return dst[:len(data)+headerSize]
}
return compressed[:size]
return dst[:size]
}

// Write the encoded size for the uncompressed payload
Expand All @@ -69,10 +86,14 @@ func writeSize(size int, dst []byte) int {
return i + 1
}

func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
uncompressed := make([]byte, originalSize)
_, err := lz4.UncompressBlock(compressedData, uncompressed)
return uncompressed, err
func (lz4Provider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
if cap(dst) >= originalSize {
dst = dst[0:originalSize] // Reuse dst buffer
} else {
dst = make([]byte, originalSize)
}
_, err := lz4.UncompressBlock(src, dst)
return dst, err
}

func (lz4Provider) Close() error {
Expand Down
Loading