Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][broker] PIP 37: Support chunking with Shared subscription #16202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
*/
public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
Expand All @@ -96,7 +96,7 @@ public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSi
* EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
*/
public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray, int startOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
List<? extends Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
int totalMessages = 0;
long totalBytes = 0;
Expand All @@ -105,14 +105,17 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
final Entry entry = entries.get(i);
if (entry == null) {
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
.orElseGet(() -> Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
.orElseGet(() -> (entry instanceof EntryAndMetadata)
? ((EntryAndMetadata) entry).getMetadata()
: Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
);
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
Expand Down Expand Up @@ -203,6 +204,34 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
}

@VisibleForTesting
Consumer(String consumerName, int availablePermits) {
this.subscription = null;
this.subType = null;
this.cnx = null;
this.appId = null;
this.topicName = null;
this.partitionIdx = 0;
this.consumerId = 0L;
this.priorityLevel = 0;
this.readCompacted = false;
this.consumerName = consumerName;
this.msgOut = null;
this.msgRedeliver = null;
this.msgOutCounter = null;
this.bytesOutCounter = null;
this.messageAckRate = null;
this.pendingAcks = null;
this.stats = null;
this.isDurable = false;
this.metadata = null;
this.keySharedMeta = null;
this.clientAddress = null;
this.startMessageId = null;
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
}

public SubType subType() {
return subType;
}
Expand All @@ -227,7 +256,7 @@ public boolean readCompacted() {
return readCompacted;
}

public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long totalChunkedMessages,
RedeliveryTracker redeliveryTracker) {
Expand All @@ -241,7 +270,7 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc
*
* @return a SendMessageInfo object that contains the detail of what was sent to consumer
*/
public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes,
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long totalChunkedMessages,
RedeliveryTracker redeliveryTracker, long epoch) {
Expand Down Expand Up @@ -820,8 +849,13 @@ public KeySharedMeta getKeySharedMeta() {

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
if (subscription != null && cnx != null) {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
} else {
return MoreObjects.toStringHelper(this).add("consumerId", consumerId)
.add("consumerName", consumerName).toString();
}
}

public CompletableFuture<Void> checkPermissionsAsync() {
Expand Down Expand Up @@ -1041,7 +1075,12 @@ public Map<String, String> getMetadata() {
}

private int getStickyKeyHash(Entry entry) {
byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName());
final byte[] stickyKey;
if (entry instanceof EntryAndMetadata) {
stickyKey = ((EntryAndMetadata) entry).getStickyKey();
} else {
stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName());
}
return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import lombok.Getter;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;

public class EntryAndMetadata implements Entry {

private final Entry entry;
@Getter
@Nullable
private final MessageMetadata metadata;

private EntryAndMetadata(final Entry entry, @Nullable final MessageMetadata metadata) {
this.entry = entry;
this.metadata = metadata;
}

public static EntryAndMetadata create(final Entry entry, final MessageMetadata metadata) {
return new EntryAndMetadata(entry, metadata);
}

@VisibleForTesting
static EntryAndMetadata create(final Entry entry) {
return create(entry, Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1));
}

public byte[] getStickyKey() {
if (metadata != null) {
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
}
}
return "NONE_KEY".getBytes(StandardCharsets.UTF_8);
}

@Override
public String toString() {
String s = entry.getLedgerId() + ":" + entry.getEntryId();
if (metadata != null) {
s += ("@" + metadata.getProducerName() + "-" + metadata.getSequenceId());
if (metadata.hasChunkId() && metadata.hasNumChunksFromMsg()) {
s += ("-" + metadata.getChunkId() + "-" + metadata.getNumChunksFromMsg());
}
}
return s;
}

@Override
public byte[] getData() {
return entry.getData();
}

@Override
public byte[] getDataAndRelease() {
return entry.getDataAndRelease();
}

@Override
public int getLength() {
return entry.getLength();
}

@Override
public ByteBuf getDataBuffer() {
return entry.getDataBuffer();
}

@Override
public Position getPosition() {
return entry.getPosition();
}

@Override
public long getLedgerId() {
return entry.getLedgerId();
}

@Override
public long getEntryId() {
return entry.getEntryId();
}

@Override
public boolean release() {
return entry.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boo
void sendReachedEndOfTopic(long consumerId);

Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker, long epoch);
int partitionIdx, List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker, long epoch);

void sendTcClientConnectResponse(long requestId, ServerError error, String message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ public void sendReachedEndOfTopic(long consumerId) {

@Override
public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker, long epoch) {
int partitionIdx, List<? extends Entry> entries,
EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker, long epoch) {
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
ctx.channel().eventLoop().execute(() -> {
Expand Down
Loading