-
Notifications
You must be signed in to change notification settings - Fork 709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IMPROVEMENT] added client retry for jetstream async publish old API #1695
Conversation
e038a42
to
6753a97
Compare
6753a97
to
9081c04
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! I have a few comments.
js.go
Outdated
dch := js.dch | ||
js.dch = nil | ||
// Defer here so error is processed and can be checked. | ||
defer close(dch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This defer does not work as expected since it'll be executed at the end of closeDch
, not the caller. While it would be good to extract this func since it's repeated twice, I do think that we should make sure that close(dch)
is executed at the very end to first send signals on the pubAckFuture
and only later to PublishAsyncComplete()
js.go
Outdated
@@ -894,7 +933,10 @@ func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFutu | |||
const defaultStallWait = 200 * time.Millisecond | |||
|
|||
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { | |||
var o pubOpts | |||
var o = pubOpts{ | |||
rwait: DefaultPubRetryWait, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an old API many people rely on, I would be hesitant to make the retries default, but rather treat them as opt-in and clearly state it in the doc. I know that this makes it work differently than the new API, but I don't think we should be changing the default behavior in this case.
js.dch = nil | ||
// Defer here so error is processed and can be checked. | ||
defer close(dch) | ||
} | ||
} | ||
|
||
doErr := func(err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should clear the reply subject before passing the message in error handler. If user wants to retry manually inside the error handler, having paf.msg.Reply
set will cause the retry attempt to fail with nats: reply subject should be empty
object.go
Outdated
@@ -398,7 +398,7 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn | |||
return nil | |||
} | |||
|
|||
m, h := NewMsg(chunkSubj), sha256.New() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Earlier, reply field in msg was set to empty string on PublishMsgAsync
exit using defer. After adding retry logic, go test detected race between setting reply in async handler handleAsyncReply
and emptying it in PublishMsgAsync
. So, the defer call which emptied the reply inside PublishMsgAsync was removed.
The code was re-using the msg object inside the loop which called PublishMsgAsync
and the test failed complaining about nats reply being not empty. Hence, the new msg object is being created in every iteration of the loop now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no good I think. We should not leave Msg.Reply
set with the generated subject as it makes it impossible to re-publish by the user (as in the object store now). By the way, having Put()
in object store recreate the message with each iteration means we have increased allocations (especially for large payloads / small chunks).
I may have an idea of how to tackle this problem - we could stop modifying the message passed by the user (which would prevent race) and utilize the low level nc.publish()
. Here's my initial attempt from your branch: https://gist.github.com/piotrpio/796e5203e2d5be2cabda771e9ec51d54
Possibly this can be improved, but it seems to work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture. // The message should not be changed until the PubAckFuture has been processed.
From the above comment, the way Put()
uses PublishMsgAsync
is incorrect since it seems to modify msg object without waiting on PubAckFuture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok but even so, with your changes the message will never be restored to its original state right? Even after the PubAckFuture
has been processed. If we need to fix it anyway we can also improve by not modifying the message at all I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that a caller would assume that message would not be modified in anyway after PublishMsgAsync
is called and PubAckFuture
is processed. This is the reason why Reply was unset earlier. I will update the PR by applying your patch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it was unset before your changes: https://github.com/nats-io/nats.go/pull/1695/files#diff-176a2342c55218b70f165bed6a7731dc83d901f9513382f3c98bbd3d08ce0678L941
After PubAckFuture
is processed, the message can be re-used by the user and thus should be in its original state:
/ The message should not be changed *until* the PubAckFuture has been processed.
Again, your changes are good in general and we should definitely process those but we also need to be very careful when changing any behavior in the old JetStream API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I understand it was unset before these changes as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@piotrpio I have updated the PR by applying your patch with one small change. Please review the changes
test/js_test.go
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should really have tests for async retries. For reference, see tests in the new API: https://github.com/nats-io/nats.go/blob/main/jetstream/test/publish_test.go#L1402
@piotrpio I incorporated the review comments but the CI pipeline now fails in an unrelated test. When I run the tests locally, all the tests succeed |
748e304
to
9fcc865
Compare
9fcc865
to
59f8525
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the contribution!
This PR contains following:
This PR fixes following issues:
PublishAsync
Retry Mechanism Not Working as Expected #1678