Skip to content

Commit

Permalink
[pulsar-client-cpp] Support Seek on Partitioned Topic by Time (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
k2la authored and cdbartholomew committed Jul 24, 2020
1 parent 0059cda commit b84cf8e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,16 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
}

void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
callback(ResultOperationNotSupported);
Lock stateLock(mutex_);
if (state_ != Ready) {
stateLock.unlock();
callback(ResultAlreadyClosed);
return;
}
stateLock.unlock();
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->seekAsync(timestamp, callback);
}
}

void PartitionedConsumerImpl::runPartitionUpdateTask() {
Expand Down
77 changes: 77 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,83 @@ TEST(BasicEndToEndTest, testSeek) {
ASSERT_EQ(ResultOk, client.close());
}

TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "persistent://public/default/testSeekOnPartitionedTopic";

std::string url =
adminUrl + "admin/v2/persistent/public/default/testSeekOnPartitionedTopic" + "/partitions";
int res = makePutRequest(url, "3");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

std::string subName = "sub-testSeekOnPartitionedTopic";
Producer producer;

Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
ConsumerConfiguration consConfig;
consConfig.setReceiverQueueSize(1);
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);

uint64_t timestampMillis = TimeUtils::currentTimeMillis();

// Send 100 messages synchronously
std::string msgContent = "msg-content";
LOG_INFO("Publishing 100 messages synchronously");
int msgNum = 0;
for (; msgNum < 100; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

LOG_INFO("Trying to receive 100 messages");
Message msgReceived;
for (msgNum = 0; msgNum < 100; msgNum++) {
consumer.receive(msgReceived, 3000);
LOG_DEBUG("Received message :" << msgReceived.getMessageId());
std::stringstream expected;
expected << msgContent << msgNum;
ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
}

// seek to the time before sending messages, expected receive first message.
result = consumer.seek(timestampMillis);
// Sleeping for 500ms to wait for consumer re-connect
std::this_thread::sleep_for(std::chrono::microseconds(500 * 1000));

ASSERT_EQ(ResultOk, result);
consumer.receive(msgReceived, 3000);
LOG_ERROR("Received message :" << msgReceived.getMessageId());
std::stringstream expected;
msgNum = 0;
expected << msgContent << msgNum;
ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
ASSERT_EQ(ResultOk, consumer.unsubscribe());
ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}

TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
Client client(lookupUrl);
std::string topicName = "testUnAckedMessageTimeout";
Expand Down

0 comments on commit b84cf8e

Please sign in to comment.