Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept received-msg-ack from different consumer than received-consumer on shared-subscription #52

Merged
merged 1 commit into from
Oct 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ void messageAcked(CommandAck ack) {
checkArgument(ack.getAckType() == AckType.Individual);

// Only ack a single message
pendingAcks.remove(position);
removePendingAcks(position);
subscription.acknowledgeMessage(position, AckType.Individual);
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
Expand Down Expand Up @@ -328,6 +328,25 @@ public int hashCode() {
return consumerName.hashCode() + 31 * cnx.hashCode();
}

/**
* first try to remove ack-position from the current_consumer's pendingAcks.
* if ack-message doesn't present into current_consumer's pendingAcks
* a. try to remove from other connected subscribed consumers (It happens when client
* tries to acknowledge message through different consumer under the same subscription)
*
*
* @param position
*/
private void removePendingAcks(PositionImpl position) {
if (!pendingAcks.remove(position)) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().remove(position)) {
break;
}
}
}
}

public ConcurrentOpenHashSet<PositionImpl> getPendingAcks() {
return pendingAcks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,75 @@ public void testSendCallBack() throws Exception {
assertEquals(message.getBytes().length, msgLength.get());
}
}

/**
* consume message from consumer1 and send acknowledgement from different consumer subscribed under same
* subscription-name
*
* @throws Exception
*/
@Test
public void testSharedConsumerAckDifferentConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(5);
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
Consumer consumer2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message msg = null;
Set<Message> consumerMsgSet1 = Sets.newHashSet();
Set<Message> consumerMsgSet2 = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
consumerMsgSet1.add(msg);

}
for (int i = 0; i < 5; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
consumerMsgSet2.add(msg);

}
consumerMsgSet1.stream().forEach(m -> {
try {
consumer2.acknowledge(m);
} catch (PulsarClientException e) {
fail();
}
});
consumerMsgSet2.stream().forEach(m -> {
try {
consumer1.acknowledge(m);
} catch (PulsarClientException e) {
fail();
}
});

consumer1.redeliverUnacknowledgedMessages();
consumer2.redeliverUnacknowledgedMessages();

try {
if (consumer1.receive(1, TimeUnit.SECONDS) != null || consumer2.receive(1, TimeUnit.SECONDS) != null) {
fail();
}
} finally {
consumer1.close();
consumer2.close();
}

log.info("-- Exiting {} test --", methodName);
}

private void receiveAsync(Consumer consumer, int totalMessage, int currentMessage, CountDownLatch latch,
final Set<String> consumeMsg, ExecutorService executor) throws PulsarClientException {
Expand Down