diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 6df3a61850..1074be82ce 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -57,13 +57,11 @@ type BatchBuilder interface { ) bool // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. - Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error) + Flush() *FlushBatch - // Flush all the messages buffered in multiple batches and wait until all + // FlushBatches all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. - FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, - ) + FlushBatches() []*FlushBatch // Return the batch container batch message in multiple batches. IsMultiBatches() bool @@ -72,6 +70,13 @@ type BatchBuilder interface { Close() error } +type FlushBatch struct { + BatchData Buffer + SequenceID uint64 + Callbacks []interface{} + Error error +} + // batchContainer wraps the objects needed to a batch. // batchContainer implement BatchBuilder as a single batch container. type batchContainer struct { @@ -250,12 +255,10 @@ func (bc *batchContainer) reset() { } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. -func (bc *batchContainer) Flush() ( - batchData Buffer, sequenceID uint64, callbacks []interface{}, err error, -) { +func (bc *batchContainer) Flush() *FlushBatch { if bc.numMessages == 0 { // No-Op for empty batch - return nil, 0, nil, nil + return nil } bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages) @@ -271,6 +274,8 @@ func (bc *batchContainer) Flush() ( buffer = NewBuffer(int(uncompressedSize * 3 / 2)) } + sequenceID := uint64(0) + var err error if err = serializeMessage( buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor, bc.maxMessageSize, true, @@ -278,15 +283,18 @@ func (bc *batchContainer) Flush() ( sequenceID = bc.cmdSend.Send.GetSequenceId() } - callbacks = bc.callbacks + callbacks := bc.callbacks bc.reset() - return buffer, sequenceID, callbacks, err + return &FlushBatch{ + BatchData: buffer, + SequenceID: sequenceID, + Callbacks: callbacks, + Error: err, + } } // FlushBatches only for multiple batches container -func (bc *batchContainer) FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, -) { +func (bc *batchContainer) FlushBatches() []*FlushBatch { panic("single batch container not support FlushBatches(), please use Flush() instead") } diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 88a4d5ed01..5dfcb0d8f6 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -191,51 +191,35 @@ func (bc *keyBasedBatchContainer) reset() { bc.batches.containers = map[string]*batchContainer{} } -// Flush all the messages buffered in multiple batches and wait until all +// FlushBatches all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. -func (bc *keyBasedBatchContainer) FlushBatches() ( - batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, errors []error, -) { +func (bc *keyBasedBatchContainer) FlushBatches() []*FlushBatch { if bc.numMessages == 0 { // No-Op for empty batch - return nil, nil, nil, nil + return nil } bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages) - var batchesLen = len(bc.batches.containers) - var idx = 0 - sortedKeys := make([]string, 0, batchesLen) - - batchesData = make([]Buffer, batchesLen) - sequenceIDs = make([]uint64, batchesLen) - callbacks = make([][]interface{}, batchesLen) - errors = make([]error, batchesLen) - bc.batches.l.RLock() defer bc.batches.l.RUnlock() - for k := range bc.batches.containers { - sortedKeys = append(sortedKeys, k) - } - sort.Strings(sortedKeys) - for _, k := range sortedKeys { - container := bc.batches.containers[k] - b, s, c, err := container.Flush() - if b != nil { - batchesData[idx] = b - sequenceIDs[idx] = s - callbacks[idx] = c - errors[idx] = err + flushBatches := make([]*FlushBatch, 0, len(bc.batches.containers)) + for _, container := range bc.batches.containers { + batch := container.Flush() + if batch == nil { + continue } - idx++ + flushBatches = append(flushBatches, batch) } bc.reset() - return batchesData, sequenceIDs, callbacks, errors + // Sort items by sequenceID + sort.Slice(flushBatches, func(i, j int) bool { + return flushBatches[i].SequenceID < flushBatches[j].SequenceID + }) + return flushBatches } -func (bc *keyBasedBatchContainer) Flush() ( - batchData Buffer, sequenceID uint64, callbacks []interface{}, err error, -) { +func (bc *keyBasedBatchContainer) Flush() *FlushBatch { panic("multi batches container not support Flush(), please use FlushBatches() instead") } diff --git a/pulsar/internal/key_based_batch_builder_test.go b/pulsar/internal/key_based_batch_builder_test.go new file mode 100644 index 0000000000..9df33b0780 --- /dev/null +++ b/pulsar/internal/key_based_batch_builder_test.go @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "fmt" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +type mockBufferPool struct { +} + +func (m *mockBufferPool) GetBuffer() Buffer { + return nil +} + +type mockEncryptor struct { +} + +func (m *mockEncryptor) Encrypt(_ []byte, _ *pb.MessageMetadata) ([]byte, error) { + return []byte("test"), nil +} + +func TestKeyBasedBatcherOrdering(t *testing.T) { + keyBatcher, err := NewKeyBasedBatchBuilder( + 1000, + 1000, + 1000, + "test", + 1, + pb.CompressionType_NONE, + compression.Level(0), + &mockBufferPool{}, + log.NewLoggerWithLogrus(logrus.StandardLogger()), + &mockEncryptor{}, + ) + if err != nil { + assert.Fail(t, "Failed to create key based batcher") + } + + sequenceID := uint64(0) + for i := 0; i < 10; i++ { + metadata := &pb.SingleMessageMetadata{ + OrderingKey: []byte(fmt.Sprintf("key-%d", i)), + PayloadSize: proto.Int32(0), + } + assert.True(t, keyBatcher.Add(metadata, &sequenceID, []byte("test"), nil, nil, time.Now(), + nil, false, false, 0, 0)) + } + + batches := keyBatcher.FlushBatches() + for i := 1; i < len(batches); i++ { + if batches[i].SequenceID <= batches[i-1].SequenceID { + t.Errorf("Batch id is not incremental at index %d: %d <= %d", i, batches[i].SequenceID, batches[i-1].SequenceID) + } + } +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e0981303f3..1b2fae729c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -846,10 +846,11 @@ func (p *partitionProducer) internalFlushCurrentBatch() { return } - batchData, sequenceID, callbacks, err := p.batchBuilder.Flush() - if batchData == nil { + batch := p.batchBuilder.Flush() + if batch == nil { return } + batchData, sequenceID, callbacks, err := batch.BatchData, batch.SequenceID, batch.Callbacks, batch.Error // error occurred in batch flush // report it using callback @@ -969,38 +970,38 @@ func (p *partitionProducer) failTimeoutMessages() { } func (p *partitionProducer) internalFlushCurrentBatches() { - batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches() - if batchesData == nil { + flushBatches := p.batchBuilder.FlushBatches() + if flushBatches == nil { return } - for i := range batchesData { + for _, b := range flushBatches { // error occurred in processing batch // report it using callback - if errs[i] != nil { - for _, cb := range callbacks[i] { + if b.Error != nil { + for _, cb := range b.Callbacks { if sr, ok := cb.(*sendRequest); ok { - sr.done(nil, errs[i]) + sr.done(nil, b.Error) } } - if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) { - p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i]) + if errors.Is(b.Error, internal.ErrExceedMaxMessageSize) { + p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", b.Error) return } continue } - if batchesData[i] == nil { + if b.BatchData == nil { continue } p.pendingQueue.Put(&pendingItem{ sentAt: time.Now(), - buffer: batchesData[i], - sequenceID: sequenceIDs[i], - sendRequests: callbacks[i], + buffer: b.BatchData, + sequenceID: b.SequenceID, + sendRequests: b.Callbacks, }) - p._getConn().WriteData(batchesData[i]) + p._getConn().WriteData(b.BatchData) } }