-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support sendTimeout #394
Support sendTimeout #394
Conversation
// SendTimeout set the timeout for a message that not be acknowledged by server since sent. | ||
// Send and SendAsync returns an error after timeout. | ||
// Default is 30 seconds, -1 to disable. | ||
SendTimeout time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be taken the ctx
passed in by the application at the moment of Send()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for review.
I see PR#252 implementing sendTimeout
with context but progress has stalled.
Personally think there are 2 ways to implement sendTimeout
:
message level
- Perpose: precisely control the
sendTimeout
for each message, so that different messages can set different timeout, likeconsumer.Receive()
did. - Implement: set timeout context for
Send
, and check every message before dequeue, or something else.
producer level
- Perpose:
sendTimeout
for producer instance is sufficient for most situation, like client-java did. - Implement: set timeout timer to periodic fail pending queue.
- Trade-off: optional big change, we may need remove context parameter from Send method.
I will try context If context way is better, looking forward to reply, thanks.
pulsar/producer.go
Outdated
|
||
// BlockIfQueueFull control whether Send and SendAsync return error if producer's message queue is full. | ||
// Default is false. | ||
BlockIfQueueFull bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic should be negated here. The default should be to block if queue full.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for review.
I intented to maintain the same default behavior as client-java, but this does change the default behavior of current client-go.
I replaced NonBlockIfQueueFull
with BlockIfQueueFull
, just like DisableBatching
for enable batch by default.
pulsar/producer_partition.go
Outdated
func (p *partitionProducer) internalFlush(fr *flushRequest) { | ||
p.internalFlushCurrentBatch() | ||
|
||
p.queueLock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of having a separate go routine checking for timeouts can the message expiry check be performed here?
I don't think the timeout check needs to be exact? We just need to expires old messages at some point? This could simplify the code and reduces the need for another lock. We can just call this method failTimeoutMessages
and expire messages if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for review, good idea.
In some case producer.eventLoop()
will be stucked such as producer is reconnecting to broker, in this case:
- Check at independent goroutine:
sendTimeout
still effective, periodic check and fail pending messages. - Check before flushing:
sendTimeout
not effective in this case.
If no need to check message timeout precisely, the above case will be ignored.
This is reasonable, since sendTimeout
now implemented at producer level, no need to be that precise, also avoided cost of pending queue lock.
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) | |||
// lock the pending item while sending the requests | |||
pi.Lock() | |||
defer pi.Unlock() | |||
if pi.sentAt > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cckellogg
I used sentAt
for checking timeout of messasges.
sentAt
always be set before puting into pending queue, so I removed this predicate.
Like sendRequest.publishTime did.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ok to me
sendRequests []interface{} | ||
completed bool | ||
} | ||
|
||
func (p *partitionProducer) internalFlushCurrentBatch() { | ||
if p.options.SendTimeout > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can this be disabled? Setting it to -1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, negative to disable, such as -1 typically.
https://github.com/apache/pulsar-client-go/blob/master/pulsar/producer.go#L81
Motivation
Support
SendTimeout
andDisableBlockIfQueueFull
for producer.Modifications
ProducerOptions
DisableBlockIfQueueFull
before acquiring publish semaphore while sending messages.TestSendTimeout
to verify sendTimeout effective whille backlog exceeded on topic.Others
client-go and client-java have different ways to disable
sendTimeout
sendTimeout
to 0 explicitly, to override default 30s configuration.sendTimeout
to -1, to bypass unset detection.Documentation
configure-producer
Verifying this change