From d89f7c0b03744c6401c771e58cc601ffa55113ee Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Wed, 14 Aug 2024 10:26:47 +0800 Subject: [PATCH 1/8] [fix][client] Fix for early hit `beforeConsume` for MultiTopicConsumer (#23141) (cherry picked from commit c07b158f003c5a5623296189f0932d7058d2e75a) --- .../pulsar/client/api/InterceptorsTest.java | 45 +++++++++----- .../client/impl/MultiTopicsConsumerImpl.java | 58 +++++++++++++++++-- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index b3f5ed3b487d6..8f239aea1f0b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -27,8 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; @@ -66,6 +66,12 @@ public Object[][] getReceiverQueueSize() { return new Object[][] { { 0 }, { 1000 } }; } + @DataProvider(name = "topics") + public Object[][] getTopics() { + return new Object[][] {{Lists.newArrayList("persistent://my-property/my-ns/my-topic") }, + { Lists.newArrayList("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }}; + } + @Test public void testProducerInterceptor() throws Exception { Map> ackCallback = new HashMap<>(); @@ -390,9 +396,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -436,13 +442,19 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId int keyCount = 0; for (int i = 0; i < 2; i++) { - Message received = consumer.receive(); + Message received; + if (i % 2 == 0) { + received = consumer.receive(); + } else { + received = consumer.receiveAsync().join(); + } MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { if ("beforeConsumer".equals(keyValue.getKey())) { keyCount++; } } + Assert.assertEquals(keyCount, i + 1); consumer.acknowledge(received); } Assert.assertEquals(2, keyCount); @@ -462,9 +474,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -599,8 +611,8 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForNegativeAcksSend(List topics) throws PulsarClientException, InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -627,6 +639,7 @@ public void onAcknowledgeCumulative(Consumer consumer, MessageId message @Override public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } @@ -637,7 +650,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId }; Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionType(SubscriptionType.Failover) .intercept(interceptor) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -645,7 +658,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId .subscribe(); Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); for (int i = 0; i < totalNumOfMessages; i++) { @@ -669,8 +682,9 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForAckTimeoutSend(List topics) throws PulsarClientException, + InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -701,16 +715,17 @@ public void onNegativeAcksSend(Consumer consumer, Set message @Override public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } }; Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionName("foo") .intercept(interceptor) .ackTimeout(2, TimeUnit.SECONDS) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index b01c25d215bdd..be618744180cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -104,6 +105,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private volatile BatchMessageIdImpl startMessageId = null; private final long startMessageRollbackDurationInSec; + private final ConsumerInterceptors internalConsumerInterceptors; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { @@ -133,6 +135,11 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { long startMessageRollbackDurationInSec) { super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); + if (interceptors != null) { + this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors); + } else { + this.internalConsumerInterceptors = null; + } checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); @@ -315,7 +322,8 @@ private void messageReceived(ConsumerImpl consumer, Message message) { CompletableFuture> receivedFuture = nextPendingReceive(); if (receivedFuture != null) { unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount()); - completePendingReceive(receivedFuture, topicMessage); + final Message interceptMessage = beforeConsume(topicMessage); + completePendingReceive(receivedFuture, interceptMessage); } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } @@ -366,7 +374,7 @@ protected Message internalReceive() throws PulsarClientException { } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - return message; + return beforeConsume(message); } catch (Exception e) { throw PulsarClientException.unwrap(e); } @@ -393,6 +401,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC } } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); + message = beforeConsume(message); } resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -463,7 +472,7 @@ protected CompletableFuture> internalReceiveAsync() { checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); + result.complete(beforeConsume(message)); } }); return result; @@ -1119,7 +1128,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, this.internalConsumerInterceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); } @@ -1503,4 +1512,45 @@ public void tryAcknowledgeMessage(Message msg) { acknowledgeCumulativeAsync(msg); } } + + private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { + return new ConsumerInterceptors(new ArrayList<>()) { + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledge(consumer, messageId, exception); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, + MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception); + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set set) { + multiTopicInterceptors.onNegativeAcksSend(consumer, set); + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set set) { + multiTopicInterceptors.onAckTimeoutSend(consumer, set); + } + + @Override + public void onPartitionsChange(String topicName, int partitions) { + multiTopicInterceptors.onPartitionsChange(topicName, partitions); + } + + @Override + public void close() throws IOException { + multiTopicInterceptors.close(); + } + }; + } } From e0a9308090036cb94fbc2c8ff5df00ec1fc5d6ae Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 28 Sep 2023 18:39:23 +0800 Subject: [PATCH 2/8] InflightReadsLimiter - limit the memory used by reads end-to-end (#5920) * InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245) * InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) Motivation: Broker can go out of memory due to many reads enqueued on the PersistentDispatcherMultipleConsumers dispatchMessagesThread (that is used in case of dispatcherDispatchMessagesInSubscriptionThread set to true, that is the default value) The limit of the amount of memory retained due to reads MUST take into account also the entries coming from the Cache. When dispatcherDispatchMessagesInSubscriptionThread is false (the behaviour of Pulsar 2.10) there is some kind of natural (but still unpredictable!!) back pressure mechanism because the thread that receives the entries from BK of the cache dispatches immediately and synchronously the entries to the consumer and releases them Modifications: - Add a new component (InflightReadsLimiter) that keeps track of the overall amount of memory retained due to inflight reads. - Add a new configuration entry managedLedgerMaxReadsInFlightSizeInMB - The feature is disabled by default - Add new metrics to track the values * Change error message * checkstyle * Fix license * remove duplicate method after cherry-pick * Rename onDeallocate (cherry picked from commit 6fec66b12b04a37e4c2b05d78d4e33b380c270df) (cherry picked from commit 47c98e57de5eeb5a114c05eb17698d3fd2b3342a) * [fix][broker][branch-2.10] limit the memory used by reads end-to-end (cherry picked from commit eeb80e1092b6d289f1adbfd65b87cecc9b29b900) * remove gpg plugin * remove release profile * remove release plugin * Revert "remove release plugin" This reverts commit 20522ea705e95930af0e75ffad9f19baeb7a6063. * Revert "remove release profile" This reverts commit 64627fdfbfe1fc4b93dbaa5663e1347f19e05850. * Revert "remove gpg plugin" This reverts commit 8054d592e4cdca522062234461b348ee27160286. --------- Co-authored-by: Enrico Olivelli --- conf/broker.conf | 10 +- .../mledger/ManagedLedgerFactoryConfig.java | 5 + .../bookkeeper/mledger/impl/EntryImpl.java | 25 +++ .../impl/cache/InflightReadsLimiter.java | 137 ++++++++++++++ .../impl/cache/RangeEntryCacheImpl.java | 107 +++++++++++ .../cache/RangeEntryCacheManagerImpl.java | 8 +- .../mledger/impl/EntryCacheManagerTest.java | 3 + .../mledger/impl/EntryCacheTest.java | 2 + .../impl/cache/InflightReadsLimiterTest.java | 172 ++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 6 + .../broker/ManagedLedgerClientFactory.java | 2 + .../service/AbstractBaseDispatcher.java | 10 + .../service/PulsarCommandSenderImpl.java | 16 +- ...PersistentDispatcherMultipleConsumers.java | 4 +- ...tStreamingDispatcherMultipleConsumers.java | 3 + 15 files changed, 506 insertions(+), 4 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java diff --git a/conf/broker.conf b/conf/broker.conf index 9d7c68bc34ed7..466b4d8c0f332 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1064,7 +1064,15 @@ managedLedgerCursorRolloverTimeInSeconds=14400 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 -# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher +# Maximum amount of memory used hold data read from storage (or from the cache). +# This mechanism prevents the broker to have too many concurrent +# reads from storage and fall into Out of Memory errors in case +# of multiple concurrent reads to multiple concurrent consumers. +# Set 0 in order to disable the feature. +# +managedLedgerMaxReadsInFlightSizeInMB=0 + +# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 78314be45c390..7db020969b703 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -57,6 +57,11 @@ public class ManagedLedgerFactoryConfig { */ private boolean copyEntriesInCache = false; + /** + * Maximum number of (estimated) data in-flight reading from storage and the cache. + */ + private long managedLedgerMaxReadsInFlightSize = 0; + /** * Whether trace managed ledger task execution time. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 3050309239899..ab0c4ec28d2b1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -45,6 +45,8 @@ protected EntryImpl newObject(Handle handle) { private long entryId; ByteBuf data; + private Runnable onDeallocate; + public static EntryImpl create(LedgerEntry ledgerEntry) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); @@ -103,6 +105,22 @@ private EntryImpl(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } + public void onDeallocate(Runnable r) { + if (this.onDeallocate == null) { + this.onDeallocate = r; + } else { + // this is not expected to happen + Runnable previous = this.onDeallocate; + this.onDeallocate = () -> { + try { + previous.run(); + } finally { + r.run(); + } + }; + } + } + public long getTimestamp() { return timestamp; } @@ -160,6 +178,13 @@ public ReferenceCounted touch(Object hint) { @Override protected void deallocate() { // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it + if (onDeallocate != null) { + try { + onDeallocate.run(); + } finally { + onDeallocate = null; + } + } data.release(); data = null; timestamp = -1; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java new file mode 100644 index 0000000000000..f3848b6ddd99f --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import com.google.common.annotations.VisibleForTesting; +import io.prometheus.client.Gauge; +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class InflightReadsLimiter { + + private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge + .build() + .name("pulsar_ml_reads_inflight_bytes") + .help("Estimated number of bytes retained by data read from storage or cache") + .register(); + + private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge + .build() + .name("pulsar_ml_reads_available_inflight_bytes") + .help("Available space for inflight data read from storage or cache") + .register(); + + private final long maxReadsInFlightSize; + private long remainingBytes; + + public InflightReadsLimiter(long maxReadsInFlightSize) { + if (maxReadsInFlightSize <= 0) { + // set it to -1 in order to show in the metrics that the metric is not available + PULSAR_ML_READS_BUFFER_SIZE.set(-1); + PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1); + } + this.maxReadsInFlightSize = maxReadsInFlightSize; + this.remainingBytes = maxReadsInFlightSize; + } + + @VisibleForTesting + public synchronized long getRemainingBytes() { + return remainingBytes; + } + + @AllArgsConstructor + @ToString + static class Handle { + final long acquiredPermits; + final boolean success; + final int trials; + + final long creationTime; + } + + private static final Handle DISABLED = new Handle(0, true, 0, -1); + + Handle acquire(long permits, Handle current) { + if (maxReadsInFlightSize <= 0) { + // feature is disabled + return DISABLED; + } + synchronized (this) { + try { + if (current == null) { + if (remainingBytes == 0) { + return new Handle(0, false, 1, System.currentTimeMillis()); + } + if (remainingBytes >= permits) { + remainingBytes -= permits; + return new Handle(permits, true, 1, System.currentTimeMillis()); + } else { + long possible = remainingBytes; + remainingBytes = 0; + return new Handle(possible, false, 1, System.currentTimeMillis()); + } + } else { + if (current.trials >= 4 && current.acquiredPermits > 0) { + remainingBytes += current.acquiredPermits; + return new Handle(0, false, 1, current.creationTime); + } + if (remainingBytes == 0) { + return new Handle(current.acquiredPermits, false, current.trials + 1, + current.creationTime); + } + long needed = permits - current.acquiredPermits; + if (remainingBytes >= needed) { + remainingBytes -= needed; + return new Handle(permits, true, current.trials + 1, current.creationTime); + } else { + long possible = remainingBytes; + remainingBytes = 0; + return new Handle(current.acquiredPermits + possible, false, + current.trials + 1, current.creationTime); + } + } + } finally { + updateMetrics(); + } + } + } + + void release(Handle handle) { + if (handle == DISABLED) { + return; + } + synchronized (this) { + remainingBytes += handle.acquiredPermits; + updateMetrics(); + } + } + + private synchronized void updateMetrics() { + PULSAR_ML_READS_BUFFER_SIZE.set(maxReadsInFlightSize - remainingBytes); + PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(remainingBytes); + } + + public boolean isDisabled() { + return maxReadsInFlightSize <= 0; + } + + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 4f322cd71d8d1..3e7401bc51271 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import io.netty.buffer.ByteBuf; @@ -28,11 +29,15 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -48,18 +53,26 @@ */ public class RangeEntryCacheImpl implements EntryCache { + /** + * Overhead per-entry to take into account the envelope. + */ + private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64; + private final RangeEntryCacheManagerImpl manager; private final ManagedLedgerImpl ml; private ManagedLedgerInterceptor interceptor; private final RangeCache entries; private final boolean copyEntries; + private volatile long estimatedEntrySize = 10 * 1024; + private final long readEntryTimeoutMillis; private static final double MB = 1024 * 1024; public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); + this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; @@ -68,11 +81,21 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl } } + @VisibleForTesting + ManagedLedgerConfig getManagedLedgerConfig() { + return ml.getConfig(); + } + @Override public String getName() { return ml.getName(); } + @VisibleForTesting + InflightReadsLimiter getPendingReadsLimiter() { + return manager.getInflightReadsLimiter(); + } + public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, // preferDirect 0, // nHeapArenas, PooledByteBufAllocator.defaultNumDirectArena(), // nDirectArena @@ -256,6 +279,19 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @SuppressWarnings({ "unchecked", "rawtypes" }) private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null); + } + + void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) { + + final AsyncCallbacks.ReadEntriesCallback callback = + handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader, + originalCallback, ctx, handle); + if (callback == null) { + return; + } + final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry); @@ -329,6 +365,77 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo } } + + private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh, + long firstEntry, long lastEntry, + boolean shouldCacheEntry, + AsyncCallbacks.ReadEntriesCallback + originalCallback, + Object ctx, + InflightReadsLimiter.Handle handle) { + InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); + if (pendingReadsLimiter.isDisabled()) { + return originalCallback; + } + long estimatedReadSize = (1 + lastEntry - firstEntry) + * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + final AsyncCallbacks.ReadEntriesCallback callback; + InflightReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle); + if (!newHandle.success) { + long now = System.currentTimeMillis(); + if (now - newHandle.creationTime > readEntryTimeoutMillis) { + String message = "Time-out elapsed while acquiring enough permits " + + "on the memory limiter to read from ledger " + + lh.getId() + + ", " + getName() + + ", estimated read size " + estimatedReadSize + " bytes" + + " for " + (1 + lastEntry - firstEntry) + + " entries (check managedLedgerMaxReadsInFlightSizeInMB)"; + log.error(message); + pendingReadsLimiter.release(newHandle); + originalCallback.readEntriesFailed( + new ManagedLedgerException.TooManyRequestsException(message), ctx); + return null; + } + ml.getExecutor().submitOrdered(lh.getId(), () -> { + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, + originalCallback, ctx, newHandle); + return null; + }); + return null; + } else { + callback = new AsyncCallbacks.ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List entries, Object ctx) { + if (!entries.isEmpty()) { + long size = entries.get(0).getLength(); + estimatedEntrySize = size; + + AtomicInteger remainingCount = new AtomicInteger(entries.size()); + for (Entry entry : entries) { + ((EntryImpl) entry).onDeallocate(() -> { + if (remainingCount.decrementAndGet() <= 0) { + pendingReadsLimiter.release(newHandle); + } + }); + } + } else { + pendingReadsLimiter.release(newHandle); + } + originalCallback.readEntriesComplete(entries, ctx); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + pendingReadsLimiter.release(newHandle); + originalCallback.readEntriesFailed(exception, ctx); + } + }; + } + return callback; + } + @Override public void clear() { Pair removedPair = entries.clear(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 8eb1fb0e7c8ab..8250c41c01c08 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -51,14 +51,16 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager { private final ManagedLedgerFactoryImpl mlFactory; protected final ManagedLedgerFactoryMBeanImpl mlFactoryMBean; + private final InflightReadsLimiter inflightReadsLimiter; protected static final double MB = 1024 * 1024; - private static final double evictionTriggerThresholdPercent = 0.98; public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) { this.maxSize = factory.getConfig().getMaxCacheSize(); + this.inflightReadsLimiter = new InflightReadsLimiter( + factory.getConfig().getManagedLedgerMaxReadsInFlightSize()); this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark(); this.evictionPolicy = new EntryCacheDefaultEvictionPolicy(); @@ -195,5 +197,9 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor return returnEntry; } + public InflightReadsLimiter getInflightReadsLimiter() { + return inflightReadsLimiter; + } + private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 885655db7672e..db5169202469b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; @@ -62,10 +63,12 @@ protected void setUpTestCase() throws Exception { when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); when(ml1.getExecutor()).thenReturn(super.executor); when(ml1.getFactory()).thenReturn(factory); + when(ml1.getConfig()).thenReturn(new ManagedLedgerConfig()); ml2 = mock(ManagedLedgerImpl.class); when(ml2.getScheduledExecutor()).thenReturn(executor); when(ml2.getName()).thenReturn("cache2"); + when(ml2.getConfig()).thenReturn(new ManagedLedgerConfig()); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index 85ff425c4e295..1bbc3f8b0a4b6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; @@ -58,6 +59,7 @@ protected void setUpTestCase() throws Exception { when(ml.getName()).thenReturn("name"); when(ml.getExecutor()).thenReturn(executor); when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); + when(ml.getConfig()).thenReturn(new ManagedLedgerConfig()); } @Test(timeOut = 5000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java new file mode 100644 index 0000000000000..e1345129a210a --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.Test; + +@Slf4j +public class InflightReadsLimiterTest { + + @Test + public void testDisabled() throws Exception { + + InflightReadsLimiter limiter = new InflightReadsLimiter(0); + assertTrue(limiter.isDisabled()); + + limiter = new InflightReadsLimiter(-1); + assertTrue(limiter.isDisabled()); + + limiter = new InflightReadsLimiter(1); + assertFalse(limiter.isDisabled()); + } + + @Test + public void testBasicAcquireRelease() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + InflightReadsLimiter.Handle handle = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 100); + assertEquals(1, handle.trials); + limiter.release(handle); + assertEquals(100, limiter.getRemainingBytes()); + } + + @Test + public void testNotEnoughPermits() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + InflightReadsLimiter.Handle handle = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 100); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 0); + assertEquals(1, handle2.trials); + + limiter.release(handle); + assertEquals(100, limiter.getRemainingBytes()); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + + @Test + public void testPartialAcquire() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + + InflightReadsLimiter.Handle handle = limiter.acquire(30, null); + assertEquals(70, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 30); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(1, handle2.trials); + + limiter.release(handle); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + + @Test + public void testTooManyTrials() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + + InflightReadsLimiter.Handle handle = limiter.acquire(30, null); + assertEquals(70, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 30); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(1, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(2, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(3, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(4, handle2.trials); + + // too many trials, start from scratch + handle2 = limiter.acquire(100, handle2); + assertEquals(70, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 0); + assertEquals(1, handle2.trials); + + limiter.release(handle); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c0f724a543618..2d695a2d0a696 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1745,6 +1745,12 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when " + "inserting in cache") private boolean managedLedgerCacheCopyEntries = false; + + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size for bytes read from storage." + + " This is the memory retained by data read from storage (or cache) until it has been delivered to the" + + " Consumer Netty channel. Use O to disable") + private long managedLedgerMaxReadsInFlightSizeInMB = 0; + @FieldContext( category = CATEGORY_STORAGE_ML, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 234e11bee643e..dc3de7cf6a5d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -63,6 +63,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); + managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize( + conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds( conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index e54a3332a49df..119845f6c64ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty; import com.google.common.collect.ImmutableList; import io.netty.buffer.ByteBuf; +import io.prometheus.client.Gauge; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -59,6 +60,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { protected final Subscription subscription; + private static final Gauge PENDING_BYTES_TO_DISPATCH = Gauge + .build() + .name("pulsar_broker_pending_bytes_to_dispatch") + .help("Amount of bytes loaded in memory to be dispatched to Consumers") + .register(); + protected final ServiceConfiguration serviceConfig; protected final boolean dispatchThrottlingOnBatchMessageEnabled; /** @@ -397,4 +404,7 @@ protected static Pair computeReadLimits(int messagesToRead, int a protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName()); } + protected final void updatePendingBytesToDispatch(long size) { + PENDING_BYTES_TO_DISPATCH.inc(size); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 0d731f86c28e9..d642fd9d10064 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -221,6 +222,10 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); ctx.channel().eventLoop().execute(() -> { + // this list is always accessed in the same thread (the eventLoop here) + // and in the completion of the writePromise + // it is safe to use a simple ArrayList + List entriesToRelease = new ArrayList<>(entries.size()); for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); if (entry == null) { @@ -272,11 +277,20 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName, epoch), ctx.voidPromise()); - entry.release(); + entriesToRelease.add(entry); } // Use an empty write here so that we can just tie the flush with the write promise for last entry ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); + writePromise.addListener((future) -> { + // release the entries only after flushing the channel + // + // InflightReadsLimiter tracks the amount of memory retained by in-flight data to the + // consumer. It counts the memory as being released when the entry is deallocated + // that is that it reaches refcnt=0. + // so we need to call release only when we are sure that Netty released the internal ByteBuf + entriesToRelease.forEach(Entry::release); + }); batchSizes.recyle(); if (batchIndexesAcks != null) { batchIndexesAcks.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 91a155a91574d..23f6ca967875d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -525,8 +525,10 @@ public final synchronized void readEntriesComplete(List entries, Object c if (log.isDebugEnabled()) { log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } - + long size = entries.stream().mapToLong(Entry::getLength).sum(); + updatePendingBytesToDispatch(size); sendMessagesToConsumers(readType, entries); + updatePendingBytesToDispatch(-size); } protected final synchronized void sendMessagesToConsumers(ReadType readType, List entries) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 5235c13dc81fd..6f826cfda4304 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -91,7 +91,10 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger()) .getNextValidPosition((PositionImpl) entry.getPosition())); + long size = entry.getLength(); + updatePendingBytesToDispatch(size); sendMessagesToConsumers(readType, Lists.newArrayList(entry)); + updatePendingBytesToDispatch(-size); ctx.recycle(); } From 7f3906c16971b4049637d8a85253ea83312980d2 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 27 Nov 2023 20:08:39 +0800 Subject: [PATCH 3/8] [improve][sec] Support for Elliptic Curve Cryptography (EC, ECC) (certificates/private keys) (#21621) (cherry picked from commit e1d06b5f54f08c09debab7a9a513b7c173c1779b) --- pom.xml | 2 + .../auth/MockedPulsarServiceBaseTest.java | 57 ++++++- .../tls/ec/TlsWithECCertificateFileTest.java | 130 ++++++++++++++++ .../tls/ec/TlsWithECKeyStoreTest.java | 141 ++++++++++++++++++ .../pulsar/common/util/SecurityUtility.java | 34 ++++- .../ec/broker_client.cert.pem | 9 ++ .../ec/broker_client.csr.pem | 7 + .../ec/broker_client.key-pk8.pem | 5 + .../ec/broker_client.key.pem | 8 + tests/certificate-authority/ec/ca.cert.pem | 10 ++ tests/certificate-authority/ec/ca.cert.srl | 1 + tests/certificate-authority/ec/ca.key.pem | 8 + .../ec/certificate_generation.txt | 34 +++++ .../certificate-authority/ec/client.cert.pem | 8 + tests/certificate-authority/ec/client.csr.pem | 7 + .../ec/client.key-pk8.pem | 5 + tests/certificate-authority/ec/client.key.pem | 8 + .../ec/jks/broker_client.cert.pem | 10 ++ .../ec/jks/broker_client.keystore.jks | Bin 0 -> 2034 bytes .../ec/jks/broker_client.signed.cert.pem | 11 ++ .../certificate-authority/ec/jks/ca.cert.pem | 10 ++ .../certificate-authority/ec/jks/ca.cert.srl | 1 + tests/certificate-authority/ec/jks/ca.key.pem | 8 + .../ec/jks/ca.truststore.jks | Bin 0 -> 742 bytes .../ec/jks/client.cert.pem | 10 ++ .../ec/jks/client.keystore.jks | Bin 0 -> 1988 bytes .../ec/jks/client.signed.cert.pem | 10 ++ .../ec/jks/key_store_generation.txt | 33 ++++ .../ec/jks/server.cert.pem | 10 ++ .../ec/jks/server.keystore.jks | Bin 0 -> 2004 bytes .../ec/jks/server.signed.cert.pem | 10 ++ .../certificate-authority/ec/server.cert.pem | 13 ++ tests/certificate-authority/ec/server.conf | 40 +++++ tests/certificate-authority/ec/server.csr.pem | 7 + .../ec/server.key-pk8.pem | 5 + tests/certificate-authority/ec/server.key.pem | 8 + 36 files changed, 648 insertions(+), 12 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java create mode 100644 tests/certificate-authority/ec/broker_client.cert.pem create mode 100644 tests/certificate-authority/ec/broker_client.csr.pem create mode 100644 tests/certificate-authority/ec/broker_client.key-pk8.pem create mode 100644 tests/certificate-authority/ec/broker_client.key.pem create mode 100644 tests/certificate-authority/ec/ca.cert.pem create mode 100644 tests/certificate-authority/ec/ca.cert.srl create mode 100644 tests/certificate-authority/ec/ca.key.pem create mode 100644 tests/certificate-authority/ec/certificate_generation.txt create mode 100644 tests/certificate-authority/ec/client.cert.pem create mode 100644 tests/certificate-authority/ec/client.csr.pem create mode 100644 tests/certificate-authority/ec/client.key-pk8.pem create mode 100644 tests/certificate-authority/ec/client.key.pem create mode 100644 tests/certificate-authority/ec/jks/broker_client.cert.pem create mode 100644 tests/certificate-authority/ec/jks/broker_client.keystore.jks create mode 100644 tests/certificate-authority/ec/jks/broker_client.signed.cert.pem create mode 100644 tests/certificate-authority/ec/jks/ca.cert.pem create mode 100644 tests/certificate-authority/ec/jks/ca.cert.srl create mode 100644 tests/certificate-authority/ec/jks/ca.key.pem create mode 100644 tests/certificate-authority/ec/jks/ca.truststore.jks create mode 100644 tests/certificate-authority/ec/jks/client.cert.pem create mode 100644 tests/certificate-authority/ec/jks/client.keystore.jks create mode 100644 tests/certificate-authority/ec/jks/client.signed.cert.pem create mode 100644 tests/certificate-authority/ec/jks/key_store_generation.txt create mode 100644 tests/certificate-authority/ec/jks/server.cert.pem create mode 100644 tests/certificate-authority/ec/jks/server.keystore.jks create mode 100644 tests/certificate-authority/ec/jks/server.signed.cert.pem create mode 100644 tests/certificate-authority/ec/server.cert.pem create mode 100644 tests/certificate-authority/ec/server.conf create mode 100644 tests/certificate-authority/ec/server.csr.pem create mode 100644 tests/certificate-authority/ec/server.key-pk8.pem create mode 100644 tests/certificate-authority/ec/server.key.pem diff --git a/pom.xml b/pom.xml index 9f71cab2f2258..07081b8aab9c3 100644 --- a/pom.xml +++ b/pom.xml @@ -1488,6 +1488,8 @@ flexible messaging model and an intuitive client API. **/*.crt **/*.key **/*.csr + **/*.srl + **/*.txt **/*.pem **/*.json **/*.htpasswd diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index c1f75ba141d89..17b14d770f6c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -25,8 +25,11 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import com.google.common.collect.Sets; +import com.google.common.io.Resources; import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; + +import java.io.File; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.URI; @@ -42,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; + import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.PulsarMockBookKeeper; @@ -73,6 +77,7 @@ import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; import org.testng.annotations.DataProvider; /** @@ -88,7 +93,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { protected PulsarAdmin admin; protected PulsarClient pulsarClient; protected PortForwarder brokerGateway; - protected boolean enableBrokerGateway = false; + protected boolean enableBrokerGateway = false; protected URL brokerUrl; protected URL brokerUrlTls; @@ -229,7 +234,7 @@ protected final void internalCleanup() throws Exception { mockZooKeeper.shutdown(); mockZooKeeper = null; } - if(sameThreadOrderedSafeExecutor != null) { + if (sameThreadOrderedSafeExecutor != null) { try { sameThreadOrderedSafeExecutor.shutdownNow(); sameThreadOrderedSafeExecutor.awaitTermination(5, TimeUnit.SECONDS); @@ -239,7 +244,7 @@ protected final void internalCleanup() throws Exception { } sameThreadOrderedSafeExecutor = null; } - if(bkExecutor != null) { + if (bkExecutor != null) { try { bkExecutor.shutdownNow(); bkExecutor.awaitTermination(5, TimeUnit.SECONDS); @@ -388,7 +393,7 @@ public static MockZooKeeper createMockZooKeeper() throws Exception { } public static MockZooKeeper createMockZooKeeperGlobal() { - return MockZooKeeper.newInstanceForGlobalZK(MoreExecutors.newDirectExecutorService()); + return MockZooKeeper.newInstanceForGlobalZK(MoreExecutors.newDirectExecutorService()); } public static NonClosableMockBookKeeper createMockBookKeeper(OrderedExecutor executor) throws Exception { @@ -506,7 +511,7 @@ protected void setupDefaultTenantAndNamespace() throws Exception { @DataProvider(name = "invalidPersistentPolicies") public Object[][] incorrectPersistentPolicies() { - return new Object[][] { + return new Object[][]{ {0, 0, 0}, {1, 0, 0}, {0, 0, 1}, @@ -535,7 +540,7 @@ protected void deleteNamespaceWithRetry(String ns, boolean force, PulsarAdmin ad /** * see {@link MockedPulsarServiceBaseTest#deleteNamespaceWithRetry(String, boolean, PulsarAdmin, Collection)} */ - public static void deleteNamespaceWithRetry(String ns, boolean force, PulsarAdmin admin, PulsarService...pulsars) + public static void deleteNamespaceWithRetry(String ns, boolean force, PulsarAdmin admin, PulsarService... pulsars) throws Exception { deleteNamespaceWithRetry(ns, force, admin, Arrays.asList(pulsars)); } @@ -564,4 +569,44 @@ public static void deleteNamespaceWithRetry(String ns, boolean force, PulsarAdmi } private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); + + + // EC certificate + protected static final String TLS_EC_TRUSTED_CERT_PATH = + getAbsolutePath("certificate-authority/ec/ca.cert.pem"); + protected static final String TLS_EC_SERVER_KEY_PATH = + getAbsolutePath("certificate-authority/ec/server.key-pk8.pem"); + protected static final String TLS_EC_SERVER_CERT_PATH = + getAbsolutePath("certificate-authority/ec/server.cert.pem"); + protected static final String TLS_EC_BROKER_CLIENT_KEY_PATH = + getAbsolutePath("certificate-authority/ec/broker_client.key-pk8.pem"); + protected static final String TLS_EC_BROKER_CLIENT_CERT_PATH = + getAbsolutePath("certificate-authority/ec/broker_client.cert.pem"); + protected static final String TLS_EC_CLIENT_KEY_PATH = + getAbsolutePath("certificate-authority/ec/client.key-pk8.pem"); + protected static final String TLS_EC_CLIENT_CERT_PATH = + getAbsolutePath("certificate-authority/ec/client.cert.pem"); + + // EC KeyStore + protected static final String TLS_EC_KS_SERVER_STORE = + getAbsolutePath("certificate-authority/ec/jks/server.keystore.jks"); + protected static final String TLS_EC_KS_SERVER_PASS = "serverpw"; + protected static final String TLS_EC_KS_BROKER_CLIENT_STORE = + getAbsolutePath("certificate-authority/ec/jks/broker_client.keystore.jks"); + protected static final String TLS_EC_KS_BROKER_CLIENT_PASS = "brokerclientpw"; + protected static final String TLS_EC_KS_CLIENT_STORE = + getAbsolutePath("certificate-authority/ec/jks/client.keystore.jks"); + protected static final String TLS_EC_KS_CLIENT_PASS = "clientpw"; + protected static final String TLS_EC_KS_TRUSTED_STORE = + getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks"); + protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw"; + + public static String getAbsolutePath(String resourceName) { + // On Windows, URL#getPath might return a string that starts with a disk name, e.g. "/C:/" + // It's invalid to use this path to open a file, so we need to get the absolute path via File. + return new File(Resources.getResource(resourceName).getPath()).getAbsolutePath(); + + } + + protected static final ObjectMapper mapper = new ObjectMapper(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java new file mode 100644 index 0000000000000..87823321487d2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.security.tls.ec; + + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +@Test +public class TlsWithECCertificateFileTest extends MockedPulsarServiceBaseTest { + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTlsEnabled(true); + conf.setBrokerServicePort(Optional.empty()); + conf.setWebServicePort(Optional.empty()); + conf.setTlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH); + conf.setTlsCertificateFilePath(TLS_EC_SERVER_CERT_PATH); + conf.setTlsKeyFilePath(TLS_EC_SERVER_KEY_PATH); + conf.setBrokerClientTlsEnabled(true); + conf.setBrokerClientTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH); + conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + final Map brokerClientAuthParams = new HashMap<>(); + brokerClientAuthParams.put("tlsCertFile", TLS_EC_BROKER_CLIENT_CERT_PATH); + brokerClientAuthParams.put("tlsKeyFile", TLS_EC_BROKER_CLIENT_KEY_PATH); + conf.setBrokerClientAuthenticationParameters(mapper.writeValueAsString(brokerClientAuthParams)); + conf.setBrokerClientAuthenticationParameters(mapper.writeValueAsString(brokerClientAuthParams)); + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + init(); + admin = pulsar.getAdminClient(); + setupDefaultTenantAndNamespace(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + @Test(expectedExceptions = PulsarClientException.class) + @SneakyThrows + public void testConnectionFailWithoutCertificate() { + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrlTls()) + .build(); + @Cleanup final Producer producer = client.newProducer() + .topic("should_be_failed") + .create(); + } + + + @Test + @SneakyThrows + public void testConnectionSuccessWithCertificate() { + final AuthenticationTls authentication = new AuthenticationTls(TLS_EC_CLIENT_CERT_PATH, TLS_EC_CLIENT_KEY_PATH); + final String topicName = "persistent://public/default/" + UUID.randomUUID(); + final int testMsgNum = 10; + @Cleanup final PulsarAdmin admin = PulsarAdmin.builder() + .authentication(authentication) + .serviceHttpUrl(pulsar.getWebServiceAddressTls()) + .tlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH) + .build(); + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, "sub-1", MessageId.earliest); + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrlTls()) + .authentication(authentication) + .tlsTrustCertsFilePath(TLS_EC_TRUSTED_CERT_PATH) + .build(); + @Cleanup final Producer producer = client.newProducer() + .topic(topicName) + .create(); + @Cleanup final Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .consumerName("cons-1") + .subscribe(); + for (int i = 0; i < testMsgNum; i++) { + producer.send((i + "").getBytes(StandardCharsets.UTF_8)); + } + + for (int i = 0; i < testMsgNum; i++) { + final Message message = consumer.receive(); + assertNotNull(message); + final byte[] b = message.getValue(); + final String s = new String(b, StandardCharsets.UTF_8); + assertEquals(s, i + ""); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java new file mode 100644 index 0000000000000..cd841c13bf675 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.security.tls.ec; + + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + + +@Test +public class TlsWithECKeyStoreTest extends MockedPulsarServiceBaseTest { + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTlsEnabled(true); + conf.setBrokerServicePort(Optional.empty()); + conf.setWebServicePort(Optional.empty()); + conf.setTlsEnabledWithKeyStore(true); + conf.setTlsKeyStore(TLS_EC_KS_SERVER_STORE); + conf.setTlsKeyStorePassword(TLS_EC_KS_SERVER_PASS); + conf.setTlsTrustStore(TLS_EC_KS_TRUSTED_STORE); + conf.setTlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS); + conf.setTlsRequireTrustedClientCertOnConnect(true); + conf.setBrokerClientTlsEnabled(true); + conf.setBrokerClientTlsEnabledWithKeyStore(true); + conf.setBrokerClientTlsTrustStore(TLS_EC_KS_TRUSTED_STORE); + conf.setBrokerClientTlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS); + conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName()); + final Map brokerClientAuthParams = new HashMap<>(); + brokerClientAuthParams.put("keyStorePath", TLS_EC_KS_BROKER_CLIENT_STORE); + brokerClientAuthParams.put("keyStorePassword", TLS_EC_KS_BROKER_CLIENT_PASS); + conf.setBrokerClientAuthenticationParameters(mapper.writeValueAsString(brokerClientAuthParams)); + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + init(); + admin = pulsar.getAdminClient(); + setupDefaultTenantAndNamespace(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Test(expectedExceptions = PulsarClientException.class) + @SneakyThrows + public void testConnectionFailWithoutCertificate() { + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrlTls()) + .build(); + @Cleanup final Producer producer = client.newProducer() + .topic("should_be_failed") + .create(); + } + + + @Test + @SneakyThrows + public void testConnectionSuccessWithCertificate() { + final String topicName = "persistent://public/default/" + UUID.randomUUID(); + final int testMsgNum = 10; + final Map clientAuthParams = new HashMap<>(); + clientAuthParams.put("keyStorePath", TLS_EC_KS_CLIENT_STORE); + clientAuthParams.put("keyStorePassword", TLS_EC_KS_CLIENT_PASS); + @Cleanup final PulsarAdmin admin = PulsarAdmin.builder() + .useKeyStoreTls(true) + .tlsTrustStorePath(TLS_EC_KS_TRUSTED_STORE) + .tlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS) + .authentication(AuthenticationKeyStoreTls.class.getName(), mapper.writeValueAsString(clientAuthParams)) + .serviceHttpUrl(pulsar.getWebServiceAddressTls()) + .build(); + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, "sub-1", MessageId.earliest); + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrlTls()) + .useKeyStoreTls(true) + .tlsTrustStorePath(TLS_EC_KS_TRUSTED_STORE) + .tlsTrustStorePassword(TLS_EC_KS_TRUSTED_STORE_PASS) + .authentication(AuthenticationKeyStoreTls.class.getName(), mapper.writeValueAsString(clientAuthParams)) + .build(); + @Cleanup final Producer producer = client.newProducer() + .topic(topicName) + .create(); + @Cleanup final Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .consumerName("cons-1") + .subscribe(); + for (int i = 0; i < testMsgNum; i++) { + producer.send((i + "").getBytes(StandardCharsets.UTF_8)); + } + + for (int i = 0; i < testMsgNum; i++) { + final Message message = consumer.receive(); + assertNotNull(message); + final byte[] b = message.getValue(); + final String s = new String(b, StandardCharsets.UTF_8); + assertEquals(s, i + ""); + } + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java index c288c054bb9f2..caa12f20b1be3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java @@ -48,10 +48,14 @@ import java.security.cert.CertificateException; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.security.spec.InvalidKeySpecException; import java.security.spec.KeySpec; import java.security.spec.PKCS8EncodedKeySpec; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import javax.net.ssl.HostnameVerifier; @@ -79,6 +83,10 @@ public class SecurityUtility { public static final String BC_NON_FIPS_PROVIDER_CLASS = "org.bouncycastle.jce.provider.BouncyCastleProvider"; public static final String CONSCRYPT_PROVIDER_CLASS = "org.conscrypt.OpenSSLProvider"; public static final Provider CONSCRYPT_PROVIDER = loadConscryptProvider(); + private static final List KEY_FACTORIES = Arrays.asList( + createKeyFactory("RSA"), + createKeyFactory("EC") + ); // Security.getProvider("BC") / Security.getProvider("BCFIPS"). // also used to get Factories. e.g. CertificateFactory.getInstance("X.509", "BCFIPS") @@ -504,15 +512,21 @@ public static PrivateKey loadPrivateKeyFromPemStream(InputStream inStream) throw while ((currentLine = reader.readLine()) != null && !currentLine.startsWith("-----END")) { sb.append(currentLine); } - - KeyFactory kf = KeyFactory.getInstance("RSA"); - KeySpec keySpec = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(sb.toString())); - privateKey = kf.generatePrivate(keySpec); - } catch (GeneralSecurityException | IOException e) { + final KeySpec keySpec = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(sb.toString())); + final List failedAlgorithm = new ArrayList<>(KEY_FACTORIES.size()); + for (KeyFactory kf : KEY_FACTORIES) { + try { + return kf.generatePrivate(keySpec); + } catch (InvalidKeySpecException ex) { + failedAlgorithm.add(kf.getAlgorithm()); + } + } + throw new KeyManagementException("The private key algorithm is not supported. attempted: " + + StringUtils.join(failedAlgorithm, ",")); + } catch (IOException e) { throw new KeyManagementException("Private key loading error", e); } - return privateKey; } private static void setupTrustCerts(SslContextBuilder builder, boolean allowInsecureConnection, @@ -573,4 +587,12 @@ public static Provider resolveProvider(String providerName) throws NoSuchAlgorit return provider; } + + private static KeyFactory createKeyFactory(String algorithm) { + try { + return KeyFactory.getInstance(algorithm); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("Illegal key factory algorithm " + algorithm), e); + } + } } diff --git a/tests/certificate-authority/ec/broker_client.cert.pem b/tests/certificate-authority/ec/broker_client.cert.pem new file mode 100644 index 0000000000000..2993ed41ad9d6 --- /dev/null +++ b/tests/certificate-authority/ec/broker_client.cert.pem @@ -0,0 +1,9 @@ +-----BEGIN CERTIFICATE----- +MIIBIjCBygIUSAxJKNrIEmn3SVyw5rcYhwhKulwwCgYIKoZIzj0EAwIwETEPMA0G +A1UEAwwGQ0FSb290MB4XDTIzMTEyNDExNTE1M1oXDTMzMTEyMTExNTE1M1owGDEW +MBQGA1UEAwwNYnJva2VyX2NsaWVudDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IA +BGxRL4naRhrTZ9T2WdMBkCNmiamkrzEiDO55RVjhpHGWIoqPOvzs8i97vCVx39GV +vV/9agDp2nSuXYW8ax3UKnkwCgYIKoZIzj0EAwIDRwAwRAIge8qxnGgmv5h+Yw3Y +Ab/6xFD5QWERGMlfIl4ZCO3o6S0CICS/4jj45GfAPZS9QPfuo15rEa9Rbvvmmi+K +yY0JA0SP +-----END CERTIFICATE----- diff --git a/tests/certificate-authority/ec/broker_client.csr.pem b/tests/certificate-authority/ec/broker_client.csr.pem new file mode 100644 index 0000000000000..1f10a3c77f2b6 --- /dev/null +++ b/tests/certificate-authority/ec/broker_client.csr.pem @@ -0,0 +1,7 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIHTMHoCAQAwGDEWMBQGA1UEAwwNYnJva2VyX2NsaWVudDBZMBMGByqGSM49AgEG +CCqGSM49AwEHA0IABGxRL4naRhrTZ9T2WdMBkCNmiamkrzEiDO55RVjhpHGWIoqP +Ovzs8i97vCVx39GVvV/9agDp2nSuXYW8ax3UKnmgADAKBggqhkjOPQQDAgNJADBG +AiEA8sGFcbQuUGIUTCXTQ0z9b0eIYFIDVOcGSInQ+0unMJMCIQCmH0GlXZRGB2lx +HtfIz76HNnVu153LsHE11AEx7d/j2g== +-----END CERTIFICATE REQUEST----- diff --git a/tests/certificate-authority/ec/broker_client.key-pk8.pem b/tests/certificate-authority/ec/broker_client.key-pk8.pem new file mode 100644 index 0000000000000..124073b024564 --- /dev/null +++ b/tests/certificate-authority/ec/broker_client.key-pk8.pem @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgA92tkFXxKHYUJbeB +vvnMaGBnP2IenpF66Fikb06xbUKhRANCAARsUS+J2kYa02fU9lnTAZAjZomppK8x +IgzueUVY4aRxliKKjzr87PIve7wlcd/Rlb1f/WoA6dp0rl2FvGsd1Cp5 +-----END PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/broker_client.key.pem b/tests/certificate-authority/ec/broker_client.key.pem new file mode 100644 index 0000000000000..4d4b5163b1bb4 --- /dev/null +++ b/tests/certificate-authority/ec/broker_client.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIAPdrZBV8Sh2FCW3gb75zGhgZz9iHp6ReuhYpG9OsW1CoAoGCCqGSM49 +AwEHoUQDQgAEbFEvidpGGtNn1PZZ0wGQI2aJqaSvMSIM7nlFWOGkcZYiio86/Ozy +L3u8JXHf0ZW9X/1qAOnadK5dhbxrHdQqeQ== +-----END EC PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/ca.cert.pem b/tests/certificate-authority/ec/ca.cert.pem new file mode 100644 index 0000000000000..c10385d997e86 --- /dev/null +++ b/tests/certificate-authority/ec/ca.cert.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBeDCCAR2gAwIBAgIUKRGzcPm3RVuI7tXdPDAZZ7Vhqs8wCgYIKoZIzj0EAwIw +ETEPMA0GA1UEAwwGQ0FSb290MB4XDTIzMTEyNDExNTExNVoXDTMzMTEyMTExNTEx +NVowETEPMA0GA1UEAwwGQ0FSb290MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE +kOKZaL45B7PUB+G25GLP1PPfTkio/DaHUML+KJjxpdCnSmq+mt/EAQWlqNPB1hJv +6kOJ52vSxKe02BMeuROed6NTMFEwHQYDVR0OBBYEFDkqfvrnJ7PJhxJ7FTA7o8+b +f+CRMB8GA1UdIwQYMBaAFDkqfvrnJ7PJhxJ7FTA7o8+bf+CRMA8GA1UdEwEB/wQF +MAMBAf8wCgYIKoZIzj0EAwIDSQAwRgIhAN9+TWNNbIz8rMdkf4LGoIeQzYcAEyGJ +90ORM5JciBdaAiEA8UsuQBD4wO1t6plnRydkGMTeb1dNDEnhsuXOXBps8fE= +-----END CERTIFICATE----- diff --git a/tests/certificate-authority/ec/ca.cert.srl b/tests/certificate-authority/ec/ca.cert.srl new file mode 100644 index 0000000000000..a30f44e979e72 --- /dev/null +++ b/tests/certificate-authority/ec/ca.cert.srl @@ -0,0 +1 @@ +480C4928DAC81269F7495CB0E6B71887084ABA5D diff --git a/tests/certificate-authority/ec/ca.key.pem b/tests/certificate-authority/ec/ca.key.pem new file mode 100644 index 0000000000000..1255354584869 --- /dev/null +++ b/tests/certificate-authority/ec/ca.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIPT1Jap2sJ7NUGWT6q0fnSRoVRNNryWe/JHPwttyQke4oAoGCCqGSM49 +AwEHoUQDQgAEkOKZaL45B7PUB+G25GLP1PPfTkio/DaHUML+KJjxpdCnSmq+mt/E +AQWlqNPB1hJv6kOJ52vSxKe02BMeuROedw== +-----END EC PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/certificate_generation.txt b/tests/certificate-authority/ec/certificate_generation.txt new file mode 100644 index 0000000000000..7a6caa7b8f4be --- /dev/null +++ b/tests/certificate-authority/ec/certificate_generation.txt @@ -0,0 +1,34 @@ +# CA Private Key +openssl ecparam -name secp256r1 -genkey -out ca.key.pem +# Request certificate +openssl req -x509 -new -nodes -key ca.key.pem -subj "/CN=CARoot" -days 3650 -out ca.cert.pem + +# Server Private Key +openssl ecparam -name secp256r1 -genkey -out server.key.pem +# Convert to pkcs8 +openssl pkcs8 -topk8 -inform PEM -outform PEM -in server.key.pem -out server.key-pk8.pem -nocrypt +# Request certificate +openssl req -new -config server.conf -key server.key.pem -out server.csr.pem -sha256 +# Sign with CA +openssl x509 -req -in server.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out server.cert.pem -days 3650 -extensions v3_ext -extfile server.conf -sha256 + +# Broker internal client Private Key +openssl ecparam -name secp256r1 -genkey -out broker_client.key.pem +# Convert to pkcs8 +openssl pkcs8 -topk8 -inform PEM -outform PEM -in broker_client.key.pem -out broker_client.key-pk8.pem -nocrypt +# Request certificate +openssl req -new -subj "/CN=broker_client" -key broker_client.key.pem -out broker_client.csr.pem -sha256 +# Sign with CA +openssl x509 -req -in broker_client.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out broker_client.cert.pem -days 3650 -sha256 + + +# Client Private Key +openssl ecparam -name secp256r1 -genkey -out client.key.pem +# Convert to pkcs8 +openssl pkcs8 -topk8 -inform PEM -outform PEM -in client.key.pem -out client.key-pk8.pem -nocrypt +# Request certificate +openssl req -new -subj "/CN=client" -key client.key.pem -out client.csr.pem -sha256 +# Sign with CA +openssl x509 -req -in client.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out client.cert.pem -days 3650 -sha256 + + diff --git a/tests/certificate-authority/ec/client.cert.pem b/tests/certificate-authority/ec/client.cert.pem new file mode 100644 index 0000000000000..87701a6938d25 --- /dev/null +++ b/tests/certificate-authority/ec/client.cert.pem @@ -0,0 +1,8 @@ +-----BEGIN CERTIFICATE----- +MIIBHDCBwwIUSAxJKNrIEmn3SVyw5rcYhwhKul0wCgYIKoZIzj0EAwIwETEPMA0G +A1UEAwwGQ0FSb290MB4XDTIzMTEyNDExNTIwNVoXDTMzMTEyMTExNTIwNVowETEP +MA0GA1UEAwwGY2xpZW50MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE4QZJuqZS +mSDbjkoFGKvtYmSVaJ3IjtmgWsgQio4F5phIXpM6IZZfcLkJToY0b9W2jGhODK55 +jA+zkRxHrICkwTAKBggqhkjOPQQDAgNIADBFAiEA0iGNqg4t16SxFdZJu7o9gK8R +XVXphQ/9XAtw4XqfCUYCIGLoExE9XKdkzZ+sahFOpKD6YLZ1GgPRBPpBJFBGTYu7 +-----END CERTIFICATE----- diff --git a/tests/certificate-authority/ec/client.csr.pem b/tests/certificate-authority/ec/client.csr.pem new file mode 100644 index 0000000000000..4ec08d410f504 --- /dev/null +++ b/tests/certificate-authority/ec/client.csr.pem @@ -0,0 +1,7 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIHLMHMCAQAwETEPMA0GA1UEAwwGY2xpZW50MFkwEwYHKoZIzj0CAQYIKoZIzj0D +AQcDQgAE4QZJuqZSmSDbjkoFGKvtYmSVaJ3IjtmgWsgQio4F5phIXpM6IZZfcLkJ +ToY0b9W2jGhODK55jA+zkRxHrICkwaAAMAoGCCqGSM49BAMCA0gAMEUCIQDNZOBD +Z/YAWKEeRSVqhPvIpFYob1gmQfDcBJdG8e0K8wIgcfO0PLquIZP9P8VrDkkLQdZ9 +krOKk+F/LF9aqQBHTbU= +-----END CERTIFICATE REQUEST----- diff --git a/tests/certificate-authority/ec/client.key-pk8.pem b/tests/certificate-authority/ec/client.key-pk8.pem new file mode 100644 index 0000000000000..2b07827f21472 --- /dev/null +++ b/tests/certificate-authority/ec/client.key-pk8.pem @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgrC3O+TuZ82b1bD1M +SI9lMu6aaebqfoggcnaaAyUUstKhRANCAAThBkm6plKZINuOSgUYq+1iZJVonciO +2aBayBCKjgXmmEhekzohll9wuQlOhjRv1baMaE4MrnmMD7ORHEesgKTB +-----END PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/client.key.pem b/tests/certificate-authority/ec/client.key.pem new file mode 100644 index 0000000000000..ac1207fa51c0b --- /dev/null +++ b/tests/certificate-authority/ec/client.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIKwtzvk7mfNm9Ww9TEiPZTLummnm6n6IIHJ2mgMlFLLSoAoGCCqGSM49 +AwEHoUQDQgAE4QZJuqZSmSDbjkoFGKvtYmSVaJ3IjtmgWsgQio4F5phIXpM6IZZf +cLkJToY0b9W2jGhODK55jA+zkRxHrICkwQ== +-----END EC PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/jks/broker_client.cert.pem b/tests/certificate-authority/ec/jks/broker_client.cert.pem new file mode 100644 index 0000000000000..8a12e941d4e43 --- /dev/null +++ b/tests/certificate-authority/ec/jks/broker_client.cert.pem @@ -0,0 +1,10 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIBXjCCAQQCAQAwcjEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93 +bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH +VW5rbm93bjEWMBQGA1UEAwwNYnJva2VyX2NsaWVudDBZMBMGByqGSM49AgEGCCqG +SM49AwEHA0IABNEOf45UIs53Va887xTFRkZlmCnJUwYeu50pEll1APUwcldIHMXY +EqRqoTOcBtSRx4CpO9LMPFmyCS1E+afXnbKgMDAuBgkqhkiG9w0BCQ4xITAfMB0G +A1UdDgQWBBQFrlAl1jTZMagQVrax+OLTDJAQujAKBggqhkjOPQQDAgNIADBFAiBA +sgj2HrKwxCfoUbBIjYqRcLPRRVBsbYOGk4e2uFTZPwIhAN/AdQn786S/ebnwSUzR +yPyKEH+Qspx9nB08sQNn9N6U +-----END NEW CERTIFICATE REQUEST----- diff --git a/tests/certificate-authority/ec/jks/broker_client.keystore.jks b/tests/certificate-authority/ec/jks/broker_client.keystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..81ecf4497198c7999b51d3cd24784a1a30fc3b43 GIT binary patch literal 2034 zcma)7c{J1u8=l{o#WENg2G+yytnI_dV}<{&_(LatR6ufDB|lLP&#s zguN~bAc0Z_G7ZK+ChyQmAOkS`KM5ef7=X$St+Z1(gvkG`d6u9G@w!OxwOn(VQ}-QhxDvN zXv;YBTEnauAupi*&FsKFX1kSXu>dfFa>6)w57Ln*>->VQ+}|)+U#8>lvvysa^59PE z*@A@cw+mtP%TRxLVpXd%ecl7L&+;*Rv#t-HAzN*qp&F10O3V@-8j!UWHT`(aCrU882g< z#8zxj=A#78P;3k?jeOFrN;$RWKm>FP(@;09ma+Oeu(P~Vs|(-Q?BRKy^2!H>uSK*P zRma+H@B5c^5)3#K=I!Cn@hooVL~zM}u0^&pGa?f|l+FS}dFCYacv2f`V4OU};+;~BARjxp)ZPQ48saW#wAk3@C1Crahq2bJ z(m>bIYilFH{Wq)?zY%wLT`iZ-%|R`9Z4RZ%t?twEKlWO3s2AR+5NtSEcx^n_`%z;; zE5?YgvR*%Wp4GRMxj0(3@=0}X4xxjz$rBrRPBr5;{Z@8j(<+}I=zOapObErHCyI)~=^~&MC&9A@rUhoF{D18|sM)vKJ zz4E#yA|CLWY?c&_(1lmdi+q zUntLLLP?$r3vj+In^xcSF)B=KzjU=I>2guvX!h3o%o*|IX>b0pgu!Xvn?0i03nlMh zUyr&(dE$VU4{p8`cJbSb9up_Gcn8$V@<^N&pQYy{9|3~ghbw$9OGJKRHh(1(zp zL2Jz**-%=zZJ9bBd;6iQQg^T&RXLZGZ_tr{J2j*5n38Qt?3ANk29Kx>HmW&uyiSxw zsXJO}?YhhivMYSHK40&qsgm%Z-yha*(aYSj>lj<~Z~arNA9wK&EL{3+E?N&4KBwbO z{q+qeg@nIuwDG=~eSXlZ#G*yNAS+R=D)Q{xJt*jwTluT?Q^oQ5aO+0|RC*j?N%$*e zwo@(8rF)xSPDvo^KKX+n{8*Wc_ZZt+>DlU0`S#0rRzoY(L_^FqIfiPgU-hu%U3V>| z{afZ?dTQciDXW)$N%yw8l4~YvIKW-#=~KI+Vs`%eq9uV@>90G>VyZff_vzM6jir7t zj{ay8a*SJHd7$0?PVS~cp_;aIOJXMXjQ_TRyuQeKa24mrZ#cQjp-j>s-@PVM)l7)x#IF78lpRvUfi~R z+$1)j>E&49H~yrvr1CF)FTveFW%}f53h?m*b&Cccz=))z99`*y~H zr#M0Stcj-`r1Tw6VYw6N2(&Rg$93b24b%osmuMK2xs%vD_{NYbd+F?0IvRh~3n82H zL`j_pVGihCcYkhLt--+-{I%WV70Zw))?iu3p0*$2_-%@g5e1bi6>JQR0tY6HD{4-~ zx!2f!5=Io4Eoo|S1)bfmYL6v|SYu0=lQPsw(}F0;=$R`bhG`c*RuaVWf_#6qT8b1u zf|G3x&&EymS}n{wPQHTNtt$84uJhBDfVUmV@#)Yexv~s2QVb+aFwyank?$t!muV~! zO|0@_(G#g%A-1X;mRZNxgij{;$D%jp%s1|J+&DK!pEO9wd*4%ap=jt#!ojDq&JJ56 zlvag&J+F?3Qwk|d44UT`!xSOV2Gj#Zzkfm~0EfU7$AadQ`+wfW1n28rKU1|VNG)xO q&a(Ff&^}cIdm>)ez%UsL+?YYQ^_>TPn9$mL4t{*$8S($TjK2Z!esZ<| literal 0 HcmV?d00001 diff --git a/tests/certificate-authority/ec/jks/broker_client.signed.cert.pem b/tests/certificate-authority/ec/jks/broker_client.signed.cert.pem new file mode 100644 index 0000000000000..b91c69400c5d1 --- /dev/null +++ b/tests/certificate-authority/ec/jks/broker_client.signed.cert.pem @@ -0,0 +1,11 @@ +-----BEGIN CERTIFICATE----- +MIIBfTCCASQCFAJ6wB27laA1BCNConaAQPValPtaMAoGCCqGSM49BAMCMBExDzAN +BgNVBAMMBkNBUm9vdDAeFw0yMzExMjUwNzAzNTNaFw0zMzExMjIwNzAzNTNaMHIx +EDAOBgNVBAYTB1Vua25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vu +a25vd24xEDAOBgNVBAoTB1Vua25vd24xEDAOBgNVBAsTB1Vua25vd24xFjAUBgNV +BAMMDWJyb2tlcl9jbGllbnQwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATRDn+O +VCLOd1WvPO8UxUZGZZgpyVMGHrudKRJZdQD1MHJXSBzF2BKkaqEznAbUkceAqTvS +zDxZsgktRPmn152yMAoGCCqGSM49BAMCA0cAMEQCIArXdTOx19Nn/a6bsfTYurQW +4cepF5VKKijEjzyV69/BAiBpg60QwoZeSmz6bmil2zSb65jXrTzwhLpUZckVuHKn +og== +-----END CERTIFICATE----- diff --git a/tests/certificate-authority/ec/jks/ca.cert.pem b/tests/certificate-authority/ec/jks/ca.cert.pem new file mode 100644 index 0000000000000..a235464be7064 --- /dev/null +++ b/tests/certificate-authority/ec/jks/ca.cert.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBdjCCAR2gAwIBAgIUfHm94cF84m6FrJVNywJI4qTGZAEwCgYIKoZIzj0EAwIw +ETEPMA0GA1UEAwwGQ0FSb290MB4XDTIzMTEyNTAxMzQzM1oXDTMzMTEyMjAxMzQz +M1owETEPMA0GA1UEAwwGQ0FSb290MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE +Sxvkij8HQ+g07SnOLz1in81iGKY7lOAbJ1r4ihMVnOVjS2A4ZVGXHM2wp5ZB9r3Y +jPByBiaPApm/J17JwlXynqNTMFEwHQYDVR0OBBYEFKqDJwbgz0/Q3EKJ78OVJI5k +8+RYMB8GA1UdIwQYMBaAFKqDJwbgz0/Q3EKJ78OVJI5k8+RYMA8GA1UdEwEB/wQF +MAMBAf8wCgYIKoZIzj0EAwIDRwAwRAIgEF9RiwV0oBh9x1AvLFPoK5nnUlJ+0MNE +zz8Zw284zkICIDUZOPN/E7ZmTKzfoZ0EkxRrinEZ5M538aNbYFAUYoK+ +-----END CERTIFICATE----- diff --git a/tests/certificate-authority/ec/jks/ca.cert.srl b/tests/certificate-authority/ec/jks/ca.cert.srl new file mode 100644 index 0000000000000..c7b003ddff287 --- /dev/null +++ b/tests/certificate-authority/ec/jks/ca.cert.srl @@ -0,0 +1 @@ +027AC01DBB95A035042342A2768040F55A94FB5B diff --git a/tests/certificate-authority/ec/jks/ca.key.pem b/tests/certificate-authority/ec/jks/ca.key.pem new file mode 100644 index 0000000000000..57e595f139525 --- /dev/null +++ b/tests/certificate-authority/ec/jks/ca.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIJ/5AX63GN8cadJUCa5Aza5592JS7go9TXNfYemS4Ku4oAoGCCqGSM49 +AwEHoUQDQgAESxvkij8HQ+g07SnOLz1in81iGKY7lOAbJ1r4ihMVnOVjS2A4ZVGX +HM2wp5ZB9r3YjPByBiaPApm/J17JwlXyng== +-----END EC PRIVATE KEY----- diff --git a/tests/certificate-authority/ec/jks/ca.truststore.jks b/tests/certificate-authority/ec/jks/ca.truststore.jks new file mode 100644 index 0000000000000000000000000000000000000000..e2a667b21d6ac6593ce22d5796914de120835457 GIT binary patch literal 742 zcmXqLVtT~H$ZXKW)WgQ9)#lOmotKfFaX}MPElU$qB~Z8&hzk)?Y(S|bpimqWBLk2M zN60W5q`^6?yatg576>knfi{asy+Qu6J3Wq54c>_FyJV5sYA4L}j1P=c^HWP%D`of| z#wJgD_SMU2UsH2pjrA;!gvpPrUk04zaNf^zboY@fM_SAHA1wNmxp6;h{`M#GPj(6K z|G3BY{iiiz;wujt2lGyi`l1){U~RyUhz@oYmCrktemE(*@ys{*TkrFhFUqcYVfdpg z=E`aL6@t%t-taH`_ezobQqS$DbHA=DTyJX6x=ZjKcktXL_k)*Yak%lHDxd77@rEU# zjp@#uGK(xTrKkn@QmOkEKGkVa^qt;sDy6XMkiYMto>S$ltwzUnZ1VMFWKZiRH|uIK z9%(agFn#gLH&!IM*gS1kgwqt&`QbV@m&NIGt(IT4AXO{jf2C{Beev`vT@$q$)8cLH zl^s*Fnl~wQ%-X3~vQpcSGy74y=4GF=pDq;nO%k$LSuOEpMYKoX&RI{a|B5bur#ZP~ z=jum6mnIrG)m-0b^WdaIg!iA-N%Qv!i=2^^{Vwp=rJGrIQ&!+ViBg;VT#jPl^8eSP z_&HY_2}G@4);m?_g8^H=nA!z5mxFtbh@RYKV!|rZSoqB0aNUK}2g$wd+a=vi{*;DX z-Rm~DRGKID{Z@f1PgWehcK3pP@I2!KZ&`Yy_w&8>3O~2xd9GXmgLT`96Uo67vi%Mm z^571$-g9%VeZY>~_83ji>2FpX{$BpU@_o61uYsWfFFYAAF|snSDERag9FI*^e%f{!WCJ>9<518FL*wbjHbTl;hfED3ROTrlhDiE+rHqxg3QZ z?d(u)$&~0~h>*^aE@~{fl^yT%zE3^xU+?+j`+T48=kk5N&*z^nz(ce{z)*mPI1EP; zGZ-05XfOg?#zU}SJcQQ<>;dq=#Q&kdnlK(1zX2<6loDLwzb=$K7*dwj2BZmr&~JJ< zG~feJzK#Y^ZAB~0(n&Q?C%)5)`S z3w92;R9ZNp)TA}qlzu7V{ZrZVYVu~xyhl?|!dVnl!9&Abe6s4+Y5WT|P|$#wuv@;z zFeBRln%|B^;>e0RC{PfH13Cl>0Y!kK0p0JU3K<8({&Xljgh1X602Hbj#gsxe-3{>I z-$e*`84r%#fD|EM&_+Xivm?O&FC;cq`Ae~gAGBq5{n=|!>CHVos!9JOB!^<1!cW6w zg^G)h8jtR`3rgk5<)l^gI;-^^ln-5 zE5*tU43>LLPvOy2v)Lt|J6b7W>)S2(qq|g-tHfd0)-AuIgNmb~{QLlXPqz)*9Ud=j%U zd!d6~k&fGXeD#fUn1e$*SGtvIR7zc6^e(|@u^Co6S+G{CR@E?_pw4lpzUDieAl!?~ z)hDK}KQwRBKYY|wk&4TG-9%joQPRuGUshD8fm)M%^kHm4ewGW5{k5KzD{PT(OyDTa@=gI-wro@7Uy11+y!uhy%W<( z_<$pVCF0L|x=l)-wvmV*;{DpD_r?V?Wxia86B2OjLZ!XD_))B0%n53gtz~ycEohED zCdztvpV9nYdtZ@?sA1(Lc8_B4j9S`aP1H%Ff_W3RAX4nt8|F5Pu=&9%_f*8?JN^AgOw&JHUyyCKxRRHIUXqG$fSFajQ3_4tJCCnsa#+}yw$#;0GmQ~MMBDXZFEw@ZmtH#0e+-rVs)iE>`j~2P-UX_WX$sv;?M|GOMrg&CUBe@>CS=oH;pK+Qk zKK|JKcP{l0=&g&~@;gSC462-jU=NM&! zfg?iJMdK(PJ?Is8^_rq{r_a}C0se^ePl-{HspWOn7V29b?rNo9n0*j?sd`VgWGpk2 zS#zAW#Y6~+*xV_TP#&xqy9_fZw<6!|l&}Xw8@8UGjb@w~3g?fm`jWT{k9y*RRf9(R za*CF9)UVyI3hM~Ymt?+r^K95j=B|=90ndD!KXG?nXgj#H?rry+t1R$47#oUxH3d1M zDO0j|QP8&^?Gcrdc+nbvJ=V_PRdU$^I`!+m=m5HkNlJ3om3d{Q_&^dT{KrZ)vphr! zw)^V{_*Y}(`r>WV&m&q5AEo4A7ByaZY_I8)owU^0{#&#LMzi$G5FRr+pc=Od+m<|J z6zK#c^gM3wT@39aea`i;w+gUTPO1#5o;?ZCzk24Ho;mso&UB`lckHstsmZn%{a1ab zgurqbioiao-eRS7yH2)_ZBBMKT_J$K!>ysLDMt zbNuKc=z|M+=18h*KK)#4;9$qhNXb2iL2zP>HRTtZ{E!{Uo4$9X-R~?JDyk7Tj@Vcu zgtHt+cFF+pUiQ7jrGx-UkG33V5?Hoz4i1L$wR4FZ+}!3Y$cmD6+ix;hhQ z{l%@_NabXYineM@4$ahkDTW$=c(uXbzrB+q){~ zO#F}&w#LHl2ef`ZE31Z8)JB5npkPomDE6-m==~zqv1(8iZ)!xaF4i0X2m~y_#1tT6 z0Tvwla|i)1W5M-*K-v&6=tn{QV!;1Pq!YvRrGo|VPOA2$l==hhP3^}i+y5VtK5Uvn zP*Hc1Um}vZ0-LIfOctzcb5Tt)(o@K%>PP|$wIp zeRtI8EB2H)X@`&>Pfhwh9b&AMoLCuMHjYq_YidcEW)>&BNl@SG$eb_cEmwt2SMPDD zqdPp(<`Nt49USIIqowqdstY#jZ88Ev2cEI3-5qh*f`%78%A8WnaM6&Mie}@CD}#fq zGx{!8_Clk_+)9GzS@-MAYR8mu)foTX#!n(HJ50Rwra z|LS?kH+z6R2yn)km;e>bK!{mugQah6i~Hrhb|{;OTC(>|?fi_A$v4+$@~8rjgg({Q z8bKvZVvbFb>c?D`X)C&gy_;m-!f)0QP4t1$7GFue!x$Ib9dj>GhK2cwkwvv`<; z9YbtV_5^zT`F__6+eAYSlSy*l;`&RvCO#fDby%}noqD=QX_l`IYqlM&=X+0jvvZC6 z1BK!-#-sCFBIj5QnHa}!7U3}(xVWhV`MsUM})*VL7@tLNv!yiY+z(- zo|!iRP2*gwSY?(5&dV~JX;)8mzD{LN>;6%{TIuh7qCl$LsvalssGqvOmS$^jD^iXw zkE?3l>Q!^ESxxS$j{2Ect64TFXssn5W-Mjk{0PyTxwlHj$wg6eS+H|Qmvn4<)~J&0 zeYeYv7uO5QgG(H*?^fUB>yA&U;trQkTN5_txE{{&tBKmJ=$0P!Jdua|i=|T%%wa$@Ant;b}`P!xKp0b=O6Ub;%?z8 zO$F{>j2Of94a4GZZkEx_*0M@NOtG&DVLH?!uN!(~KKpU$*r8bBnTtM3<7{?~TrHHluAYT*pD+c!1q3+o9xyLcJqQpHt1Rk;Z zf#Ug?9)2NrGA)RC2D6SXQLu7pM4q?wM6l~l3Wl{Bh&ySKM?fT#+6AvfwN+Y%7?|59 znzMvN+f7mG^? z2pc4EBD6ae9tyw7o}uHS*kb|?$piP6;zPkaxuD{tbV_#?X`rKbK`KsEB2#=&@u62F zMU%#b_<(>vlqJ1Qixbt>;h1H{g!f{VZ^_$zH$wDmF=cT<Buaq=ut#8aC z@aLIr=1$fbq`+ly?x-f3mtdo~L597h4`AaJ!tK&4Mqbwn-;Of){ZBn~(vVXYZAW7` zfm^m4INqU%h!tAAid~U=zSP5*-Hqzh>wcY_9zD0^YZUWh^s81~t96TA=rLB$#gRY>T|A zp2-&p`<7MI4s7k%@HhSkkA0$3*uyzYVq5m6JRa7FsL$6~efS1)zy2zWDAd8jHKoVq zhEHgIPK=}t+2vcA9NDYXg^WhGFiF(;XI3AH)%_T Date: Thu, 28 Dec 2023 23:24:59 +0800 Subject: [PATCH 4/8] By pass gpg check --- buildtools/pom.xml | 24 ++++++++++++------------ pom.xml | 24 ++++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/buildtools/pom.xml b/buildtools/pom.xml index 799f416c896c0..17ff00250e389 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -269,18 +269,18 @@ - - maven-gpg-plugin - - - sign-artifacts - verify - - sign - - - - + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 07081b8aab9c3..157cb1ddb0b47 100644 --- a/pom.xml +++ b/pom.xml @@ -2314,18 +2314,18 @@ flexible messaging model and an intuitive client API. - - maven-gpg-plugin - - - sign-artifacts - verify - - sign - - - - + + + + + + + + + + + + From c5b04e4efc93c9ebfc2a41d6e38e1ebd8b546169 Mon Sep 17 00:00:00 2001 From: Mattison Chao Date: Fri, 29 Dec 2023 15:48:48 +0800 Subject: [PATCH 5/8] Fix class not found exception --- .../pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 17b14d770f6c0..089bde4d70a4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -24,11 +24,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import com.google.common.io.Resources; import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; - import java.io.File; import java.lang.reflect.Field; import java.net.InetSocketAddress; @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; - import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.PulsarMockBookKeeper; @@ -77,7 +76,6 @@ import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; import org.testng.annotations.DataProvider; /** From 9cf4779aadd3720fb244fcbeec7c3776d3ae364a Mon Sep 17 00:00:00 2001 From: Mattison Chao Date: Fri, 29 Dec 2023 23:10:33 +0800 Subject: [PATCH 6/8] Fix license check --- distribution/server/src/assemble/LICENSE.bin.txt | 1 + pulsar-sql/presto-distribution/LICENSE | 3 +++ 2 files changed, 4 insertions(+) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 73097e53f49f3..2b1cf57203f9e 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -394,6 +394,7 @@ The Apache Software License, Version 2.0 - org.apache.logging.log4j-log4j-core-2.18.0.jar - org.apache.logging.log4j-log4j-slf4j-impl-2.18.0.jar - org.apache.logging.log4j-log4j-web-2.18.0.jar + - org.apache.logging.log4j-log4j-layout-template-json-2.20.0.jar * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar * BookKeeper - org.apache.bookkeeper-bookkeeper-common-4.14.7.jar diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index bfc8923dba862..61530681d0156 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -349,6 +349,9 @@ The Apache Software License, Version 2.0 - leveldb-api-0.10.jar * Log4j implemented over SLF4J - log4j-over-slf4j-1.7.32.jar + - log4j-api-2.18.0.jar + - log4j-core-2.18.0.jar + - log4j-layout-template-json-2.20.0.jar * Lucene Common Analyzers - lucene-analyzers-common-8.4.1.jar - lucene-core-8.4.1.jar From 28cd8dca33c07748e5658456e519fb6b13debc11 Mon Sep 17 00:00:00 2001 From: Mattison Chao Date: Fri, 29 Dec 2023 23:13:52 +0800 Subject: [PATCH 7/8] Fix flaky test --- .../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 85c8eca8fa054..70c00558bf81a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1683,6 +1683,7 @@ public void testReplicatorWithFailedAck() throws Exception { MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); ConcurrentOpenHashMap replicators = topic.getReplicators(); + Awaitility.await().untilAsserted(() -> Assert.assertNotNull(replicators.get("r2"))); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) From 0fabeb8b05ef462ca9cb9ec393a811ab6e5187d9 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Mon, 18 Sep 2023 13:54:04 +0800 Subject: [PATCH 8/8] Fix license check issue --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 157cb1ddb0b47..461733944a451 100644 --- a/pom.xml +++ b/pom.xml @@ -1554,6 +1554,7 @@ flexible messaging model and an intuitive client API. **/*.dylib src/test/resources/*.txt **/dependency-reduced-pom.xml + src/test/resources/ssl/README.md