Skip to content

Commit

Permalink
feat: support batch index ack.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jun 13, 2023
1 parent bc5182a commit 453681f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 0 deletions.
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface ConsumerConfig {
maxPendingChunkedMessage?: number;
autoAckOldestChunkedMessageOnQueueFull?: number;
schema?: SchemaInfo;
batchIndexAckEnabled?: boolean;
}

export class Consumer {
Expand Down
8 changes: 8 additions & 0 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMessage";
static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
"autoAckOldestChunkedMessageOnQueueFull";
static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand Down Expand Up @@ -215,6 +216,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
this->cConsumerConfig.get(), autoAckOldestChunkedMessageOnQueueFull);
}

if (consumerConfig.Has(CFG_BATCH_INDEX_ACK_ENABLED) &&
consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).IsBoolean()) {
bool batchIndexAckEnabled = consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).ToBoolean();
pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
batchIndexAckEnabled);
}
}

ConsumerConfig::~ConsumerConfig() {
Expand Down
3 changes: 3 additions & 0 deletions tests/conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ backlogQuotaDefaultLimitGB=10
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true

# Enable batch index ACK
acknowledgmentAtBatchIndexLevelEnabled=true

# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

Expand Down
58 changes: 58 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,63 @@ const Pulsar = require('../index.js');
await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
});
});

describe('Features', () => {
test('Batch index ack', async () => {
const topicName = 'test-batch-index-ack';
const producer = await client.createProducer({
topic: topicName,
batchingEnabled: true,
batchingMaxMessages: 100,
batchingMaxPublishDelayMs: 10000,
});

let consumer = await client.subscribe({
topic: topicName,
batchIndexAckEnabled: true,
subscription: 'test-batch-index-ack',
});

// Make sure send 0~5 is a batch msg.
for (let i = 0; i < 5; i += 1) {
const msg = `my-message-${i}`;
console.log(msg);
producer.send({
data: Buffer.from(msg),
});
}
await producer.flush();

// Receive msgs and just ack 0, 1 msgs
const results = [];
for (let i = 0; i < 5; i += 1) {
const msg = await consumer.receive();
results.push(msg);
}
expect(results.length).toEqual(5);
for (let i = 0; i < 2; i += 1) {
await consumer.acknowledge(results[i]);
await new Promise((resolve) => setTimeout(resolve, 200));
}

// Restart consumer after, just receive 2~5 msg.
await consumer.close();
consumer = await client.subscribe({
topic: topicName,
batchIndexAckEnabled: true,
subscription: 'test-batch-index-ack',
});
const results2 = [];
for (let i = 2; i < 5; i += 1) {
const msg = await consumer.receive();
results2.push(msg);
}
expect(results2.length).toEqual(3);
// assert no more msgs.
await expect(consumer.receive(1000)).rejects.toThrow(
'Failed to receive message: TimeOut',
);
});
});
});
})();

0 comments on commit 453681f

Please sign in to comment.