-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve go client sending performance #253
Conversation
@@ -34,7 +34,7 @@ const ( | |||
// MaxBatchSize will be the largest size for a batch sent from this particular producer. | |||
// This is used as a baseline to allocate a new buffer that can hold the entire batch | |||
// without needing costly re-allocations. | |||
MaxBatchSize = 128 * 1024 | |||
MaxBatchSize = 1024 * 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason for using 128 KB is that we measured that there's no advantage in using bigger batches, as in we can easily saturate the network bandwidth already.
@@ -66,7 +66,7 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64, | |||
maxMessages = DefaultMaxMessagesPerBatch | |||
} | |||
bb := &BatchBuilder{ | |||
buffer: NewBuffer(4096), | |||
buffer: NewBuffer(MaxBatchSize), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's increasing the min memory size for each producer from 4KB to 1MB. That would be very bad for overall memory usage.
@@ -77,13 +86,43 @@ type buffer struct { | |||
|
|||
// NewBuffer creates and initializes a new Buffer using buf as its initial contents. | |||
func NewBuffer(size int) Buffer { | |||
var memSeg []byte | |||
memSegmentQueue := getMemorySegmentQueue(size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will introduce a contention point on a global mutex, plus reading from a global channel, which again uses a mutex. I'd like to see some data indicating how this change is going to perform compared to the current code.
@@ -166,7 +166,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO | |||
connectionTimeout: connectionTimeout, | |||
logicalAddr: logicalAddr, | |||
physicalAddr: physicalAddr, | |||
writeBuffer: NewBuffer(4096), | |||
writeBuffer: NewBuffer(512 * 1024), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buffer is auto-expanding as needed. I don't think increasing the minimum size to 0.5MB is going to be making any difference other than increasing the overall memory usage.
@@ -36,7 +36,7 @@ func newConnectionReader(cnx *connection) *connectionReader { | |||
return &connectionReader{ | |||
cnx: cnx, | |||
reader: bufio.NewReader(cnx.cnx), | |||
buffer: NewBuffer(4096), | |||
buffer: NewBuffer(4 * 1024 * 1024), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, the buffer is auto-expanding. Increasing here from 4 KB to 4 MB is going to just blow up the memory with no advantage.
Have you tried to benchmark the client using non-persistent topics? |
I did benchmark and code before this PR: #209, in order to get better performance I did some thing. After reading your comments, I think the optimization I did is not necessary. |
I will do more benchmark, and commit PR if necessary. |
Improve the sending performance to 50000 message per second, by