From d7387d7375765b7ede726ecd7df221b8e4901faa Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Mon, 10 Jan 2022 11:40:47 +0800 Subject: [PATCH] fix encrption bug with chunked message --- .../api/SimpleProducerConsumerTest.java | 33 +++++++++++++++++++ .../client/impl/crypto/MessageCryptoBc.java | 1 + 2 files changed, 34 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 63dbb8f4d0afd..39ffafc4b3296 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -2620,6 +2621,38 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe log.info("-- Exiting {} test --", methodName); } + @Test + public void testCryptoWithChunking() throws Exception { + final String topic = "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + + this.conf.setMaxMessageSize(1000); + + @Cleanup + PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe(); + @Cleanup + Producer producer1 = pulsarClient.newProducer().topic(topic) + .enableChunking(true) + .enableBatching(false) + .addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile) + .create(); + + byte[] data = RandomUtils.nextBytes(5100); + MessageId id = producer1.send(data); + log.info("Message Id={}", id); + + MessageImpl message; + message = (MessageImpl) consumer1.receive(); + Assert.assertEquals(message.getData(), data); + Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1); + } + @Test public void testDefaultCryptoKeyReader() throws Exception { final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis(); diff --git a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java index e4f3200735edb..4ce457401cd70 100644 --- a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java +++ b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java @@ -389,6 +389,7 @@ public synchronized void encrypt(Set encKeys, CryptoKeyReader keyReader, return; } + msgMetadata.clearEncryptionKeys(); // Update message metadata with encrypted data key for (String keyName : encKeys) { if (encryptedDataKeyMap.get(keyName) == null) {