Skip to content

Commit

Permalink
Merge pull request apache#29 from hrsakai/ack_cumulative
Browse files Browse the repository at this point in the history
 Add acknowledgeCumulative / receiveWithTimeout methods
  • Loading branch information
massakam authored Jun 10, 2019
2 parents b6ce761 + a945e49 commit d07163e
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pulsar-test-service-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ PULSAR_DIR="${PULSAR_DIR:-/tmp/pulsar-test-dist}"
PKG=apache-pulsar-${VERSION}-bin.tar.gz

rm -rf $PULSAR_DIR
curl -L --create-dir "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-${VERSION}/${PKG}" -o $PULSAR_DIR/$PKG
curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/${PKG}" -o $PULSAR_DIR/$PKG
tar xfz $PULSAR_DIR/$PKG -C $PULSAR_DIR --strip-components 1

DATA_DIR=/tmp/pulsar-test-data
Expand Down
2 changes: 1 addition & 1 deletion run-unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ VERSION="${VERSION:-`cat ./pulsar-version.txt`}"
PULSAR_PKG_DIR="/tmp/pulsar-test-pkg"
rm -rf $PULSAR_PKG_DIR
for pkg in apache-pulsar-client-dev.deb apache-pulsar-client.deb;do
curl -L --create-dir "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg
curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg
done;
apt install $PULSAR_PKG_DIR/apache-pulsar-client*.deb

Expand Down
52 changes: 42 additions & 10 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ Napi::FunctionReference Consumer::constructor;
void Consumer::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);

Napi::Function func = DefineClass(env, "Consumer",
{
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
InstanceMethod("close", &Consumer::Close),
});
Napi::Function func =
DefineClass(env, "Consumer",
{
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("receiveWithTimeout", &Consumer::ReceiveWithTimeout),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative),
InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId),
InstanceMethod("close", &Consumer::Close),
});

constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
Expand Down Expand Up @@ -108,13 +112,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_

class ConsumerReceiveWorker : public Napi::AsyncWorker {
public:
ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer,
int64_t timeout = -1)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cConsumer(cConsumer) {}
cConsumer(cConsumer),
timeout(timeout) {}
~ConsumerReceiveWorker() {}
void Execute() {
pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
pulsar_result result;
if (timeout > 0) {
result = pulsar_consumer_receive_with_timeout(this->cConsumer, &(this->cMessage), timeout);
} else {
result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage));
}

if (result != pulsar_result_Ok) {
SetError(std::string("Failed to received message ") + pulsar_result_str(result));
Expand All @@ -130,6 +141,7 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
Napi::Promise::Deferred deferred;
pulsar_consumer_t *cConsumer;
pulsar_message_t *cMessage;
int64_t timeout;
};

Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
Expand All @@ -139,6 +151,14 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
return deferred.Promise();
}

Napi::Value Consumer::ReceiveWithTimeout(const Napi::CallbackInfo &info) {
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
wk->Queue();
return deferred.Promise();
}

void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
Expand All @@ -151,6 +171,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
}

void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
}

void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
MessageId *msgId = MessageId::Unwrap(obj);
pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
}

class ConsumerCloseWorker : public Napi::AsyncWorker {
public:
ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer)
Expand Down
3 changes: 3 additions & 0 deletions src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
pulsar_consumer_t *cConsumer;

Napi::Value Receive(const Napi::CallbackInfo &info);
Napi::Value ReceiveWithTimeout(const Napi::CallbackInfo &info);
void Acknowledge(const Napi::CallbackInfo &info);
void AcknowledgeId(const Napi::CallbackInfo &info);
void AcknowledgeCumulative(const Napi::CallbackInfo &info);
void AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};

Expand Down
39 changes: 39 additions & 0 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const Pulsar = require('../index.js');
subscription: 'sub1',
ackTimeoutMs: 10000,
});

expect(consumer).not.toBeNull();

const messages = [];
Expand All @@ -60,6 +61,44 @@ const Pulsar = require('../index.js');
}
expect(lodash.difference(messages, results)).toEqual([]);

await producer.close();
await consumer.close();
});

test('acknowledgeCumulative', async () => {
const producer = await client.createProducer({
topic: 'persistent://public/default/acknowledgeCumulative',
sendTimeoutMs: 30000,
batchingEnabled: true,
});
expect(producer).not.toBeNull();

const consumer = await client.subscribe({
topic: 'persistent://public/default/acknowledgeCumulative',
subscription: 'sub1',
ackTimeoutMs: 10000,
});
expect(consumer).not.toBeNull();

const messages = [];
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
messages.push(msg);
}
await producer.flush();

for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
if (i === 9) {
consumer.acknowledgeCumulative(msg);
}
}

await expect(consumer.receiveWithTimeout(1000)).rejects.toThrow('Failed to received message TimeOut');

await producer.close();
await consumer.close();
await client.close();
Expand Down

0 comments on commit d07163e

Please sign in to comment.