Skip to content

Commit

Permalink
[C++] Allow to configure KeyShared with out of order delivery (#7842)
Browse files Browse the repository at this point in the history
Co-authored-by: penghui <penghui@apache.org>
  • Loading branch information
merlimat and codelipenghui authored Nov 6, 2020
1 parent 210b7f4 commit 3142456
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 6 deletions.
12 changes: 12 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <pulsar/ConsumerCryptoFailureAction.h>
#include <pulsar/CryptoKeyReader.h>
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>

namespace pulsar {

Expand Down Expand Up @@ -90,6 +91,17 @@ class PULSAR_PUBLIC ConsumerConfiguration {
ConsumerConfiguration& setConsumerType(ConsumerType consumerType);
ConsumerType getConsumerType() const;

/**
* Set KeyShared subscription policy for consumer.
*
* <p>By default, KeyShared subscription use auto split hash range to maintain consumers. If you want to
* set a different KeyShared policy, you can set by following example:
*
* @param keySharedPolicy The {@link KeySharedPolicy} want to specify
*/
ConsumerConfiguration& setKeySharedPolicy(KeySharedPolicy keySharedPolicy);
KeySharedPolicy getKeySharedPolicy() const;

/**
* A message listener enables your application to configure how to process
* and acknowledge messages delivered. A listener will be called in order
Expand Down
70 changes: 70 additions & 0 deletions pulsar-client-cpp/include/pulsar/KeySharedPolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/defines.h>

#include <memory>

namespace pulsar {

/**
* KeyShared mode of KeyShared subscription.
*/
enum KeySharedMode
{

/**
* Auto split while new consumer connected.
*/
AUTO_SPLIT = 0,

/**
* New consumer with fixed hash range to attach the topic, if new consumer use conflict hash range with
* exits consumers, new consumer will be rejected.
*/
STICKY = 1
};

struct KeySharedPolicyImpl;

class PULSAR_PUBLIC KeySharedPolicy {
public:
KeySharedPolicy();
~KeySharedPolicy();

KeySharedPolicy(const KeySharedPolicy&);
KeySharedPolicy& operator=(const KeySharedPolicy&);

/**
* Create a new instance of KeySharedPolicy with the same
* initial settings as the current one.
*/
KeySharedPolicy clone() const;

KeySharedPolicy& setKeySharedMode(KeySharedMode keySharedMode);
KeySharedMode getKeySharedMode() const;

KeySharedPolicy& setAllowOutOfOrderDelivery(bool allowOutOfOrderDelivery);
bool isAllowOutOfOrderDelivery() const;

private:
std::shared_ptr<KeySharedPolicyImpl> impl_;
};
} // namespace pulsar
16 changes: 15 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
Optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition) {
CommandSubscribe_InitialPosition subscriptionInitialPosition,
KeySharedPolicy keySharedPolicy) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
Expand Down Expand Up @@ -288,6 +289,19 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->mutable_metadata()->AddAllocated(keyValue);
}

if (subType == CommandSubscribe_SubType_Key_Shared) {
KeySharedMeta ksm;
switch (keySharedPolicy.getKeySharedMode()) {
case pulsar::AUTO_SPLIT:
ksm.set_keysharedmode(proto::KeySharedMode::AUTO_SPLIT);
break;
case pulsar::STICKY:
ksm.set_keysharedmode(proto::KeySharedMode::STICKY);
}

ksm.set_allowoutoforderdelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
}

return writeMessageWithSize(cmd);
}

Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <pulsar/defines.h>
#include <pulsar/Message.h>
#include <pulsar/Schema.h>
#include <pulsar/KeySharedPolicy.h>

#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
Expand Down Expand Up @@ -86,7 +87,8 @@ class Commands {
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo,
proto::CommandSubscribe_InitialPosition subscriptionInitialPosition);
proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
KeySharedPolicy keySharedPolicy);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,11 @@ ConsumerConfiguration& ConsumerConfiguration::setProperties(
return *this;
}

ConsumerConfiguration& ConsumerConfiguration::setKeySharedPolicy(KeySharedPolicy keySharedPolicy) {
impl_->keySharedPolicy = keySharedPolicy.clone();
return *this;
}

KeySharedPolicy ConsumerConfiguration::getKeySharedPolicy() const { return impl_->keySharedPolicy; }

} // namespace pulsar
5 changes: 4 additions & 1 deletion pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct ConsumerConfigurationImpl {
InitialPosition subscriptionInitialPosition;
int patternAutoDiscoveryPeriod;
std::map<std::string, std::string> properties;
KeySharedPolicy keySharedPolicy;

ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
Expand All @@ -63,7 +65,8 @@ struct ConsumerConfigurationImpl {
readCompacted(false),
subscriptionInitialPosition(InitialPosition::InitialPositionLatest),
patternAutoDiscoveryPeriod(60),
properties() {}
properties(),
keySharedPolicy() {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
7 changes: 4 additions & 3 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {

ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition());
SharedBuffer cmd =
Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition(), config_.getKeySharedPolicy());
cnx->sendRequestWithId(cmd, requestId)
.addListener(
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
Expand Down
54 changes: 54 additions & 0 deletions pulsar-client-cpp/lib/KeySharedPolicy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <lib/KeySharedPolicyImpl.h>

namespace pulsar {

KeySharedPolicy::KeySharedPolicy() : impl_(std::make_shared<KeySharedPolicyImpl>()) {}

KeySharedPolicy::~KeySharedPolicy() {}

KeySharedPolicy::KeySharedPolicy(const KeySharedPolicy &x) : impl_(x.impl_) {}

KeySharedPolicy &KeySharedPolicy::operator=(const KeySharedPolicy &x) {
impl_ = x.impl_;
return *this;
}

KeySharedPolicy &KeySharedPolicy::setKeySharedMode(KeySharedMode keySharedMode) {
impl_->keySharedMode = keySharedMode;
return *this;
}

KeySharedMode KeySharedPolicy::getKeySharedMode() const { return impl_->keySharedMode; }

KeySharedPolicy &KeySharedPolicy::setAllowOutOfOrderDelivery(bool allowOutOfOrderDelivery) {
impl_->allowOutOfOrderDelivery = allowOutOfOrderDelivery;
return *this;
}

bool KeySharedPolicy::isAllowOutOfOrderDelivery() const { return impl_->allowOutOfOrderDelivery; }

KeySharedPolicy KeySharedPolicy::clone() const {
KeySharedPolicy newConf;
newConf.impl_.reset(new KeySharedPolicyImpl(*this->impl_));
return newConf;
}

} // namespace pulsar
31 changes: 31 additions & 0 deletions pulsar-client-cpp/lib/KeySharedPolicyImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/KeySharedPolicy.h>

namespace pulsar {

struct KeySharedPolicyImpl {
bool allowOutOfOrderDelivery;
KeySharedMode keySharedMode;

KeySharedPolicyImpl() : allowOutOfOrderDelivery(false), keySharedMode(AUTO_SPLIT) {}
};
} // namespace pulsar

0 comments on commit 3142456

Please sign in to comment.