Support acknowledging a list of message IDs #1301

Merged

Conversation

BewareMyPower
Contributor

Motivation

For Shared and Key_Shared subscriptions, messages whose acknowledgment failed are never dispatched to the consumer again until it restarts. If the number of unacknowledged messages reaches the threshold, the broker stops dispatching messages altogether. However, the consumer has no way to check which messages failed to be acknowledged.

Even if this case was not hit, if the consumer restarted after consuming many messages, the old unacknowledged messages would be delivered again, which is very confusing and might affect the business logic.

Therefore, the only way to know which messages failed to be acknowledged is to enable AckWithResponse. Unfortunately, the Go SDK currently only supports acknowledging a single message at a time, which harms performance significantly.

To solve this problem, this PR adds an API to acknowledge a list of messages.

	// AckIDList acknowledges the consumption of a list of messages, identified by their MessageIDs
	// Returns a map of MessageID to error, the keys are the MessageIDs that failed to be acknowledged
	AckIDList([]MessageID) map[MessageID]error

Users can save the failed message IDs and add them again in the next AckIDList call.
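
A minimal sketch of that retry pattern, assuming the map-returning signature shown above. `MessageID` here is a hypothetical local stand-in for `pulsar.MessageID`, and `listAcker`, `flakyAcker` and `ackWithRetry` are illustrative names that are not part of the SDK:

```go
package main

import (
	"errors"
	"fmt"
)

// MessageID is a hypothetical stand-in for pulsar.MessageID so the sketch compiles alone.
type MessageID string

// listAcker mirrors only the AckIDList signature proposed in this description.
type listAcker interface {
	AckIDList([]MessageID) map[MessageID]error
}

// flakyAcker is a fake that fails each listed ID exactly once.
type flakyAcker struct {
	failOnce map[MessageID]bool
}

func (f *flakyAcker) AckIDList(ids []MessageID) map[MessageID]error {
	failed := make(map[MessageID]error)
	for _, id := range ids {
		if f.failOnce[id] {
			delete(f.failOnce, id)
			failed[id] = errors.New("simulated ack failure")
		}
	}
	return failed
}

// ackWithRetry acknowledges ids, carrying failed IDs over into the next call,
// and returns the IDs that are still unacknowledged after maxAttempts.
func ackWithRetry(a listAcker, ids []MessageID, maxAttempts int) []MessageID {
	pending := ids
	for attempt := 0; attempt < maxAttempts && len(pending) > 0; attempt++ {
		failed := a.AckIDList(pending)
		next := make([]MessageID, 0, len(failed))
		for _, id := range pending { // iterate the slice to keep a stable order
			if _, ok := failed[id]; ok {
				next = append(next, id)
			}
		}
		pending = next
	}
	return pending
}

func main() {
	acker := &flakyAcker{failOnce: map[MessageID]bool{"1:1:0": true}}
	left := ackWithRetry(acker, []MessageID{"1:0:0", "1:1:0", "1:2:0"}, 3)
	fmt.Println("still unacknowledged:", len(left))
}
```

A real caller would likely merge the failed IDs with newly received ones before the next call rather than looping immediately.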

Modifications

  • Add an AckIDList API and reuse the logic of ack_grouping_tracker.go to convert user provided message IDs to the message IDs in the ACK requests
  • Add the request ID to the ACK request in internalAckList and wait for the response error via an error channel
  • Add TestAckIDList to verify the case that a message ID list has message IDs of non-batched messages, whole batched messages or partial batched messages because the behaviors are different if the batch index ACK is enabled
  • Add TestMultiTopicAckIDList to verify the multi-topics case, including regex subscription.
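
The request-ID and error-channel mechanism from the second bullet could be sketched roughly as follows. This is not the code in internalAckList; `ackRequest`, `pendingAcks`, `register` and `complete` are hypothetical names used only to illustrate how a response can be correlated with a waiting caller:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackRequest is a hypothetical event: a list of IDs plus a channel for the outcome.
type ackRequest struct {
	requestID uint64
	ids       []string
	errCh     chan error
}

// pendingAcks correlates responses with in-flight ACK requests by request ID.
type pendingAcks struct {
	mu      sync.Mutex
	nextID  uint64
	waiting map[uint64]chan error
}

func newPendingAcks() *pendingAcks {
	return &pendingAcks{waiting: make(map[uint64]chan error)}
}

// register assigns a request ID and a buffered channel the caller can wait on.
func (p *pendingAcks) register(ids []string) ackRequest {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.nextID++
	ch := make(chan error, 1) // buffered so complete never blocks
	p.waiting[p.nextID] = ch
	return ackRequest{requestID: p.nextID, ids: ids, errCh: ch}
}

// complete delivers the response for requestID; unknown IDs are ignored.
func (p *pendingAcks) complete(requestID uint64, err error) {
	p.mu.Lock()
	ch, ok := p.waiting[requestID]
	delete(p.waiting, requestID)
	p.mu.Unlock()
	if ok {
		ch <- err
	}
}

func main() {
	p := newPendingAcks()
	req := p.register([]string{"1:0", "1:1"})
	// A goroutine plays the role of the connection delivering the broker's response.
	go p.complete(req.requestID, errors.New("simulated broker error"))
	fmt.Println("ack result:", <-req.errCh)
}
```

The channel is buffered with capacity 1 so that delivering a response never blocks, even if the caller has stopped waiting.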

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@BewareMyPower BewareMyPower self-assigned this Oct 29, 2024
@BewareMyPower BewareMyPower added this to the v0.15.0 milestone Oct 29, 2024
@nodece
Member

nodece commented Oct 30, 2024

This feature is good to me, I have some questions:

  1. Why does AckIDList not support acknowledging an incomplete batch message? It is confusing for users who try that.
  2. Why not refactor ackIDCommon(msgID MessageID, withResponse bool, txn Transaction)? We can use trackingMessageID to track the batch message. Right now, ackIDCommon and AckIDList contain duplicated code.

@BewareMyPower
Contributor Author

The 1st question

What's confusing is the tracking message ID itself. The semantics of the existing AckID method are wrong. Let's look at the following example with AckID:

func TestMyAck(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	topic := fmt.Sprintf("test-my-ack-%v", time.Now().Nanosecond())
	createConsumer := func() Consumer {
		consumer, err := client.Subscribe(ConsumerOptions{
			Topic:                       topic,
			SubscriptionName:            "my-sub",
			SubscriptionInitialPosition: SubscriptionPositionEarliest,
			Type:                        Shared,
			AckWithResponse:             true,
		})
		assert.Nil(t, err)
		return consumer
	}
	consumer := createConsumer()
	sendMessages(t, client, topic, 0, 2, true) // send 0 and 1 in the same batch
	msgs := receiveMessages(t, consumer, 2)
	for i := 0; i < 2; i++ {
		fmt.Println("Received message: ", string(msgs[i].Payload()), msgs[i].ID())
	}

	if err := consumer.AckID(msgs[0].ID()); err != nil {
		fmt.Println("Ack message 0 failed: ", err.Error())
	} else {
		fmt.Println("Ack message 0 success")
	}
	consumer.Close()

	consumer = createConsumer()
	msgs = receiveMessages(t, consumer, 1)
	fmt.Println("Received message: ", string(msgs[0].Payload()), msgs[0].ID())
}

Outputs:

Received message:  msg-0 9:0:0
Received message:  msg-1 9:0:0
Ack message 0 success
Received message:  msg-0 9:0:0

From the user's perspective:

  • "msg-0"'s message ID has been acknowledged successfully
  • "msg-0" was delivered again after restarting the consumer

Actually this API implements AckWithResponse with correct semantics. Replacing the AckID call with:

	for msgID, err := range consumer.AckIDList([]MessageID{msgs[0].ID()}) {
		fmt.Println("Failed to acknowledge ", msgID, err.Error())
	}

The outputs will be:

Received message:  msg-0 10:0:0
Received message:  msg-1 10:0:0
Failed to acknowledge  10:0:0 incomplete batch
Received message:  msg-0 10:0:0

P.S. we should fix the String() method of message ID.

Users should add the failed message ID to the next message ID list passed to AckIDList. Acknowledging the same message ID

IMO, we should make batch index ACK enabled by default for both client side and server side. The current default behavior is really confusing.

The 2nd question

ackIDCommon is an abstraction for acknowledgment on a single message ID. When AckWithResponse is true, we cannot reuse this method because we should avoid sending N requests.

@BewareMyPower
Contributor Author

@nodece I added TestAckIDListRetry to show why the semantics of this PR are correct.

@nodece
Member

nodece commented Oct 30, 2024

@BewareMyPower

  1. The 1st answer

Your explanation is clear, I agree with you:

IMO, we should make batch index ACK enabled by default for both client side and server side. The current default behavior is really confusing.

Batch index ACK is disabled by default. When it is disabled, users must acknowledge every message in the batch via AckID(id); only then does the client send the ACK request to the broker.

Received message: msg-0 9:0:0
Received message: msg-1 9:0:0
Ack message 0 success
Received message: msg-0 9:0:0

You only acked the first message, so the client didn't send the ACK request to the broker, and you still receive the first message after restarting the consumer.
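
To make that behavior concrete, here is a simplified sketch of per-batch tracking when batch index ACK is disabled. `batchTracker` is a hypothetical stand-in, not the client's actual tracker type; it only illustrates that the ACK request is held back until every index in the batch has been acknowledged:

```go
package main

import "fmt"

// batchTracker is a hypothetical, simplified stand-in for the client's ack tracker.
type batchTracker struct {
	pending []bool // pending[i] is true while batch index i is unacknowledged
	left    int    // number of indexes still unacknowledged
}

func newBatchTracker(size int) *batchTracker {
	p := make([]bool, size)
	for i := range p {
		p[i] = true
	}
	return &batchTracker{pending: p, left: size}
}

// ack marks one batch index as acknowledged and reports whether the whole
// batch is now complete, i.e. whether an ACK request should go to the broker.
func (t *batchTracker) ack(index int) bool {
	if index < 0 || index >= len(t.pending) {
		return false // out-of-range indexes never complete a batch
	}
	if t.pending[index] {
		t.pending[index] = false
		t.left--
	}
	return t.left == 0
}

func main() {
	tracker := newBatchTracker(2) // msg-0 and msg-1 share one batch
	fmt.Println("send ACK after msg-0:", tracker.ack(0))
	fmt.Println("send ACK after msg-1:", tracker.ack(1))
}
```

With a batch of two, acknowledging only msg-0 reports false, which matches the redelivery of msg-0 in the output above.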

I remember the Java client has the same behavior, if I am wrong, please let me know.

  2. The 2nd answer

ackIDCommon is an abstraction for acknowledgment on a single message ID. When AckWithResponse is true, we cannot reuse this method because we should avoid sending N requests.

You can refactor ackIDCommon. Because you ignore the ackTracer, the new code is confusing. I hope we still use the ackTracer to track all (non-batch, batch, chunk) messages, so that we keep the same semantics as AckID.

You can also discuss this issue on the dev mailing list.

@BewareMyPower
Contributor Author

the client doesn't send the ack request to the broker

There is no way to let the client know whether the ack request was sent. That's the point. In addition, users should never care if the ack was sent. They only care if the ack succeeded.

I remember the Java client has the same behavior

Yeah, but this behavior is wrong. I don't want to blame the author but the error handling is really not taken care of. Happy path is always easy to write.

because you ignore the ackTracer, so it is confusing.

Oh I see. Let me think of a better way to reuse the code.

You can also discuss this issue on the dev mailing list.

I plan to write a PIP to explain these things, including why batch index ACK should be enabled by default and the mess of the acknowledge APIs' semantics.

@BewareMyPower BewareMyPower marked this pull request as draft October 31, 2024 07:25
@BewareMyPower BewareMyPower marked this pull request as ready for review October 31, 2024 13:59
@BewareMyPower
Contributor Author

@nodece I reused the tracking message ID's tracker and adopted the semantics that an ACK on a message ID from an incomplete batch does not return an error. Please check the latest tests. Reusing the ackIDCommon method is still hard: there is not much to reuse except for the ack method call on the tracking message ID. The ack and ackWithTxn methods of unAckChunksTracker are also a mess.

@BewareMyPower BewareMyPower marked this pull request as draft November 1, 2024 08:01
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x38 pc=0x1033af634]

goroutine 28 [running]:
testing.tRunner.func1.2({0x103773580, 0x103e9bde0})
	/usr/local/go/src/testing/testing.go:1631 +0x1c4
testing.tRunner.func1()
	/usr/local/go/src/testing/testing.go:1634 +0x33c
panic({0x103773580?, 0x103e9bde0?})
	/usr/local/go/src/runtime/panic.go:770 +0x124
github.com/apache/pulsar-client-go/pulsar.(*consumer).checkMsgIDPartition(0x140001126c8, {0x0?, 0x0?})
	/Users/xuyunze/github.com/bewaremypower/pulsar-client-go/pulsar/consumer_impl.go:757 +0x24
@BewareMyPower BewareMyPower force-pushed the bewaremypower/ack-batch-api branch from d4687ce to 14cbf16 Compare November 1, 2024 11:37
@BewareMyPower BewareMyPower marked this pull request as ready for review November 1, 2024 11:43
@BewareMyPower
Contributor Author

@RobertIndie @nodece @crossoverJie @shibd Thanks for your reviews. The AckIDList API is updated now.

For the simple case where the consumer subscribes to only 1 topic, only 1 ACK request is sent, so the message IDs either all fail or all succeed. The only exception is when invalid message IDs are passed. However, in that case there is no way to handle these message IDs because acknowledging them will always fail. Hence I just return a trivial error here.

However, if the consumer subscribes to multiple topics, there might be multiple partitionConsumer objects, each with a separate events channel (eventsCh), so multiple ACK requests could be sent. Therefore, an AckError is returned, which contains a map from each failed message ID to its error.

See the API documents and the tests for details. PTAL again.

@BewareMyPower BewareMyPower marked this pull request as draft November 1, 2024 12:05
@BewareMyPower BewareMyPower marked this pull request as ready for review November 1, 2024 12:15
Member

@shibd shibd left a comment


Left some small comments

@BewareMyPower BewareMyPower marked this pull request as draft November 5, 2024 02:38
@BewareMyPower BewareMyPower marked this pull request as ready for review November 5, 2024 09:48
Member

@shibd shibd left a comment


/LGTM

@BewareMyPower BewareMyPower merged commit 35076ac into apache:master Nov 6, 2024
7 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/ack-batch-api branch November 6, 2024 05:34