Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-37] Add new APIs for consumer #4019

Merged
merged 9 commits into from
Apr 19, 2022
Merged

[RIP-37] Add new APIs for consumer #4019

merged 9 commits into from
Apr 19, 2022

Conversation

chenzlalvin
Copy link
Contributor

As we mentioned in RIP-37 New and unified APIs, establish a new APIs specifications. we divide it into two independent pull requests.

The producer part.
The consumer part.

Similar to the previous part #3987, this pull request is about the consumer part.

related issue: #3973

* <p>Once push consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
* (finite-state machine) to record the different states for each producer, which is similar to
* {@link Service.State}.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to get the MessageListenner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the exact scene that need get MessageListener.

Copy link
Contributor

@ni-ze ni-ze Mar 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we use subscribe mode, client will rebalance and poll messages from messageQueue into local buffer. MessageListener will be invoked after this relationship change, and start task to pull message.
On the other hand, app wants to be notify once the queue it consumes is determined, init app with messageQueue, messageListener will be used in this scene.

* SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group.
*
* <p>Simple consumer is lightweight consumer , if you want fully control the message consumption operation by yourself,
* simple consumer should be your first consideration.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding the difference between SimpleConsumer and PullConsumer.

* @return map of messageQueue to messageViews.
*/
Map<MessageQueue, Collection<MessageView>> pull(int maxMessageNum) throws ClientException;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync pull can use pullAsync().get

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's not convenient to catch the real exception(ClientException) when using pullAsync.get.

* @param messageQueue the specified messageQueue to commit offset
* @param committedOffset the specified offset commit to server
*/
void commit(MessageQueue messageQueue, long committedOffset) throws ClientException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need commit a collection of messageQueue interface;
and commitedOffset can not find in the return of pull method.

* @param subscriptionExpression new subscription expression to add.
* @return pull consumer instance.
*/
PullConsumer subscribe(SubscriptionExpression subscriptionExpression) throws ClientException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe(HashSet subs) is need.

* @param messageViews are batch of messages which need consume. this message in the collection may come from different topics and the collection size control by setBatchSize(int batchSize) in {@link PushConsumerBuilder}
* @param ackList are collection of messages which already consume success and need ack to server.
*/
void consume(Collection<MessageView> messageViews, Collection<MessageView> ackList);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we consider replacing ackList with failedList? The code will look cleaner if all messages succeed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we consider replacing ackList with failedList? The code will look cleaner if all messages succeed.

How do we define this behavior that exception is thrown during the invocation of MessageListener? Agree with your opinion to some degree, I feel anxious about user forget to put message into the ackList explicitly, which may cause a bulk of retry-messages.

@zhouxinyu
Copy link
Member

Let's merge this PR to the 5.0 branch first and polish the APIs continuously.

@zhouxinyu zhouxinyu merged commit df5e885 into apache:5.0.0-beta Apr 19, 2022
aaron-ai added a commit to aaron-ai/rocketmq that referenced this pull request Jun 20, 2022
zhouxinyu pushed a commit that referenced this pull request Jun 20, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants