Skip to content

Commit

Permalink
Moved entries filtering from consumer to dispatcher (#4329)
Browse files Browse the repository at this point in the history
* Moved entries filtering from consumer to dispatcher

* Optimize when all messages were filtered out

* Fixed pending adds adding and re-added check for older consumers

* Fixed reusing of thread local from different thread

* Pass the redelivery tracker to consumer.sendMessages()
  • Loading branch information
merlimat authored May 22, 2019
1 parent bf06ef3 commit cb34e6a
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 225 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;

import java.util.Collections;
import java.util.List;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;

public abstract class AbstractBaseDispatcher {

protected final Subscription subscription;

protected AbstractBaseDispatcher(Subscription subscription) {
this.subscription = subscription;
}

/**
* Filter messages that are being sent to a consumers.
* <p>
* Messages can be filtered out for multiple reasons:
* <ul>
* <li>Checksum or metadata corrupted
* <li>Message is an internal marker
* <li>Message is not meant to be delivered immediately
* </ul>
*
* @param entries
* a list of entries as read from storage
*
* @param batchSizes
* an array where the batch size for each entry (the number of messages within an entry) is stored. This
* array needs to be of at least the same size as the entries list
*
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
* @param subscription
* the subscription object
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo) {
int totalMessages = 0;
long totalBytes = 0;

for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
ByteBuf metadataAndPayload = entry.getDataBuffer();
PositionImpl pos = (PositionImpl) entry.getPosition();

MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);

try {
if (msgMetadata == null) {
// Message metadata was corrupted
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
}

int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
batchSizes.setBatchSize(i, batchSize);
} finally {
msgMetadata.recycle();
}
}

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
*/
public abstract class AbstractDispatcherMultipleConsumers {
public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDispatcher {

protected final CopyOnWriteArrayList<Consumer> consumerList = new CopyOnWriteArrayList<>();
protected final ObjectSet<Consumer> consumerSet = new ObjectHashSet<>();
Expand All @@ -39,6 +39,11 @@ public abstract class AbstractDispatcherMultipleConsumers {
protected static final AtomicIntegerFieldUpdater<AbstractDispatcherMultipleConsumers> IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(AbstractDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
}

public boolean isConsumerConnected() {
return !consumerList.isEmpty();
}
Expand Down Expand Up @@ -127,7 +132,7 @@ public Consumer getNextConsumer() {

/**
* Finds index of first available consumer which has higher priority then given targetPriority
*
*
* @param targetPriority
* @return -1 if couldn't find any available consumer
*/
Expand Down Expand Up @@ -187,7 +192,7 @@ private int getNextConsumerFromSameOrLowerLevel(int currentRoundRobinIndex) {

/**
* Finds index of first consumer in list which has same priority as given targetPriority
*
*
* @param targetPriority
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDispatcherSingleActiveConsumer {
public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {

protected final String topicName;
protected static final AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer> ACTIVE_CONSUMER_UPDATER =
Expand All @@ -53,7 +53,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer {
private volatile int isClosed = FALSE;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName) {
String topicName, Subscription subscription) {
super(subscription);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
Expand Down
Loading

0 comments on commit cb34e6a

Please sign in to comment.