You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
After the reconnection of the producer, the msg in the pendingQueue will be resend.
But not every msg in pendingQueue hasn't send to broker, the resend logic here seems to deal with the problem that producer meet a network error when producing a msg which cause the msg doesn't really send to broker.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
will also trigger the reconnection logic again and again, as well as the resend logic, which cause the consumer to consume the same msg with difference msgId
The text was updated successfully, but these errors were encountered:
Try to present a solution, the problem is that in the resend logic after the reconnection of producer, no mater the item in pendingQueue has been sent or not, all the items will be resend to broker, which will cause the redundancy of msg.
We can add sent flag in pendingItemQueue initialized as false, which will be set to true after the corresponding buffer has been sent successfully.
And filter the items which has been sent after the reconnection, resend the item which has not been sent only.
pulsar-client-go/pulsar/producer_partition.go
Line 272 in 965045a
After the reconnection of the producer, the msg in the pendingQueue will be resend.
But not every msg in pendingQueue hasn't send to broker, the resend logic here seems to deal with the problem that producer meet a network error when producing a msg which cause the msg doesn't really send to broker.
While, the code here
pulsar-client-go/pulsar/producer_partition.go
Line 793 in 965045a
will also trigger the reconnection logic again and again, as well as the resend logic, which cause the consumer to consume the same msg with difference msgId
The text was updated successfully, but these errors were encountered: