From d07afe5d5cc3f18755f443eb05f93176110e59c4 Mon Sep 17 00:00:00 2001 From: hrsakai Date: Fri, 7 Jun 2019 15:19:27 +0900 Subject: [PATCH 1/2] Change download url for pulsar pkg --- pulsar-test-service-start.sh | 2 +- run-unit-tests.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-test-service-start.sh b/pulsar-test-service-start.sh index 78e715e901bb5d..44cdf9c2b67857 100755 --- a/pulsar-test-service-start.sh +++ b/pulsar-test-service-start.sh @@ -28,7 +28,7 @@ PULSAR_DIR="${PULSAR_DIR:-/tmp/pulsar-test-dist}" PKG=apache-pulsar-${VERSION}-bin.tar.gz rm -rf $PULSAR_DIR -curl -L --create-dir "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-${VERSION}/${PKG}" -o $PULSAR_DIR/$PKG +curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/${PKG}" -o $PULSAR_DIR/$PKG tar xfz $PULSAR_DIR/$PKG -C $PULSAR_DIR --strip-components 1 DATA_DIR=/tmp/pulsar-test-data diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 1acce0ac757762..c521db44b6585d 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -28,7 +28,7 @@ VERSION="${VERSION:-`cat ./pulsar-version.txt`}" PULSAR_PKG_DIR="/tmp/pulsar-test-pkg" rm -rf $PULSAR_PKG_DIR for pkg in apache-pulsar-client-dev.deb apache-pulsar-client.deb;do - curl -L --create-dir "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg + curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg done; apt install $PULSAR_PKG_DIR/apache-pulsar-client*.deb From a945e49730ec4caf059dfb166020969dbcc69d6c Mon Sep 17 00:00:00 2001 From: hrsakai Date: Fri, 7 Jun 2019 15:21:03 +0900 Subject: [PATCH 2/2] Add acknowledgeCumulative / receiveWithTimeout methods --- src/Consumer.cc | 52 ++++++++++++++++++++++++++++++++-------- src/Consumer.h | 3 +++ tests/end_to_end.test.js | 39 ++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 10 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index db6fab4d528710..454ba3c1977034 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -28,13 +28,17 @@ Napi::FunctionReference Consumer::constructor; void Consumer::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); - Napi::Function func = DefineClass(env, "Consumer", - { - InstanceMethod("receive", &Consumer::Receive), - InstanceMethod("acknowledge", &Consumer::Acknowledge), - InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), - InstanceMethod("close", &Consumer::Close), - }); + Napi::Function func = + DefineClass(env, "Consumer", + { + InstanceMethod("receive", &Consumer::Receive), + InstanceMethod("receiveWithTimeout", &Consumer::ReceiveWithTimeout), + InstanceMethod("acknowledge", &Consumer::Acknowledge), + InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), + InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative), + InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId), + InstanceMethod("close", &Consumer::Close), + }); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -108,13 +112,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_ class ConsumerReceiveWorker : public Napi::AsyncWorker { public: - ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) + ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer, + int64_t timeout = -1) : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), deferred(deferred), - cConsumer(cConsumer) {} + cConsumer(cConsumer), + timeout(timeout) {} ~ConsumerReceiveWorker() {} void Execute() { - pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); + pulsar_result result; + if (timeout > 0) { + result = pulsar_consumer_receive_with_timeout(this->cConsumer, &(this->cMessage), timeout); + } else { + result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); + } if (result != pulsar_result_Ok) { SetError(std::string("Failed to received message ") + pulsar_result_str(result)); @@ -130,6 +141,7 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker { Napi::Promise::Deferred deferred; pulsar_consumer_t *cConsumer; pulsar_message_t *cMessage; + int64_t timeout; }; Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { @@ -139,6 +151,14 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { return deferred.Promise(); } +Napi::Value Consumer::ReceiveWithTimeout(const Napi::CallbackInfo &info) { + Napi::Number timeout = info[0].As().ToNumber(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value()); + wk->Queue(); + return deferred.Promise(); +} + void Consumer::Acknowledge(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As(); Message *msg = Message::Unwrap(obj); @@ -151,6 +171,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); } +void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As(); + Message *msg = Message::Unwrap(obj); + pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL); +} + +void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As(); + MessageId *msgId = MessageId::Unwrap(obj); + pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); +} + class ConsumerCloseWorker : public Napi::AsyncWorker { public: ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) diff --git a/src/Consumer.h b/src/Consumer.h index fe681245293aaa..5e8bf5be34d9a2 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -36,8 +36,11 @@ class Consumer : public Napi::ObjectWrap { pulsar_consumer_t *cConsumer; Napi::Value Receive(const Napi::CallbackInfo &info); + Napi::Value ReceiveWithTimeout(const Napi::CallbackInfo &info); void Acknowledge(const Napi::CallbackInfo &info); void AcknowledgeId(const Napi::CallbackInfo &info); + void AcknowledgeCumulative(const Napi::CallbackInfo &info); + void AcknowledgeCumulativeId(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); }; diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 9d5ed49cddc7a0..ae24d8653abb16 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -40,6 +40,7 @@ const Pulsar = require('../index.js'); subscription: 'sub1', ackTimeoutMs: 10000, }); + expect(consumer).not.toBeNull(); const messages = []; @@ -60,6 +61,44 @@ const Pulsar = require('../index.js'); } expect(lodash.difference(messages, results)).toEqual([]); + await producer.close(); + await consumer.close(); + }); + + test('acknowledgeCumulative', async () => { + const producer = await client.createProducer({ + topic: 'persistent://public/default/acknowledgeCumulative', + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + expect(producer).not.toBeNull(); + + const consumer = await client.subscribe({ + topic: 'persistent://public/default/acknowledgeCumulative', + subscription: 'sub1', + ackTimeoutMs: 10000, + }); + expect(consumer).not.toBeNull(); + + const messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + if (i === 9) { + consumer.acknowledgeCumulative(msg); + } + } + + await expect(consumer.receiveWithTimeout(1000)).rejects.toThrow('Failed to received message TimeOut'); + await producer.close(); await consumer.close(); await client.close();