Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc authored Aug 19, 2022
2 parents 5cce38f + 3ecf34a commit 9ef3f2a
Show file tree
Hide file tree
Showing 1,830 changed files with 3,404 additions and 1,230 deletions.
2 changes: 1 addition & 1 deletion build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function test_group_other() {
-Dexclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java
**/PrimitiveSchemaTest.java,
BlobStoreManagedLedgerOffloaderTest.java'
BlobStoreManagedLedgerOffloaderTest.java' -DtestReuseFork=false

mvn_test -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java'
Expand Down
4 changes: 2 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1483,8 +1483,8 @@ gcsManagedLedgerOffloadServiceAccountKeyFile=
# For Azure BlobStore ledger offload, Blob Container (Bucket) to place offloaded ledger into
managedLedgerOffloadBucket=

#For File System Storage, file system profile path
fileSystemProfilePath=../conf/filesystem_offload_core_site.xml
#For File System Storage, file system profile path (Use a relative path, the current dir is the pulsar dir)
fileSystemProfilePath=conf/filesystem_offload_core_site.xml

#For File System Storage, file system uri
fileSystemURI=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2288,9 +2288,7 @@ List<Entry> filterReadEntries(List<Entry> entries) {
} else {
// Remove from the entry list all the entries that were already marked for deletion
return Lists.newArrayList(Collections2.filter(entries, entry -> {
boolean includeEntry = !individualDeletedMessages.contains(
((PositionImpl) entry.getPosition()).getLedgerId(),
((PositionImpl) entry.getPosition()).getEntryId());
boolean includeEntry = !individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
if (!includeEntry) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Filtering entry at {} - already deleted", ledger.getName(), name,
Expand Down
19 changes: 19 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,25 @@

<build>
<plugins>
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
<configuration>
<failOnViolations>false</failOnViolations>
<javaVersion>17</javaVersion>
<violationLogLevel>warn</violationLogLevel>
</configuration>
<executions>
<execution>
<id>modernizer</id>
<phase>verify</phase>
<goals>
<goal>modernizer</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
private LoadSheddingTask loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
private PulsarAdmin adminClient = null;
Expand Down Expand Up @@ -448,7 +448,11 @@ public CompletableFuture<Void> closeAsync() {
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
* getConfiguration()
.getBrokerShutdownTimeoutMs())));

// close protocol handler before closing broker service
if (protocolHandlers != null) {
protocolHandlers.close();
protocolHandlers = null;
}

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
Expand Down Expand Up @@ -528,11 +532,6 @@ public CompletableFuture<Void> closeAsync() {

offloadersCache.close();

if (protocolHandlers != null) {
protocolHandlers.close();
protocolHandlers = null;
}

if (coordinationService != null) {
coordinationService.close();
}
Expand Down Expand Up @@ -1062,20 +1061,17 @@ protected void startLeaderElectionService() {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
long loadSheddingInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());

if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask.cancel();
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadSheddingTask(loadManager),
loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
Expand All @@ -1086,7 +1082,7 @@ protected void startLeaderElectionService() {
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
loadSheddingTask.cancel(false);
loadSheddingTask.cancel();
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su
private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1590,7 +1590,7 @@ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncRespo
Optional<Position> position,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1646,7 +1646,7 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes
String subName, Map<String, String> subscriptionProperties,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
Expand All @@ -1673,7 +1673,7 @@ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncRespon
String subName,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenApply((Topic topic) -> {
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1776,7 +1776,7 @@ protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
Subscription sub = topic.getSubscription(subName);
Expand Down Expand Up @@ -1923,7 +1923,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
Expand Down Expand Up @@ -2332,7 +2332,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
Expand Down Expand Up @@ -2870,7 +2870,7 @@ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName
ret = CompletableFuture.completedFuture(null);
}
return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES, subName))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
CompletableFuture<Entry> entry;
Expand Down Expand Up @@ -3790,7 +3790,7 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St
}
future.thenCompose(__ ->
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
Expand Down Expand Up @@ -3942,7 +3942,7 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES, subName))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,9 +31,18 @@
public class LoadSheddingTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
private final AtomicReference<LoadManager> loadManager;
private final ScheduledExecutorService loadManagerExecutor;

public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
private final ServiceConfiguration config;

private volatile boolean isCancel = false;

public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
ScheduledExecutorService loadManagerExecutor,
ServiceConfiguration config) {
this.loadManager = loadManager;
this.loadManagerExecutor = loadManagerExecutor;
this.config = config;
}

@Override
Expand All @@ -39,6 +51,26 @@ public void run() {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
} finally {
if (!isCancel && loadManagerExecutor != null && config != null) {
loadManagerExecutor.schedule(
new LoadSheddingTask(loadManager, loadManagerExecutor, config),
config.getLoadBalancerSheddingIntervalMinutes(),
TimeUnit.MINUTES);
}
}
}

public void start() {
if (loadManagerExecutor != null && config != null) {
loadManagerExecutor.schedule(
new LoadSheddingTask(loadManager, loadManagerExecutor, config),
config.getLoadBalancerSheddingIntervalMinutes(),
TimeUnit.MINUTES);
}
}

public void cancel() {
isCancel = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,16 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
return writePromise;
}
int unackedMessages = totalMessages;
// Note
// Must ensure that the message is written to the pendingAcks before sent is first, because this consumer
// is possible to disconnect at this time.
if (pendingAcks != null) {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry != null) {
int totalEntries = 0;

for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry != null) {
totalEntries++;
// Note
// Must ensure that the message is written to the pendingAcks before sent is first,
// because this consumer is possible to disconnect at this time.
if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
int stickyKeyHash = getStickyKeyHash(entry);
long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
Expand All @@ -317,10 +320,10 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
// calculate avg message per entry
if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
// set init value.
avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries);
} else {
avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
+ (1 - avgPercent) * totalMessages / entries.size());
+ (1 - avgPercent) * totalMessages / totalEntries);
}

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
Expand Down
Loading

0 comments on commit 9ef3f2a

Please sign in to comment.