Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user to disable TLS in cpp. #542

Merged
merged 4 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/examples/ExampleProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.build();

Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExampleProducerWithAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.build();

Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExampleProducerWithFifoMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.build();

Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExampleProducerWithTimedMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.build();

Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExampleProducerWithTransactionalMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.withTopics({FLAGS_topic})
.withTransactionChecker(checker)
Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExamplePushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ int main(int argc, char* argv[]) {
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.withConsumeThreads(4)
.withListener(listener)
Expand Down
1 change: 1 addition & 0 deletions cpp/examples/ExampleSimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
.withSsl(true)
.build())
.subscribe(FLAGS_topic, tag)
.build();
Expand Down
7 changes: 7 additions & 0 deletions cpp/include/rocketmq/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class Configuration {
return request_timeout_;
}

bool withSsl() const {
return withSsl_;
}

protected:
friend class ConfigurationBuilder;

Expand All @@ -52,6 +56,7 @@ class Configuration {
std::string endpoints_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
bool withSsl_ = true;
};

class ConfigurationBuilder {
Expand All @@ -62,6 +67,8 @@ class ConfigurationBuilder {

ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout);

ConfigurationBuilder& withSsl(bool enable);

Configuration build();

private:
Expand Down
5 changes: 5 additions & 0 deletions cpp/source/base/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill
return *this;
}

ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
configuration_.withSsl_ = enable;
return *this;
}

Configuration ConfigurationBuilder::build() {
return std::move(configuration_);
}
Expand Down
7 changes: 4 additions & 3 deletions cpp/source/client/ClientManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@

ROCKETMQ_NAMESPACE_BEGIN

ClientManagerImpl::ClientManagerImpl(std::string resource_namespace)
ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl)
: scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)),
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())) {
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
withSsl_(withSsl){
certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
Expand Down Expand Up @@ -469,7 +470,7 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories));
return channel;
}

Expand Down
1 change: 1 addition & 0 deletions cpp/source/client/include/ClientConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ClientConfig {
PublisherConfig publisher;
SubscriberConfig subscriber;
Metric metric;
bool withSsl;
std::unique_ptr<opencensus::trace::Sampler> sampler_;
};

Expand Down
3 changes: 2 additions & 1 deletion cpp/source/client/include/ClientManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share
* effectively.
* @param resource_namespace Abstract resource namespace, in which this client manager lives.
*/
explicit ClientManagerImpl(std::string resource_namespace);
explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true);

~ClientManagerImpl() override;

Expand Down Expand Up @@ -242,6 +242,7 @@ class ClientManagerImpl : virtual public ClientManager, public std::enable_share
grpc::ChannelArguments channel_arguments_;

bool trace_{false};
bool withSsl_;
};

ROCKETMQ_NAMESPACE_END
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/ClientImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void ClientImpl::start() {
client_config_.client_id = clientId();

if (!client_manager_) {
client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace);
client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl);
}
client_manager_->start();

Expand Down
1 change: 1 addition & 0 deletions cpp/source/rocketmq/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration)
impl_->withNameServerResolver(std::move(name_server_resolver));
impl_->withCredentialsProvider(configuration.credentialsProvider());
impl_->withRequestTimeout(configuration.requestTimeout());
impl_->withSsl(configuration.withSsl());
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions cpp/source/rocketmq/PushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ PushConsumer PushConsumerBuilder::build() {
}
impl->consumeThreadPoolSize(consume_thread_);
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
impl->withSsl(configuration_.withSsl());
impl->withCredentialsProvider(configuration_.credentialsProvider());
impl->withRequestTimeout(configuration_.requestTimeout());
impl->start();
Expand Down
1 change: 1 addition & 0 deletions cpp/source/rocketmq/SimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
simple_consumer.impl_->withSsl(configuration_.withSsl());

for (const auto& entry : subscriptions_) {
simple_consumer.impl_->subscribe(entry.first, entry.second);
Expand Down
4 changes: 4 additions & 0 deletions cpp/source/rocketmq/include/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class ClientImpl : virtual public Client {
client_config_.request_timeout = absl::FromChrono(request_timeout);
}

void withSsl(bool enable) {
client_config_.withSsl = enable;
}

/**
* Expose for test purpose only.
*/
Expand Down
4 changes: 4 additions & 0 deletions cpp/source/rocketmq/include/SimpleConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_thi
long_polling_duration_ = receive_timeout;
}

void withSsl(bool enable) {
client_config_.withSsl = enable;
}

protected:
void topicsOfInterest(std::vector<std::string> topics) override;

Expand Down