Skip to content

Commit

Permalink
Fix encrption bug with chunked message (#13689)
Browse files Browse the repository at this point in the history
# Motivation

Fix issue #13688.

Send chunking message failed with `org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when encryption is enabled.

### Modifications

The root cause is that all chunked messages share the same msgMetadata object.
The `EncryptionKeys` will be repeated added into message metadata. 
And proto buffer objects do not support serialization with bytes value type.
  • Loading branch information
Jason918 authored Jan 18, 2022
1 parent 9c94cd7 commit c1ff87c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -2620,6 +2621,38 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testCryptoWithChunking() throws Exception {
final String topic = "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis();
final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";

this.conf.setMaxMessageSize(1000);

@Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic)
.enableChunking(true)
.enableBatching(false)
.addEncryptionKey("client-ecdsa.pem")
.defaultCryptoKeyReader(ecdsaPublicKeyFile)
.create();

byte[] data = RandomUtils.nextBytes(5100);
MessageId id = producer1.send(data);
log.info("Message Id={}", id);

MessageImpl<byte[]> message;
message = (MessageImpl<byte[]>) consumer1.receive();
Assert.assertEquals(message.getData(), data);
Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1);
}

@Test
public void testDefaultCryptoKeyReader() throws Exception {
final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public synchronized void encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
return;
}

msgMetadata.clearEncryptionKeys();
// Update message metadata with encrypted data key
for (String keyName : encKeys) {
if (encryptedDataKeyMap.get(keyName) == null) {
Expand Down

0 comments on commit c1ff87c

Please sign in to comment.