Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boost optional #138

Merged
merged 2 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <pulsar/MessageIdBuilder.h>

#include <boost/optional.hpp>
#include <fstream>

#include "Commands.h"
Expand Down Expand Up @@ -1079,9 +1080,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
} else {
data.topicEpoch = Optional<uint64_t>::empty();
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
Expand All @@ -40,7 +41,6 @@
#include "LookupDataResult.h"
#include "SharedBuffer.h"
#include "UtilAllocator.h"
#include "Utils.h"

namespace pulsar {

Expand Down Expand Up @@ -83,7 +83,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
Optional<uint64_t> topicEpoch;
boost::optional<uint64_t> topicEpoch;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down
8 changes: 4 additions & 4 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
Optional<MessageId> startMessageId, bool readCompacted,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
Expand All @@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_allocated_schema(getSchema(schemaInfo));
}

if (startMessageId.is_present()) {
if (startMessageId) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
messageIdData.set_entryid(startMessageId.value().entryId());
Expand Down Expand Up @@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
Expand All @@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
if (topicEpoch.is_present()) {
if (topicEpoch) {
producer->set_topic_epoch(topicEpoch.value());
}

Expand Down
22 changes: 10 additions & 12 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <boost/optional.hpp>
#include <set>

#include "ProtoApiEnums.h"
#include "SharedBuffer.h"
#include "Utils.h"

using namespace pulsar;

Expand Down Expand Up @@ -89,16 +89,14 @@ class Commands {
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);

static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
int priorityLevel = 0);
static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy, int priorityLevel = 0);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand All @@ -107,7 +105,7 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch);

static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
CommandAck_AckType ackType, CommandAck_ValidationError validationError);
Expand Down
40 changes: 20 additions & 20 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
waitingForZeroQueueSizeMessage(false),
Expand Down Expand Up @@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lockForMessageId(mutexForMessageId_);
// Update startMessageId so that we can discard messages after delivery restarts
const auto startMessageId = clearReceiveQueue();
const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
? startMessageId
: Optional<MessageId>::empty();
const auto subscribeMessageId =
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
startMessageId_ = startMessageId;
lockForMessageId.unlock();

Expand Down Expand Up @@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
});
}

Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx) {
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx) {
const auto chunkId = metadata.chunk_id();
const auto uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
Expand Down Expand Up @@ -422,14 +422,14 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
lock.unlock();
increaseAvailablePermits(cnx);
trackMessage(messageId);
return Optional<SharedBuffer>::empty();
return boost::none;
}

chunkedMsgCtx.appendChunk(messageId, payload);
if (!chunkedMsgCtx.isCompleted()) {
lock.unlock();
increaseAvailablePermits(cnx);
return Optional<SharedBuffer>::empty();
return boost::none;
}

LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
Expand All @@ -438,9 +438,9 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
auto wholePayload = chunkedMsgCtx.getBuffer();
chunkedMessageCache_.remove(uuid);
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
return Optional<SharedBuffer>::of(wholePayload);
return wholePayload;
} else {
return Optional<SharedBuffer>::empty();
return boost::none;
}
}

Expand Down Expand Up @@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
const auto& messageIdData = msg.message_id();
auto messageId = MessageIdBuilder::from(messageIdData).build();
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
if (optionalPayload.is_present()) {
if (optionalPayload) {
payload = optionalPayload.value();
} else {
return;
Expand Down Expand Up @@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
m.impl_->convertPayloadToKeyValue(config_.getSchema());

const auto startMessageId = startMessageId_.get();
if (isPersistent_ && startMessageId.is_present() &&
if (isPersistent_ && startMessageId &&
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
m.getMessageId().entryId() == startMessageId.value().entryId() &&
isPriorEntryIndex(m.getMessageId().entryId())) {
Expand Down Expand Up @@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setTopicName(batchedMessage.getTopicName());
msg.impl_->convertPayloadToKeyValue(config_.getSchema());

if (startMessageId.is_present()) {
if (startMessageId) {
const MessageId& msgId = msg.getMessageId();

// If we are receiving a batch message, we need to discard messages that were prior
Expand Down Expand Up @@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* was
* not seen by the application
*/
Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
bool expectedDuringSeek = true;
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
return Optional<MessageId>::of(seekMessageId_.get());
return seekMessageId_.get();
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
return startMessageId_.get();
}
Expand All @@ -943,12 +943,12 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() - 1)
.build();
return Optional<MessageId>::of(previousMessageId);
return previousMessageId;
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
// in the past
return Optional<MessageId>::of(lastDequedMessageId_);
return lastDequedMessageId_;
} else {
// No message was received or dequeued by this consumer. Next message would still be the
// startMessageId
Expand Down
17 changes: 9 additions & 8 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <pulsar/Reader.h>

#include <boost/optional.hpp>
#include <functional>
#include <memory>

Expand Down Expand Up @@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase {
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
Optional<MessageId> startMessageId = Optional<MessageId>::empty());
boost::optional<MessageId> startMessageId = boost::none);
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
Expand Down Expand Up @@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase {
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback);

Optional<MessageId> clearReceiveQueue();
boost::optional<MessageId> clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
ResultCallback callback);

Expand Down Expand Up @@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastMessageIdInBroker_{MessageId::earliest()};

std::atomic_bool duringSeek_{false};
Synchronized<Optional<MessageId>> startMessageId_{Optional<MessageId>::empty()};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};

class ChunkedMessageCtx {
Expand Down Expand Up @@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase {
* @return the concatenated payload if chunks are concatenated into a completed message payload
* successfully, else Optional::empty()
*/
Optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx);
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx);

friend class PulsarFriend;

Expand Down
15 changes: 7 additions & 8 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
for (int i = 0; i < numberPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto optConsumer = consumers_.find(topicPartitionName);
if (optConsumer.is_empty()) {
if (!optConsumer) {
LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
callback(ResultUnknownError);
continue;
Expand All @@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);

auto optConsumer = consumers_.remove(topicPartitionName);
if (optConsumer.is_present()) {
if (optConsumer) {
optConsumer.value()->pauseMessageListener();
}

Expand Down Expand Up @@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
const std::string& topicPartitionName = msgId.getTopicName();
auto optConsumer = consumers_.find(topicPartitionName);

if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->acknowledgeAsync(msgId, callback);
} else {
Expand Down Expand Up @@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
};
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
Expand All @@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
auto optConsumer = consumers_.find(msgId.getTopicName());

if (optConsumer.is_present()) {
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->negativeAcknowledge(msgId);
}
Expand Down Expand Up @@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const {
return false;
}

return consumers_
.findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
.is_empty();
return !consumers_.findFirstValueIf(
[](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
}

uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
Expand Down
8 changes: 4 additions & 4 deletions lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
}

ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
impl_->producerName = Optional<std::string>::of(producerName);
impl_->producerName = boost::make_optional(producerName);
return *this;
}

const std::string& ProducerConfiguration::getProducerName() const {
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
return !impl_->producerName ? emptyString : impl_->producerName.value();
}

ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
impl_->initialSequenceId = boost::make_optional(initialSequenceId);
return *this;
}

int64_t ProducerConfiguration::getInitialSequenceId() const {
return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value();
}

ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
Expand Down
Loading