Skip to content

Commit

Permalink
Use boost::optional instead self-written Optional class (#138)
Browse files Browse the repository at this point in the history
Fixes #134

### Motivation

Switch to use boost::optional instead self-written Optional class

### Modifications

Remove class Optional in Utils.h and change all usage to boost::optional
  • Loading branch information
A authored Dec 7, 2022
1 parent 7a7b3aa commit 18dcdd5
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 120 deletions.
5 changes: 3 additions & 2 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <pulsar/MessageIdBuilder.h>

#include <boost/optional.hpp>
#include <fstream>

#include "Commands.h"
Expand Down Expand Up @@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
} else {
data.topicEpoch = Optional<uint64_t>::empty();
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
Expand All @@ -40,7 +41,6 @@
#include "LookupDataResult.h"
#include "SharedBuffer.h"
#include "UtilAllocator.h"
#include "Utils.h"

namespace pulsar {

Expand Down Expand Up @@ -83,7 +83,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
Optional<uint64_t> topicEpoch;
boost::optional<uint64_t> topicEpoch;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down
8 changes: 4 additions & 4 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
Optional<MessageId> startMessageId, bool readCompacted,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
Expand All @@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_allocated_schema(getSchema(schemaInfo));
}

if (startMessageId.is_present()) {
if (startMessageId) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
messageIdData.set_entryid(startMessageId.value().entryId());
Expand Down Expand Up @@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
Expand All @@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
if (topicEpoch.is_present()) {
if (topicEpoch) {
producer->set_topic_epoch(topicEpoch.value());
}

Expand Down
22 changes: 10 additions & 12 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <boost/optional.hpp>
#include <set>

#include "ProtoApiEnums.h"
#include "SharedBuffer.h"
#include "Utils.h"

using namespace pulsar;

Expand Down Expand Up @@ -89,16 +89,14 @@ class Commands {
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);

static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
int priorityLevel = 0);
static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy, int priorityLevel = 0);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand All @@ -107,7 +105,7 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);

static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
CommandAck_AckType ackType, CommandAck_ValidationError validationError);
Expand Down
40 changes: 20 additions & 20 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
waitingForZeroQueueSizeMessage(false),
Expand Down Expand Up @@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lockForMessageId(mutexForMessageId_);
// Update startMessageId so that we can discard messages after delivery restarts
const auto startMessageId = clearReceiveQueue();
const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
? startMessageId
: Optional<MessageId>::empty();
const auto subscribeMessageId =
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
startMessageId_ = startMessageId;
lockForMessageId.unlock();

Expand Down Expand Up @@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
});
}

Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx) {
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx) {
const auto chunkId = metadata.chunk_id();
const auto uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
Expand Down Expand Up @@ -422,14 +422,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
lock.unlock();
increaseAvailablePermits(cnx);
trackMessage(messageId);
return Optional<SharedBuffer>::empty();
return boost::none;
}

chunkedMsgCtx.appendChunk(messageId, payload);
if (!chunkedMsgCtx.isCompleted()) {
lock.unlock();
increaseAvailablePermits(cnx);
return Optional<SharedBuffer>::empty();
return boost::none;
}

LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
Expand All @@ -438,9 +438,9 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
auto wholePayload = chunkedMsgCtx.getBuffer();
chunkedMessageCache_.remove(uuid);
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
return Optional<SharedBuffer>::of(wholePayload);
return wholePayload;
} else {
return Optional<SharedBuffer>::empty();
return boost::none;
}
}

Expand Down Expand Up @@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).build();
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
if (optionalPayload.is_present()) {
if (optionalPayload) {
payload = optionalPayload.value();
} else {
return;
Expand Down Expand Up @@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
m.impl_->convertPayloadToKeyValue(config_.getSchema());

const auto startMessageId = startMessageId_.get();
if (isPersistent_ && startMessageId.is_present() &&
if (isPersistent_ && startMessageId &&
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
m.getMessageId().entryId() == startMessageId.value().entryId() &&
isPriorEntryIndex(m.getMessageId().entryId())) {
Expand Down Expand Up @@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setTopicName(batchedMessage.getTopicName());
msg.impl_->convertPayloadToKeyValue(config_.getSchema());

if (startMessageId.is_present()) {
if (startMessageId) {
const MessageId& msgId = msg.getMessageId();

// If we are receiving a batch message, we need to discard messages that were prior
Expand Down Expand Up @@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* was
* not seen by the application
*/
Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
bool expectedDuringSeek = true;
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
return Optional<MessageId>::of(seekMessageId_.get());
return seekMessageId_.get();
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
return startMessageId_.get();
}
Expand All @@ -943,12 +943,12 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() - 1)
.build();
return Optional<MessageId>::of(previousMessageId);
return previousMessageId;
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
// in the past
return Optional<MessageId>::of(lastDequedMessageId_);
return lastDequedMessageId_;
} else {
// No message was received or dequeued by this consumer. Next message would still be the
// startMessageId
Expand Down
17 changes: 9 additions & 8 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <pulsar/Reader.h>

#include <boost/optional.hpp>
#include <functional>
#include <memory>

Expand Down Expand Up @@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase {
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
Optional<MessageId> startMessageId = Optional<MessageId>::empty());
boost::optional<MessageId> startMessageId = boost::none);
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
Expand Down Expand Up @@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase {
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback);

Optional<MessageId> clearReceiveQueue();
boost::optional<MessageId> clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
ResultCallback callback);

Expand Down Expand Up @@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastMessageIdInBroker_{MessageId::earliest()};

std::atomic_bool duringSeek_{false};
Synchronized<Optional<MessageId>> startMessageId_{Optional<MessageId>::empty()};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};

class ChunkedMessageCtx {
Expand Down Expand Up @@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase {
* @return the concatenated payload if chunks are concatenated into a completed message payload
* successfully, else Optional::empty()
*/
Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx);
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx);

friend class PulsarFriend;

Expand Down
15 changes: 7 additions & 8 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
for (int i = 0; i < numberPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto optConsumer = consumers_.find(topicPartitionName);
if (optConsumer.is_empty()) {
if (!optConsumer) {
LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
callback(ResultUnknownError);
continue;
Expand All @@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);

auto optConsumer = consumers_.remove(topicPartitionName);
if (optConsumer.is_present()) {
if (optConsumer) {
optConsumer.value()->pauseMessageListener();
}

Expand Down Expand Up @@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
const std::string& topicPartitionName = msgId.getTopicName();
auto optConsumer = consumers_.find(topicPartitionName);

if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->acknowledgeAsync(msgId, callback);
} else {
Expand Down Expand Up @@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
};
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
Expand All @@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
auto optConsumer = consumers_.find(msgId.getTopicName());

if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->negativeAcknowledge(msgId);
}
Expand Down Expand Up @@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const {
return false;
}

return consumers_
.findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
.is_empty();
return !consumers_.findFirstValueIf(
[](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
}

uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
Expand Down
8 changes: 4 additions & 4 deletions lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
}

ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
impl_->producerName = Optional<std::string>::of(producerName);
impl_->producerName = boost::make_optional(producerName);
return *this;
}

const std::string& ProducerConfiguration::getProducerName() const {
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
return !impl_->producerName ? emptyString : impl_->producerName.value();
}

ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
impl_->initialSequenceId = boost::make_optional(initialSequenceId);
return *this;
}

int64_t ProducerConfiguration::getInitialSequenceId() const {
return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value();
}

ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
Expand Down
Loading

0 comments on commit 18dcdd5

Please sign in to comment.