-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add NATS JetStream events plugin #2433
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good in general 👍 Just some smaller comments...
|
||
// parse the options | ||
options := events.ConsumeOptions{ | ||
Group: uuid.New().String(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Group name should be defineable. I want multiple services listening in the same group
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's just a "default" group if you don't provide one via the opts.
If you provide one, it'll be set in the next few lines:
for _, o := range opts {
o(&options)
}
Thanks for the contribution. Think the test is failing on something unrelated. Looking for a reviewer. |
} | ||
|
||
pubOpts := []nats.PubOpt{ | ||
// TODO: to make de-duplication work, we need to pass the event from the outside as an option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asim would be de-duplication a thing we want to support? I'm not sure if it can be supported by other event implementations.
https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
ctx, cancel := context.WithCancel(context.TODO()) | ||
defer cancel() | ||
|
||
// TODO: not supported by go-micro interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asim NATS Jetstream supports "stretching" the message ACK timeout (options.AckWait
) but that would need exposing a InProgressFunc
to the events channel consumer. Is that something we want and can support with other implementations? see https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive -> AckProgress
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its best to do it in an isolated case specifically for this. You can see the rabbitmq broker plugin for implementation specific options that can be passed through. It's not clear that other implementations would support it so we'd inherit an option nothing can deal with.
Thanks a lot for this pull <3 |
Introduce a NATS JetStream events plugin, since go-micro currently only supports NATS Streaming, which is depreceated.