Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added end to end encryption in C++ client #1129

Merged
merged 1 commit into from
Feb 2, 2018

Conversation

saandrews
Copy link
Contributor

Motivation

C++ client implementation to support end to end encryption based on PIP4

  • It supports RSA encryption at the moment.
  • Added unit test and added option to the perf test to send and receive encrypted messages.
  • Tested compatibility for encrypted messages between Java and C++ client

@saandrews saandrews added this to the 1.22.0-incubating milestone Jan 29, 2018
@merlimat merlimat added the type/feature The PR added a new feature or issue requested a new feature label Jan 29, 2018
Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change LGTM, just few minor comment/suggestions


class CryptoKeyReader {

public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting will need to be adjusted, using the make format command

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -93,6 +93,10 @@ void BatchMessageContainer::sendMessage() {
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
compressPayLoad();

SharedBuffer encryptedPayload;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating the SharedBuffer will need to malloc some stuff, can we just doing when encryption is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I have allocated a byte to have valid object. Will try to refactor it before merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking again, it seems we are ok to without allocating anything as well.
https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/lib/SharedBuffer.h#L46
The ptr will be null and will be an issue if we use the buffer at this point. We will error out if encryption fails anyway, so guaranteed to have non null if success. Do you see any issues @merlimat ?

boost::mutex mutex_;

int dataKeyLen_;
std::unique_ptr<unsigned char []> dataKey_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should stick with boost::scoped_ptr instead of std::unique_ptr which depends on C++11

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to scoped_array


int tagLen_;
int ivLen_;
std::unique_ptr<unsigned char []> iv_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, boost::scoped_ptr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to scoped_array

@@ -262,12 +295,21 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
SharedBuffer& payload = msg.impl_->payload;

uint32_t uncompressedSize = payload.readableBytes();
uint32_t compressedSize = uncompressedSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ways it sounds, it's a bit confusing ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) Agree. Changed to generic name.

compressedSize = payload.readableBytes();

// Encrypt the payload if enabled
SharedBuffer encryptedPayload;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check whether it's required before creating the SharedBuffer

@@ -23,7 +23,9 @@

rm -rf ./pulsar-dist
mkdir pulsar-dist
tar xfz ../all/target/apache-pulsar*bin.tar.gz -C pulsar-dist --strip-components 1
for i in `ls ../all/target/apache-pulsar*bin.tar.gz`; do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expectation here is that there will be a single tgz left after the build

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats right. I ended up with multiple bin.tar.gz files and tar failed in Mac.

#ifndef LIB_CONSUMERCRYPTOFAILUREACTION_H_
#define LIB_CONSUMERCRYPTOFAILUREACTION_H_

using namespace pulsar;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line

@@ -40,6 +40,7 @@
#include "ExecutorService.h"
#include <boost/asio.hpp>
#include "ProducerImpl.h"
#include "MessageCrypto.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

@@ -74,4 +74,30 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
}
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}

bool ConsumerConfiguration::isEncryptionEnabled() const {
if (impl_->cryptoKeyReader != NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return impl_->cryptoKeyReader != NULL

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -31,13 +31,18 @@ struct ConsumerConfigurationImpl {
int receiverQueueSize;
std::string consumerName;
long brokerConsumerStatsCacheTimeInMs;
CryptoKeyReader* cryptoKeyReader;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use shared pointer instead like we do for MessageRouter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly different use case here. It has a reference to the application allocated implementation of CryptoKeyReader

sendTimer_() {
sendTimer_(),
msgCrypto_(),
dataKeyGenInterval_(4 * 60 * 60) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataKeyGenIntervalInMs_ (time unit missing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@saandrews saandrews force-pushed the cppencryption branch 2 times, most recently from d104d05 to dfac0da Compare January 30, 2018 22:24

const CryptoKeyReader& ConsumerConfiguration::getCryptoKeyReader() const { return *(impl_->cryptoKeyReader); }

ConsumerConfiguration& ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReader& cryptoKeyReader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of getting a reference and storing a pointer, we should take either a copy or a shared_ptr from the user. That would avoid any problem on the life cycle of the crypto reader. Otherwise the use needs to know it cannot be destructed and that will lead to unexpected segfaults.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to shared_ptr.

@merlimat
Copy link
Contributor

@saandrews Change looks good. Just the comment on using shared_ptr instead of relying on user to keep the object alive.

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@saandrews saandrews force-pushed the cppencryption branch 5 times, most recently from 660e99c to 3b70624 Compare February 2, 2018 00:05
@merlimat merlimat merged commit 9a081ad into apache:master Feb 2, 2018
sijie added a commit to sijie/pulsar that referenced this pull request Aug 8, 2020
…_capture_response

Capture response for broker interceptor.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants