diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 8917c46b0f..308c8e0122 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io/ioutil" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -34,7 +35,8 @@ func TestClient(t *testing.T) { func TestTLSConnectionCAError(t *testing.T) { client, err := NewClient(ClientOptions{ - URL: serviceURLTLS, + URL: serviceURLTLS, + OperationTimeout: 5 * time.Second, }) assert.NoError(t, err) @@ -105,6 +107,7 @@ func TestTLSConnectionHostNameVerification(t *testing.T) { func TestTLSConnectionHostNameVerificationError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar+ssl://127.0.0.1:6651", + OperationTimeout: 5 * time.Second, TLSTrustCertsFilePath: caCertsPath, TLSValidateHostname: true, }) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b7bf261ebb..dc4607bc4d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -859,7 +859,7 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int pc.compressionProviders[msgMeta.GetCompression()] = provider } - uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize())) + uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize())) if err != nil { return nil, err } diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 61c5ced681..db6074e6ea 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -35,6 +35,10 @@ const ( DefaultMaxMessagesPerBatch = 1000 ) +type ConnectionHolder interface { + GetConnection() Connection +} + // BatchBuilder wraps the objects needed to build a batch. 
type BatchBuilder struct { buffer Buffer @@ -58,11 +62,13 @@ type BatchBuilder struct { callbacks []interface{} compressionProvider compression.Provider + cnxHolder ConnectionHolder } // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, - compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) { + compressionType pb.CompressionType, level compression.Level, + cnxHolder ConnectionHolder) (*BatchBuilder, error) { if maxMessages == 0 { maxMessages = DefaultMaxMessagesPerBatch } @@ -85,6 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p }, callbacks: []interface{}{}, compressionProvider: getCompressionProvider(compressionType, level), + cnxHolder: cnxHolder, } if compressionType != pb.CompressionType_NONE { @@ -149,7 +156,7 @@ func (bb *BatchBuilder) reset() { } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. 
-func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) { +func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) { if bb.numMessages == 0 { // No-Op for empty batch return nil, 0, nil @@ -160,16 +167,22 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages)) uncompressedSize := bb.buffer.ReadableBytes() - compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice()) bb.msgMetadata.UncompressedSize = &uncompressedSize - buffer := NewBuffer(4096) - serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed) + cnx := bb.cnxHolder.GetConnection() + var buffer Buffer + if cnx == nil { + buffer = NewBuffer(int(uncompressedSize)) + } else { + buffer = cnx.GetBufferFromPool() + } + + serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider) callbacks = bb.callbacks sequenceID = bb.cmdSend.Send.GetSequenceId() bb.reset() - return buffer.ReadableSlice(), sequenceID, callbacks + return buffer, sequenceID, callbacks } func (bb *BatchBuilder) Close() error { diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go index e4abac922d..c6d007ddfa 100644 --- a/pulsar/internal/buffer.go +++ b/pulsar/internal/buffer.go @@ -63,6 +63,7 @@ type Buffer interface { PutUint32(n uint32, writerIdx uint32) Resize(newSize uint32) + ResizeIfNeeded(spaceNeeded uint32) // Clear will clear the current buffer data. 
Clear() @@ -154,9 +155,9 @@ func (b *buffer) Resize(newSize uint32) { b.writerIdx = size } -func (b *buffer) resizeIfNeeded(spaceNeeded int) { - if b.WritableBytes() < uint32(spaceNeeded) { - capacityNeeded := uint32(cap(b.data) + spaceNeeded) +func (b *buffer) ResizeIfNeeded(spaceNeeded uint32) { + if b.WritableBytes() < spaceNeeded { + capacityNeeded := uint32(cap(b.data)) + spaceNeeded minCapacityIncrease := uint32(cap(b.data) * 3 / 2) if capacityNeeded < minCapacityIncrease { capacityNeeded = minCapacityIncrease @@ -174,7 +175,7 @@ func (b *buffer) ReadUint16() uint16 { } func (b *buffer) WriteUint32(n uint32) { - b.resizeIfNeeded(4) + b.ResizeIfNeeded(4) binary.BigEndian.PutUint32(b.WritableSlice(), n) b.writerIdx += 4 } @@ -184,13 +185,13 @@ func (b *buffer) PutUint32(n uint32, idx uint32) { } func (b *buffer) WriteUint16(n uint16) { - b.resizeIfNeeded(2) + b.ResizeIfNeeded(2) binary.BigEndian.PutUint16(b.WritableSlice(), n) b.writerIdx += 2 } func (b *buffer) Write(s []byte) { - b.resizeIfNeeded(len(s)) + b.ResizeIfNeeded(uint32(len(s))) copy(b.WritableSlice(), s) b.writerIdx += uint32(len(s)) } diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 1ee3517eb5..0fb112557c 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" @@ -211,19 +213,17 @@ func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) { wb.Write(payload) } -func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) { +func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, + uncompressedPayload Buffer, + compressionProvider compression.Provider) { // Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] cmdSize := proto.Size(cmdSend) 
msgMetadataSize := proto.Size(msgMetadata) - payloadSize := len(payload) - - magicAndChecksumLength := 2 + 4 /* magic + checksumLength */ - headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize - // cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize - totalSize := headerContentSize + payloadSize - wb.WriteUint32(uint32(totalSize)) // External frame + frameSizeIdx := wb.WriterIndex() + wb.WriteUint32(0) // Skip frame size until we know the size + frameStartIdx := wb.WriterIndex() // Write cmd wb.WriteUint32(uint32(cmdSize)) @@ -248,13 +248,20 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, } wb.Write(serialized) - wb.Write(payload) + + // Make sure the buffer has enough space to hold the compressed data + // and perform the compression in-place + maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes()))) + wb.ResizeIfNeeded(maxSize) + b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice()) + wb.WrittenBytes(uint32(len(b))) // Write checksum at created checksum-placeholder - endIdx := wb.WriterIndex() - checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx)) + frameEndIdx := wb.WriterIndex() + checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx)) - // set computed checksum + // Set sizes and checksum in the fixed-size header + wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame + wb.PutUint32(checksum, checksumIdx) } diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go index 90adeeb8d3..a4504bf1f2 100644 --- a/pulsar/internal/compression/compression.go +++ b/pulsar/internal/compression/compression.go @@ -29,15 +29,18 @@ const ( // Provider is a interface of compression providers type Provider interface { + // Return the max possible size for a compressed buffer given the 
uncompressed data size + CompressMaxSize(originalSize int) int + // Compress a []byte, the param is a []byte with the uncompressed content. // The reader/writer indexes will not be modified. The return is a []byte // with the compressed content. - Compress(data []byte) []byte + Compress(dst, src []byte) []byte // Decompress a []byte. The buffer needs to have been compressed with the matching Encoder. - // The compressedData is compressed content, originalSize is the size of the original content. + // The src is compressed content. If dst is passed, the decompressed data will be written there // The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise. - Decompress(compressedData []byte, originalSize int) ([]byte, error) + Decompress(dst, src []byte, originalSize int) ([]byte, error) // Returns a new instance of the same provider, with the same exact configuration Clone() Provider diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go index 1f4032178f..de0ca48d5d 100644 --- a/pulsar/internal/compression/compression_bench_test.go +++ b/pulsar/internal/compression/compression_bench_test.go @@ -33,11 +33,12 @@ func testCompression(b *testing.B, provider Provider) { } dataLen := int64(len(data)) + compressed := make([]byte, 1024*1024) b.ResetTimer() for i := 0; i < b.N; i++ { - provider.Compress(data) + provider.Compress(compressed[:0], data) b.SetBytes(dataLen) } } @@ -49,14 +50,15 @@ func testDecompression(b *testing.B, provider Provider) { b.Error(err) } - dataCompressed := provider.Compress(data) + dataCompressed := provider.Compress(nil, data) + dataDecompressed := make([]byte, 1024*1024) dataLen := int64(len(data)) b.ResetTimer() for i := 0; i < b.N; i++ { - provider.Decompress(dataCompressed, int(dataLen)) + provider.Decompress(dataDecompressed[:0], dataCompressed, int(dataLen)) b.SetBytes(dataLen) } } @@ -108,8 +110,10 @@ func 
BenchmarkCompressionParallel(b *testing.B) { b.Run(p.name, func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { localProvider := p.provider.Clone() + compressed := make([]byte, 1024*1024) + for pb.Next() { - localProvider.Compress(data) + localProvider.Compress(compressed[:0], data) b.SetBytes(dataLen) } }) diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go index cfb00b230a..7155e31fa0 100644 --- a/pulsar/internal/compression/compression_test.go +++ b/pulsar/internal/compression/compression_test.go @@ -43,8 +43,24 @@ func TestCompression(t *testing.T) { p := provider t.Run(p.name, func(t *testing.T) { hello := []byte("test compression data") - compressed := p.provider.Compress(hello) - uncompressed, err := p.provider.Decompress(compressed, len(hello)) + compressed := make([]byte, 1024) + compressed = p.provider.Compress(compressed, hello) + + uncompressed := make([]byte, 1024) + uncompressed, err := p.provider.Decompress(uncompressed, compressed, len(hello)) + assert.Nil(t, err) + assert.ElementsMatch(t, hello, uncompressed) + }) + } +} + +func TestCompressionNoBuffers(t *testing.T) { + for _, provider := range providers { + p := provider + t.Run(p.name, func(t *testing.T) { + hello := []byte("test compression data") + compressed := p.provider.Compress(nil, hello) + uncompressed, err := p.provider.Decompress(nil, compressed, len(hello)) assert.Nil(t, err) assert.ElementsMatch(t, hello, uncompressed) }) @@ -56,7 +72,7 @@ func TestJavaCompatibility(t *testing.T) { p := provider t.Run(p.name, func(t *testing.T) { hello := []byte("hello") - uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello)) + uncompressed, err := p.provider.Decompress(nil, p.compressedHello, len(hello)) assert.Nil(t, err) assert.ElementsMatch(t, hello, uncompressed) }) @@ -67,7 +83,7 @@ func TestDecompressionError(t *testing.T) { for _, provider := range providers { p := provider t.Run(p.name, func(t *testing.T) { - 
_, err := p.provider.Decompress([]byte{0x05}, 10) + _, err := p.provider.Decompress(nil, []byte{0x05}, 10) assert.NotNil(t, err) }) } diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go index 5a1f0f4bfd..745ea831fc 100644 --- a/pulsar/internal/compression/lz4.go +++ b/pulsar/internal/compression/lz4.go @@ -21,6 +21,10 @@ import ( "github.com/pierrec/lz4" ) +const ( + minLz4DestinationBufferSize = 1024 * 1024 +) + type lz4Provider struct { hashTable []int } @@ -34,10 +38,23 @@ func NewLz4Provider() Provider { } } -func (l *lz4Provider) Compress(data []byte) []byte { +func (l *lz4Provider) CompressMaxSize(originalSize int) int { + s := lz4.CompressBlockBound(originalSize) + if s < minLz4DestinationBufferSize { + return minLz4DestinationBufferSize + } + + return s +} + +func (l *lz4Provider) Compress(dst, data []byte) []byte { maxSize := lz4.CompressBlockBound(len(data)) - compressed := make([]byte, maxSize) - size, err := lz4.CompressBlock(data, compressed, l.hashTable) + if cap(dst) >= maxSize { + dst = dst[0:maxSize] // Reuse dst buffer + } else { + dst = make([]byte, maxSize) + } + size, err := lz4.CompressBlock(data, dst, l.hashTable) if err != nil { panic("Failed to compress") } @@ -45,11 +62,11 @@ func (l *lz4Provider) Compress(data []byte) []byte { if size == 0 { // The data block was not compressed. 
Just repeat it with // the block header flag to signal it's not compressed - headerSize := writeSize(len(data), compressed) - copy(compressed[headerSize:], data) - return compressed[:len(data)+headerSize] + headerSize := writeSize(len(data), dst) + copy(dst[headerSize:], data) + return dst[:len(data)+headerSize] } - return compressed[:size] + return dst[:size] } // Write the encoded size for the uncompressed payload @@ -69,10 +86,14 @@ func writeSize(size int, dst []byte) int { return i + 1 } -func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { - uncompressed := make([]byte, originalSize) - _, err := lz4.UncompressBlock(compressedData, uncompressed) - return uncompressed, err +func (lz4Provider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { + if cap(dst) >= originalSize { + dst = dst[0:originalSize] // Reuse dst buffer + } else { + dst = make([]byte, originalSize) + } + _, err := lz4.UncompressBlock(src, dst) + return dst, err } func (lz4Provider) Close() error { diff --git a/pulsar/internal/compression/noop.go b/pulsar/internal/compression/noop.go index 48318c5b7f..78acb52d3c 100644 --- a/pulsar/internal/compression/noop.go +++ b/pulsar/internal/compression/noop.go @@ -17,6 +17,10 @@ package compression +import ( + "bytes" +) + type noopProvider struct{} // NewNoopProvider returns a Provider interface that does not compress the data @@ -24,12 +28,28 @@ func NewNoopProvider() Provider { return &noopProvider{} } -func (noopProvider) Compress(data []byte) []byte { - return data +func (noopProvider) CompressMaxSize(originalSize int) int { + return originalSize } -func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { - return compressedData, nil +func (noopProvider) Compress(dst, src []byte) []byte { + if dst == nil { + dst = make([]byte, len(src)) + } + + b := bytes.NewBuffer(dst[:0]) + b.Write(src) + return dst[:len(src)] +} + +func (noopProvider) Decompress(dst, src []byte, 
originalSize int) ([]byte, error) { + if dst == nil { + dst = make([]byte, len(src)) + } + + b := bytes.NewBuffer(dst[:0]) + b.Write(src) + return dst[:len(src)], nil } func (noopProvider) Close() error { diff --git a/pulsar/internal/compression/zlib.go b/pulsar/internal/compression/zlib.go index fe252061e5..44c45665d3 100644 --- a/pulsar/internal/compression/zlib.go +++ b/pulsar/internal/compression/zlib.go @@ -30,11 +30,15 @@ func NewZLibProvider() Provider { return &zlibProvider{} } -func (zlibProvider) Compress(data []byte) []byte { - var b bytes.Buffer - w := zlib.NewWriter(&b) +func (zlibProvider) CompressMaxSize(originalSize int) int { + return int(float32(originalSize) * 1.10) +} + +func (zlibProvider) Compress(dst, src []byte) []byte { + var b = bytes.NewBuffer(dst[:0]) + w := zlib.NewWriter(b) - if _, err := w.Write(data); err != nil { + if _, err := w.Write(src); err != nil { return nil } if err := w.Close(); err != nil { @@ -44,14 +48,18 @@ func (zlibProvider) Compress(data []byte) []byte { return b.Bytes() } -func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { - r, err := zlib.NewReader(bytes.NewReader(compressedData)) +func (zlibProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { + r, err := zlib.NewReader(bytes.NewReader(src)) if err != nil { return nil, err } - uncompressed := make([]byte, originalSize) - if _, err = io.ReadFull(r, uncompressed); err != nil { + if cap(dst) >= originalSize { + dst = dst[0:originalSize] // Reuse dst buffer + } else { + dst = make([]byte, originalSize) + } + if _, err = io.ReadFull(r, dst); err != nil { return nil, err } @@ -59,7 +67,7 @@ func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, return nil, err } - return uncompressed, nil + return dst, nil } func (zlibProvider) Clone() Provider { diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 9c43cc4ad7..9611aee666 100644 --- 
a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -55,8 +55,12 @@ func NewZStdProvider(level Level) Provider { return newCGoZStdProvider(level) } -func (z *zstdCGoProvider) Compress(data []byte) []byte { - out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel) +func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { + return zstd.CompressBound(originalSize) +} + +func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { + out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") } @@ -64,8 +68,8 @@ func (z *zstdCGoProvider) Compress(data []byte) []byte { return out } -func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { - return z.ctx.Decompress(nil, compressedData) +func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { + return z.ctx.Decompress(dst, src) } func (z *zstdCGoProvider) Close() error { diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go index 06f203e4d3..ae850783dd 100644 --- a/pulsar/internal/compression/zstd_go.go +++ b/pulsar/internal/compression/zstd_go.go @@ -19,7 +19,6 @@ package compression import ( "github.com/klauspost/compress/zstd" - "github.com/pkg/errors" ) type zstdProvider struct { @@ -44,16 +43,23 @@ func newPureGoZStdProvider(level Level) Provider { return p } -func (p *zstdProvider) Compress(data []byte) []byte { - return p.encoder.EncodeAll(data, []byte{}) +func (p *zstdProvider) CompressMaxSize(srcSize int) int { + // from zstd.h + // this formula ensures that bound(A) + bound(B) <= bound(A+B) as long as A and B >= 128 KB + lowLimit := 128 << 10 // 128 kB + var margin int + if srcSize < lowLimit { + margin = (lowLimit - srcSize) >> 11 + } + return srcSize + (srcSize >> 8) + margin } -func (p *zstdProvider) Decompress(compressedData []byte, originalSize int) (dst []byte, err error) { - dst, err = 
p.decoder.DecodeAll(compressedData, nil) - if err == nil && len(dst) != originalSize { - return nil, errors.New("Invalid uncompressed size") - } - return +func (p *zstdProvider) Compress(dst, src []byte) []byte { + return p.encoder.EncodeAll(src, dst) +} + +func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { + return p.decoder.DecodeAll(src, dst) } func (p *zstdProvider) Close() error { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 573e607051..9e6d79f436 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -65,13 +65,14 @@ type ConnectionListener interface { type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) SendRequestNoWait(req *pb.BaseCommand) - WriteData(data []byte) + WriteData(data Buffer) RegisterListener(id uint64, listener ConnectionListener) UnregisterListener(id uint64) AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) ID() string GetMaxMessageSize() int32 + GetBufferFromPool() Buffer Close() } @@ -148,7 +149,7 @@ type connection struct { incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd closeCh chan interface{} - writeRequestsCh chan []byte + writeRequestsCh chan Buffer pendingReqs map[uint64]*request listeners map[uint64]ConnectionListener @@ -160,6 +161,8 @@ type connection struct { auth auth.Provider maxMessageSize int32 + + buffersPool sync.Pool } func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, @@ -187,9 +190,14 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO // partition produces writing on a single connection. In general it's // good to keep this above the number of partition producers assigned // to a single connection. 
- writeRequestsCh: make(chan []byte, 256), + writeRequestsCh: make(chan Buffer, 256), listeners: make(map[uint64]ConnectionListener), consumerHandlers: make(map[uint64]ConsumerHandler), + buffersPool: sync.Pool{ + New: func() interface{} { + return NewBuffer(1024) + }, + }, } cnx.reader = newConnectionReader(cnx) cnx.cond = sync.NewCond(cnx) @@ -344,6 +352,8 @@ func (c *connection) run() { return } c.internalWriteData(data) + // Return buffer to the pool since we're now done using it + c.buffersPool.Put(data) case <-c.pingTicker.C: c.sendPing() @@ -368,13 +378,13 @@ func (c *connection) runPingCheck() { } } -func (c *connection) WriteData(data []byte) { +func (c *connection) WriteData(data Buffer) { c.writeRequestsCh <- data } -func (c *connection) internalWriteData(data []byte) { - c.log.Debug("Write data: ", len(data)) - if _, err := c.cnx.Write(data); err != nil { +func (c *connection) internalWriteData(data Buffer) { + c.log.Debug("Write data: ", data.ReadableBytes()) + if _, err := c.cnx.Write(data.ReadableSlice()); err != nil { c.log.WithError(err).Warn("Failed to write on connection") c.Close() } @@ -398,8 +408,7 @@ func (c *connection) writeCommand(cmd proto.Message) { } c.writeBuffer.Write(serialized) - data := c.writeBuffer.ReadableSlice() - c.internalWriteData(data) + c.internalWriteData(c.writeBuffer) } func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) { @@ -469,7 +478,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl } } -func (c *connection) Write(data []byte) { +func (c *connection) Write(data Buffer) { c.writeRequestsCh <- data } @@ -768,3 +777,9 @@ func (c *connection) ID() string { func (c *connection) GetMaxMessageSize() int32 { return c.maxMessageSize } + +func (c *connection) GetBufferFromPool() Buffer { + b := c.buffersPool.Get().(Buffer) + b.Clear() + return b +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 32b0da0009..b8bdd0ddbd 100644 --- 
a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -155,7 +155,8 @@ func (p *partitionProducer) grabCnx() error { if p.batchBuilder == nil { p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), - compression.Level(p.options.CompressionLevel)) + compression.Level(p.options.CompressionLevel), + p) if err != nil { return err } @@ -180,6 +181,10 @@ func (p *partitionProducer) grabCnx() error { type connectionClosed struct{} +func (p *partitionProducer) GetConnection() internal.Connection { + return p.cnx +} + func (p *partitionProducer) ConnectionClosed() { // Trigger reconnection in the produce goroutine p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed") @@ -310,7 +315,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { type pendingItem struct { sync.Mutex - batchData []byte + batchData internal.Buffer sequenceID uint64 sendRequests []interface{} completed bool