[WIP] [Issue 456] feat: support chunked msg #717
base: master
@ShuaoZhang Thank you very much for implementing chunking for the Go client. It is a feature highly desired by the community.
Based on the Java implementation (ConsumerImpl.java), chunking also requires changes on the consumer side so that chunks can be assembled back into the original message. The consumer basically needs to check NumOfChunksMsg in the metadata, and it also needs to support chunks arriving from different producers.
On the consumer side, we also need to verify that chunking is only supported with Exclusive and Failover subscriptions.
According to the Java implementation, the consumer also has a configuration parameter called maxPendingChunkedMessage that allows the consumer to drop incomplete chunked messages when the pending queue is full.
It would be great if you could add a few test cases.
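To make the consumer-side requirement concrete, here is a minimal sketch of chunk reassembly. It is not the Java client's code and not the code in this PR: the `chunk`, `pendingMsg`, and `assembler` types and their fields are hypothetical, and the sketch assumes that each original message carries a unique ID shared by all of its chunks, that chunks of one message arrive in order, and that the oldest incomplete message is dropped once the pending limit is reached.

```go
package main

import "fmt"

// chunk is a hypothetical, simplified view of one received chunk.
type chunk struct {
	UUID        string // shared by all chunks of one original message (assumption)
	ChunkID     int    // zero-based position of this chunk
	TotalChunks int    // number of chunks in the original message
	Data        []byte
}

// pendingMsg holds the partially assembled payload of one message.
type pendingMsg struct {
	nextChunk int
	buf       []byte
}

// assembler reassembles chunked messages, keeping at most maxPending
// incomplete messages in memory.
type assembler struct {
	maxPending int
	pending    map[string]*pendingMsg
	order      []string // UUIDs of incomplete messages, oldest first
}

func newAssembler(maxPending int) *assembler {
	return &assembler{maxPending: maxPending, pending: map[string]*pendingMsg{}}
}

// add feeds one chunk in. It returns the full payload and true once the
// last chunk of a message has arrived, and nil and false otherwise.
func (a *assembler) add(c chunk) ([]byte, bool) {
	ctx, ok := a.pending[c.UUID]
	if !ok {
		if c.ChunkID != 0 {
			return nil, false // first chunk was missed or evicted: drop
		}
		if a.maxPending > 0 && len(a.order) >= a.maxPending {
			a.remove(a.order[0]) // pending queue is full: drop the oldest
		}
		ctx = &pendingMsg{}
		a.pending[c.UUID] = ctx
		a.order = append(a.order, c.UUID)
	}
	if c.ChunkID != ctx.nextChunk {
		a.remove(c.UUID) // out-of-order chunk: discard the whole message
		return nil, false
	}
	ctx.buf = append(ctx.buf, c.Data...)
	ctx.nextChunk++
	if ctx.nextChunk < c.TotalChunks {
		return nil, false
	}
	a.remove(c.UUID)
	return ctx.buf, true
}

// remove forgets an incomplete message.
func (a *assembler) remove(uuid string) {
	delete(a.pending, uuid)
	for i, id := range a.order {
		if id == uuid {
			a.order = append(a.order[:i], a.order[i+1:]...)
			break
		}
	}
}

func main() {
	a := newAssembler(10)
	// Chunks of two messages, interleaved as they might arrive from two producers.
	in := []chunk{
		{"msg-A", 0, 2, []byte("hello ")},
		{"msg-B", 0, 2, []byte("foo")},
		{"msg-A", 1, 2, []byte("world")},
		{"msg-B", 1, 2, []byte("bar")},
	}
	for _, c := range in {
		if payload, done := a.add(c); done {
			fmt.Printf("%s complete: %q\n", c.UUID, payload)
		}
	}
}
```

Keying the pending map by message ID rather than by producer is what lets interleaved chunks from different producers be assembled independently. A real consumer would also have to acknowledge or request redelivery of dropped chunks, which this sketch leaves out.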
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
	}
}

func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
	chunkSize := int(p._getConn().GetMaxMessageSize())
	totalChunks := (len(payload)+1)/chunkSize + 1
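The last +1 may not be accurate: for example, when the payload length is an exact multiple of the chunk size, it counts one chunk too many. The Java implementation has a more precise way to calculate the number of chunks: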
int totalChunks = canAddToBatch(msg) ? 1
: Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+ (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
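For comparison, the same calculation can be written in Go as a ceiling division. This is only a sketch: `totalChunks` is a hypothetical helper name, not a function from this PR, and returning 0 for a non-positive chunk size is an arbitrary choice made here.

```go
package main

import "fmt"

// totalChunks returns how many chunks are needed to carry payloadLen bytes
// when each chunk holds at most maxChunkSize bytes. An empty payload still
// needs one chunk, mirroring Math.max(1, ...) in the Java snippet.
// It returns 0 when maxChunkSize is not positive.
func totalChunks(payloadLen, maxChunkSize int) int {
	if maxChunkSize <= 0 {
		return 0
	}
	if payloadLen < 1 {
		payloadLen = 1
	}
	// Ceiling division without floating point.
	return (payloadLen + maxChunkSize - 1) / maxChunkSize
}

func main() {
	const max = 10
	for _, n := range []int{0, 1, 9, 10, 11, 20} {
		prFormula := (n+1)/max + 1 // expression used in the diff above
		fmt.Printf("len=%d: ceiling=%d, PR formula=%d\n", n, totalChunks(n, max), prFormula)
	}
}
```

With a 10-byte limit, a 10-byte payload needs one chunk, while `(len(payload)+1)/chunkSize + 1` evaluates to 2.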
if right > len(payload)-1 {
	right = len(payload) - 1
}
// [left, right)
Can we move this into a separate function that returns a slice of ranges, e.g. [ {chunk 0 left, right}, {chunk 1 left, right}, ... ]? Then we can write a unit test to verify that nothing is missing from the split. WDYT?
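One possible shape for such a function, as a sketch only: `chunkRange`, `splitRanges`, and `covers` are hypothetical names, and the ranges are half-open `[Left, Right)` so that they can be used directly as Go slice bounds.

```go
package main

import "fmt"

// chunkRange is a half-open byte range [Left, Right) into the payload.
type chunkRange struct {
	Left, Right int
}

// splitRanges divides a payload of payloadLen bytes into consecutive ranges
// of at most maxChunkSize bytes each. An empty payload yields a single empty
// range; a non-positive maxChunkSize yields nil.
func splitRanges(payloadLen, maxChunkSize int) []chunkRange {
	if maxChunkSize <= 0 {
		return nil
	}
	if payloadLen <= 0 {
		return []chunkRange{{0, 0}}
	}
	ranges := make([]chunkRange, 0, (payloadLen+maxChunkSize-1)/maxChunkSize)
	for left := 0; left < payloadLen; left += maxChunkSize {
		right := left + maxChunkSize
		if right > payloadLen {
			right = payloadLen // last chunk may be shorter
		}
		ranges = append(ranges, chunkRange{Left: left, Right: right})
	}
	return ranges
}

// covers reports whether ranges tile [0, payloadLen) with no gap or overlap.
// A unit test can use it to check that no byte is lost in the split.
func covers(ranges []chunkRange, payloadLen int) bool {
	next := 0
	for _, r := range ranges {
		if r.Left != next || r.Right < r.Left {
			return false
		}
		next = r.Right
	}
	return next == payloadLen
}

func main() {
	payload := []byte("0123456789abc")
	ranges := splitRanges(len(payload), 5)
	for i, r := range ranges {
		fmt.Printf("chunk %d: [%d, %d) %q\n", i, r.Left, r.Right, payload[r.Left:r.Right])
	}
	fmt.Println("covers payload:", covers(ranges, len(payload)))
}
```

Because the function is pure, a table-driven test can run it over many payload lengths and chunk sizes without a broker connection.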
	sequenceID:   uint64(*msg.SequenceID),
	sendRequests: callbacks,
})
p._getConn().WriteData(newBuffer)
When batching is enabled, how does this flush work with batching?
One scenario: a message is already in a batch, waiting to be flushed once the batch requirements are fulfilled, and then a large message that requires chunking is added. Is this logic going to flush the chunked message ahead of the previously batched message?
So do you need to flush the batch first, before flushing the individual chunks? Perhaps add logic like this before flushing a chunk:
batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData != nil {
	p.pendingQueue.Put(&pendingItem{
		sentAt:       time.Now(),
		batchData:    batchData,
		sequenceID:   sequenceID,
		sendRequests: callbacks,
	})
	p._getConn().WriteData(batchData)
}
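The ordering concern can be illustrated with a toy model. This sketch does not use the client's real types: `toyProducer` and its methods are hypothetical stand-ins, and the only point it demonstrates is that flushing the pending batch before writing chunks keeps frames in submission order.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toyProducer is a hypothetical stand-in for the partition producer.
type toyProducer struct {
	batch   []string // small messages waiting in the current batch
	written []string // frames in the order they reached the connection
}

// addToBatch queues a small message without writing it yet.
func (p *toyProducer) addToBatch(msg string) {
	p.batch = append(p.batch, msg)
}

// flushBatch writes the pending batch, if any, as a single frame.
func (p *toyProducer) flushBatch() {
	if len(p.batch) == 0 {
		return
	}
	p.written = append(p.written, "batch("+strings.Join(p.batch, ",")+")")
	p.batch = nil
}

// sendChunked writes a large message as totalChunks frames. It flushes the
// batch first so earlier messages are not overtaken by the chunks.
func (p *toyProducer) sendChunked(name string, totalChunks int) {
	p.flushBatch()
	for i := 0; i < totalChunks; i++ {
		p.written = append(p.written, name+"-chunk-"+strconv.Itoa(i))
	}
}

func main() {
	p := &toyProducer{}
	p.addToBatch("a")
	p.addToBatch("b")
	p.sendChunked("big", 2)
	for _, frame := range p.written {
		fmt.Println(frame)
	}
}
```

Without the `flushBatch` call in `sendChunked`, the two chunk frames would be written while `a` and `b` were still sitting in the batch.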
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
	return true
}

func (bc *batchContainer) AddMessageMetaData(
I could not find any code that calls this function. Who is supposed to call it?
Master Issue: [#456](#456)
### Motivation
Make the Pulsar Go client support chunking to produce/consume big messages. The earlier implementation ([#717](#717)) didn't take many details into account, so I decided to reimplement it.
### Modifications
- Add `internalSingleSend` to send a message without batching, because batched messages are not chunked.
- Move the `BlockIfQueueFull` check from `internalSendAsync` to `internalSend` (`canAddQueue`) to ensure normal blocking when chunking.
- Make the producer send big messages by chunking.
- Add `chunkedMsgCtxMap` to store the metadata and data of chunked messages.
- Make the consumer able to receive chunks and consume the big message.
Fixes #456
Motivation
Allow Go clients to send and receive large messages in chunks.
Modifications