Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event for recovering consumer #1304

Merged
merged 2 commits into from
Feb 23, 2023
Merged

Conversation

Zerpet
Copy link
Contributor

@Zerpet Zerpet commented Feb 22, 2023

Proposed Changes

Related to #1293.

In the context of Streams over AMQP, the consumer must keep track of the consumer offset. When a consumer subscribes to a stream-type queue, it may provide a consumer argument to specify a "point" in the stream to attach to.

This library records consumers, and their arguments, for topology recovery purposes. When a consumer is declared, it starts reading at an arbitrary point e.g. offset=123. After receiving messages, the offset "moves" forward. In the event of a connection recovery due to e.g. network error, the consumer is re-declared with the offset recorded when it was first declared i.e. offset=123. This is not correct, because the consumer has received some messages, and the offset value when it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is about to be recovered, but has not started recovering yet. This event exposes the consumer arguments as a reference type, allowing an event handler to update the consumer offset, or any other consumer argument.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

The recovering-consumer event is exposed and fire in the auto-recovering connection class. However, the concept of a consumer is a concern for the Model classes. IMO, this makes this feature a bit awkward to reason about, because user code responsible for connections may not necessarily know about models and consumers. In order to use this feature, such code would have to become aware of some parts of the consumer.

The above in inevitable, given that consumer and model recovery is called from the auto-recovering connection class.


In the scenario where a model declares more than 1 consumer, the recovering-consumer event handler will be called for each consumer recorded. It is responsibility of the caller to inspect the consumer tag, and determine if the handler logic should be applied to the consumer arguments the current event relates to.

Edit: formatting

Related to #1293. In the context of Streams over AMQP,
the consumer must keep track of the consumer offset.
When a consumer subscribes to a stream-type queue, it
may provide a consumer argument to specify a "point"
in the stream to attach to.

This library records consumers, and their arguments,
for topology recovery purposes. When a consumer is
declared, it starts reading at an arbitrary point e.g.
offset=123. After receiving messages, the offset "moves"
forward. In the event of a connection recovery due to
e.g. network error, the consumer is re-declared with
the offset recorded when it was first declared i.e.
offset=123. This is not correct, because the consumer
has received some messages, and the offset value when
it was first declared is not accurate anymore.

This commit adds an event that fire when a consumer is
about to be recovered, but has not started recovering yet.
This event exposes the consumer arguments as a reference
type, allowing an event handler to update the consumer
offset, or any other consumer argument.

Signed-off-by: Aitor Perez Cedres <acedres@vmware.com>
@Zerpet Zerpet added this to the 6.5.0 milestone Feb 22, 2023
@Zerpet Zerpet self-assigned this Feb 22, 2023
@Zerpet
Copy link
Contributor Author

Zerpet commented Feb 22, 2023

Git was complaning when I tried to backport locally from main to 6.x, so I created this PR first, to merge in the 6.x branch. I will create a follow up PR to add this feature to main after this PR is merged.

Marked this a draft because I'm not sure if I should add more test coverage 🙃 Any comments on this regard are very welcome.

@lukebakken lukebakken self-assigned this Feb 22, 2023
Copy link
Contributor

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to me. One question - does the Java client have a similar event during consumer recovery?

@Zerpet Zerpet marked this pull request as ready for review February 23, 2023 11:58
@Zerpet
Copy link
Contributor Author

Zerpet commented Feb 23, 2023

Did some more manual testing to gain more confidence. This PR should cover the use case from the OP in #1293. Something similar to this, in a class that implements a consumer, does the trick:

// ulong StreamOffset;
// string _consumerTag
        (_connection as IAutorecoveringConnection)!.RecoveringConsumer += (_, args) =>
        {
            if (args.ConsumerTag == _consumerTag)
            {
                args.ConsumerArguments["x-stream-offset"] = StreamOffset;
            }
        };

@lukebakken it's a great question :D I'll have a look.

@michaelklishin
Copy link
Member

michaelklishin commented Feb 23, 2023

Java client has consumer recovery listeners, they are just plain old Java objects and not C# event handlers.

@Laurianti
Copy link

Did some more manual testing to gain more confidence. This PR should cover the use case from the OP in #1293. Something similar to this, in a class that implements a consumer, does the trick:

// ulong StreamOffset;
// string _consumerTag
        (_connection as IAutorecoveringConnection)!.RecoveringConsumer += (_, args) =>
        {
            if (args.ConsumerTag == _consumerTag)
            {
                args.ConsumerArguments["x-stream-offset"] = StreamOffset;
            }
        };

@lukebakken it's a great question :D I'll have a look.

ulong cannot be used. use long instead of ulong

#1299

@lukebakken
Copy link
Contributor

@Laurianti I don't know why you added that comment. Would you care to clarify?

@Laurianti
Copy link

@Laurianti I don't know why you added that comment. Would you care to clarify?

Using ulong for x-stream-offset in Zerpet's example leads to consumer auto-recovery failure. So, the example is incorrect and needs long.

#1299

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants