Skip to content

Commit fc1cf25

Browse files
authored
[client-cpp] add subscription properties to consumer for cpp (apache#15020)
# Motivation Pulsar already support entry filter, But pulsar-client-cpp ConsumerImpl not contains subscriptionProperties Option. So, this PR want to solve it. # Modifications Enable subscriptionProperties on consumer for cpp client.
1 parent 8757c3a commit fc1cf25

8 files changed

+45
-3
lines changed

pulsar-client-cpp/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ lib*.so*
4242
/examples/SampleConsumerListener
4343
/examples/SampleConsumerListenerCApi
4444
/examples/SampleReaderCApi
45+
/examples/SampleFileLogger
4546
/tests/main
4647
/perf/perfProducer
4748
/perf/perfConsumer

pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h

+15
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
424424
*/
425425
ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties);
426426

427+
/**
428+
* Get all the subscription properties attached to this subscription.
429+
*/
430+
std::map<std::string, std::string>& getSubscriptionProperties() const;
431+
432+
/**
433+
* Sets a new subscription properties for this subscription.
434+
* Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to
435+
* create a subscription if they use different properties.
436+
*
437+
* @param subscriptionProperties all the subscription properties in the provided map
438+
*/
439+
ConsumerConfiguration& setSubscriptionProperties(
440+
const std::map<std::string, std::string>& subscriptionProperties);
441+
427442
/**
428443
* Set the Priority Level for consumer (0 is the default value and means the highest priority).
429444
*

pulsar-client-cpp/lib/Commands.cc

+8
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
268268
const std::string& consumerName, SubscriptionMode subscriptionMode,
269269
Optional<MessageId> startMessageId, bool readCompacted,
270270
const std::map<std::string, std::string>& metadata,
271+
const std::map<std::string, std::string>& subscriptionProperties,
271272
const SchemaInfo& schemaInfo,
272273
CommandSubscribe_InitialPosition subscriptionInitialPosition,
273274
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
@@ -308,6 +309,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
308309
subscribe->mutable_metadata()->AddAllocated(keyValue);
309310
}
310311

312+
for (const auto& subscriptionProperty : subscriptionProperties) {
313+
proto::KeyValue* keyValue = proto::KeyValue().New();
314+
keyValue->set_key(subscriptionProperty.first);
315+
keyValue->set_value(subscriptionProperty.second);
316+
subscribe->mutable_subscription_properties()->AddAllocated(keyValue);
317+
}
318+
311319
if (subType == CommandSubscribe_SubType_Key_Shared) {
312320
KeySharedMeta& ksm = *subscribe->mutable_keysharedmeta();
313321
switch (keySharedPolicy.getKeySharedMode()) {

pulsar-client-cpp/lib/Commands.h

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class Commands {
8888
proto::CommandSubscribe_SubType subType, const std::string& consumerName,
8989
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
9090
bool readCompacted, const std::map<std::string, std::string>& metadata,
91+
const std::map<std::string, std::string>& subscriptionProperties,
9192
const SchemaInfo& schemaInfo,
9293
proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
9394
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,

pulsar-client-cpp/lib/ConsumerConfiguration.cc

+12
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,18 @@ ConsumerConfiguration& ConsumerConfiguration::setProperties(
214214
return *this;
215215
}
216216

217+
std::map<std::string, std::string>& ConsumerConfiguration::getSubscriptionProperties() const {
218+
return impl_->subscriptionProperties;
219+
}
220+
221+
ConsumerConfiguration& ConsumerConfiguration::setSubscriptionProperties(
222+
const std::map<std::string, std::string>& subscriptionProperties) {
223+
for (const auto& subscriptionProperty : subscriptionProperties) {
224+
impl_->subscriptionProperties.emplace(subscriptionProperty.first, subscriptionProperty.second);
225+
}
226+
return *this;
227+
}
228+
217229
ConsumerConfiguration& ConsumerConfiguration::setPriorityLevel(int priorityLevel) {
218230
if (priorityLevel < 0) {
219231
throw std::invalid_argument("Consumer Config Exception: PriorityLevel should be nonnegative number.");

pulsar-client-cpp/lib/ConsumerConfigurationImpl.h

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ struct ConsumerConfigurationImpl {
4848
int patternAutoDiscoveryPeriod{60};
4949
bool replicateSubscriptionStateEnabled{false};
5050
std::map<std::string, std::string> properties;
51+
std::map<std::string, std::string> subscriptionProperties;
5152
int priorityLevel{0};
5253
KeySharedPolicy keySharedPolicy;
5354
size_t maxPendingChunkedMessage{10};

pulsar-client-cpp/lib/ConsumerImpl.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,9 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
189189
uint64_t requestId = client->newRequestId();
190190
SharedBuffer cmd = Commands::newSubscribe(
191191
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
192-
startMessageId, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
193-
config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(),
194-
config_.getPriorityLevel());
192+
startMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
193+
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
194+
config_.getKeySharedPolicy(), config_.getPriorityLevel());
195195
cnx->sendRequestWithId(cmd, requestId)
196196
.addListener(
197197
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));

pulsar-client-cpp/tests/ConsumerConfigurationTest.cc

+4
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
139139
ASSERT_EQ(conf.getProperties()["k1"], "v1");
140140
ASSERT_EQ(conf.hasProperty("k1"), true);
141141

142+
std::map<std::string, std::string> subscriptionProperties = {{"k1", "v1"}};
143+
conf.setSubscriptionProperties(subscriptionProperties);
144+
ASSERT_EQ(conf.getSubscriptionProperties()["k1"], "v1");
145+
142146
conf.setPriorityLevel(1);
143147
ASSERT_EQ(conf.getPriorityLevel(), 1);
144148

0 commit comments

Comments
 (0)