Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the AckIDList performance when there are many topics subscribed #1305

Merged

Conversation

BewareMyPower
Copy link
Contributor

Motivation

Currently when a consumer subscribes multiple topic-partitions and AckWithResponse is true, the AckIDList method will iterate over all internal consumers sequentially. It harms the performance especially there are many internal consumers. For example, if the connection of an internal consumer was stuck by some reason, message IDs from other consumer would be blocked for the operation timeout.

Modifications

In ackIDListFromMultiTopics, call consumer.AckIDList in goroutines and use a channel to receive all errors from these calls.

Add TestMultiTopicAckIDListTimeout, which sets a dummy connection instance whose SendRequest never completes the callback, to verify the AckIDList call will not take much more time than the operation timeout to complete. Without this improvement, it will take more than 5 times of the operation timeout to fail.

@BewareMyPower BewareMyPower force-pushed the bewaremypower/concurrent-ack-id-list branch from fbed4d3 to 33f7146 Compare November 16, 2024 14:45
@geniusjoe
Copy link
Contributor

I missed PR #1301 and I wonder does this AckIDList([]MessageID) map[MessageID]error API is an alternative to Java void acknowledge(List<MessageId> messageIdList) throws PulsarClientException API? Are there any differences when I use them?

@BewareMyPower
Copy link
Contributor Author

I wonder does this AckIDList([]MessageID) map[MessageID]error API is an alternative to Java void acknowledge(List messageIdList) throws PulsarClientException API?

Not an alternative. The Go API still returns an error, not map[MessageID]error. But the error can be cast to an AckError (map[MessageID]error) so you can just retry the failed message IDs rather than the whole message IDs. While the Java API void acknowledge(List<MessageId> messageIdList) throws PulsarClientException does not define such an exception that you can retrieve failed message IDs and corresponding errors from it.

@geniusjoe
Copy link
Contributor

Not an alternative. The Go API still returns an error, not map[MessageID]error. But the error can be cast to an AckError (map[MessageID]error) so you can just retry the failed message IDs rather than the whole message IDs. While the Java API void acknowledge(List<MessageId> messageIdList) throws PulsarClientException does not define such an exception that you can retrieve failed message IDs and corresponding errors from it.

So that if I don't need to check each message exception, I can just regard AckError as a normal exception? I think using error instead of map[MessageID]error is more consistent with Java API.

@BewareMyPower
Copy link
Contributor Author

So that if I don't need to check each message exception, I can just regard AckError as a normal exception?

Yes. You can still write the following code:

	if err := consumer.AckIDList(msgIDs); err != nil {
		fmt.Errorf("Failed to ack messages: %v", err)
	}

The design in Go client is more like to add the following exception to the Java client and document that this exception could be thrown by acknowledge.

    public static class AckException extends PulsarClientException {
        
        private final Map<MessageId, PulsarClientException> exceptionMap;

        public AckException(Map<MessageId, PulsarClientException> exceptionMap) {
            super(toString(exceptionMap));
            this.exceptionMap = exceptionMap;
        }
        
        private static String toString(Map<MessageId, PulsarClientException> exceptionMap) {
            return ""; // TODO
        }
    }

@geniusjoe
Copy link
Contributor

Yes. You can still write the following code:

	if err := consumer.AckIDList(msgIDs); err != nil {
		fmt.Errorf("Failed to ack messages: %v", err)
	}

The design in Go client is more like to add the following exception to the Java client and document that this exception could be thrown by acknowledge.

    public static class AckException extends PulsarClientException {
        
        private final Map<MessageId, PulsarClientException> exceptionMap;

        public AckException(Map<MessageId, PulsarClientException> exceptionMap) {
            super(toString(exceptionMap));
            this.exceptionMap = exceptionMap;
        }
        
        private static String toString(Map<MessageId, PulsarClientException> exceptionMap) {
            return ""; // TODO
        }
    }

I got it, thank you very much.

pulsar/consumer_multitopic.go Outdated Show resolved Hide resolved
pulsar/consumer_multitopic_test.go Outdated Show resolved Hide resolved
pulsar/consumer_multitopic_test.go Outdated Show resolved Hide resolved
@RobertIndie RobertIndie merged commit ffdc3af into apache:master Nov 19, 2024
7 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/concurrent-ack-id-list branch November 19, 2024 07:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants