[FEATURE] Support Kafka LogValidator validation of inner records and compression codec when handling producer requests with entryFormat=kafka (#791)

fixes #687 

### Motivation
Previously, in order to support older Kafka clients such as 0.10.x, we verified the data while processing fetch requests. The older message formats keep an inner message set, and in the production cases I have run into, the offsets inside that inner message set are sometimes wrong, for example because of a bug in older versions of the sarama client.

The problem is that, with the older message format, we regenerated the message records whether or not the inner message set was actually broken, which is unnecessary in some cases.

### Modifications
Validate the message set on the produce path, the way Kafka's LogValidator does, and only when entryFormat=kafka.
When entryFormat=pulsar, we still cannot avoid converting messages on the consume path.
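
For illustration, here is a minimal, hypothetical sketch (not the code added in this patch) of re-assigning contiguous offsets to the inner records of an old-format message set with the standard Kafka client library; it is the same kind of work the down-conversion path (`KopRecordsUtil.convertAndAssignOffsets`) performs for old clients:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;

public final class InnerOffsetReassignExample {

    /**
     * Rewrite the given records with the requested magic and compression,
     * assigning contiguous offsets starting at baseOffset to every inner record.
     */
    public static MemoryRecords assignOffsets(MemoryRecords records,
                                              byte magic,
                                              CompressionType compression,
                                              long baseOffset) {
        final MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(records.sizeInBytes()),
                magic, compression, TimestampType.CREATE_TIME, baseOffset);
        long offset = baseOffset;
        for (RecordBatch batch : records.batches()) {
            for (Record record : batch) {
                // Ignore the (possibly broken) inner offset and assign a new one.
                builder.appendWithOffset(offset++, record.timestamp(),
                        Utils.toNullableArray(record.key()),
                        Utils.toNullableArray(record.value()));
            }
        }
        return builder.build();
    }
}
```

With validation moved to the produce side, well-formed batches no longer have to be rebuilt on every fetch.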

Co-authored-by: Yunze Xu <xyzinfernity@163.com>
Co-authored-by: Kai Wang <kwang@streamnative.io>
Co-authored-by: Huanli Meng <48120384+Huanli-Meng@users.noreply.github.com>
4 people committed Oct 20, 2021
1 parent 61a0fdd commit fa94761
Showing 29 changed files with 1,666 additions and 246 deletions.
docs/configuration.md (2 changes: 1 addition & 1 deletion)
@@ -69,7 +69,7 @@ This section lists configurations that may affect the performance.

| Name | Description | Range | Default |
| ----------------- | ------------------------------------------------------------ | ----------------- | ------- |
| entryFormat | The format of an entry. Changing it to `pulsar` will avoid unnecessary encoding and decoding to improve the performance, whose cost is that a topic cannot be used by mixed Pulsar clients and Kafka clients. | kafka,<br> pulsar | kafka |
| entryFormat | The format of an entry. If it is set to `kafka`, unnecessary encoding and decoding work is avoided, which helps improve performance; however, in this case a topic cannot be used by mixed Pulsar clients and Kafka clients. If it is set to `mixed_kafka`, older Kafka clients (0.10.x) are supported. <br>- **Note**: Setting this parameter to `kafka` improves performance by 2 to 3 times compared with `mixed_kafka`. | kafka, <br> mixed_kafka,<br> pulsar | kafka |
| maxReadEntriesNum | The maximum number of entries read from the cursor in a single read.<br>Increasing this value lets a FETCH request read more bytes each time.<br>**NOTE**: Currently, KoP does not check the maximum byte limit, so if the value is too large, the response size may exceed the network limit. | | 5 |
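
For example, a broker configured for the Kafka entry format might set these two options as follows (a minimal sketch; the values are illustrative, not recommendations):

```properties
# Store entries in Kafka format; such topics cannot be shared with Pulsar clients.
entryFormat=kafka
# Number of entries read from the cursor per read.
maxReadEntriesNum=5
```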

## Network
@@ -41,8 +41,11 @@
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.AbortedIndexEntry;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
import io.streamnative.pulsar.handlers.kop.format.EncodeResult;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.format.KafkaMixedEntryFormatter;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
@@ -90,6 +93,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -321,7 +325,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.topicManager = new KafkaTopicManager(this);
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig);
this.currentConnectedGroup = new ConcurrentHashMap<>();
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
@@ -914,19 +918,22 @@ private void completeSendOperationForThrottling(long msgSize) {
}

private void publishMessages(final Optional<PersistentTopic> persistentTopicOpt,
final ByteBuf byteBuf,
final int numMessages,
final MemoryRecords records,
final EncodeResult encodeResult,
final TopicPartition topicPartition,
final Consumer<Long> offsetConsumer,
final Consumer<Errors> errorsConsumer) {
final MemoryRecords records = encodeResult.getRecords();
final int numMessages = encodeResult.getNumMessages();
final ByteBuf byteBuf = encodeResult.getEncodedByteBuf();
if (!persistentTopicOpt.isPresent()) {
encodeResult.recycle();
// This will trigger the Kafka client to retry the send
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}
PersistentTopic persistentTopic = persistentTopicOpt.get();
if (persistentTopic.isSystemTopic()) {
encodeResult.recycle();
log.error("Not support producing message to system topic: {}", persistentTopic);
errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION);
return;
Expand All @@ -948,7 +955,7 @@ private void publishMessages(final Optional<PersistentTopic> persistentTopicOpt,
final RecordBatch batch = records.batchIterator().next();
offsetFuture.whenComplete((offset, e) -> {
completeSendOperationForThrottling(byteBuf.readableBytes());
byteBuf.release();
encodeResult.recycle();
if (e == null) {
if (batch.isTransactional()) {
getTransactionCoordinator().addActivePidOffset(TopicName.get(partitionName), batch.producerId(),
@@ -1071,17 +1078,6 @@ private void handlePartitionRecords(final KafkaHeaderAndRequest produceHar,
final long beforeRecordsProcess = MathUtils.nowInNano();
final MemoryRecords validRecords =
validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records);
final int numMessages = EntryFormatter.parseNumMessages(validRecords);
final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(byteBuf.readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

final CompletableFuture<Optional<PersistentTopic>> topicFuture =
topicManager.getTopic(fullPartitionName);
@@ -1098,8 +1094,31 @@
}

final Consumer<Optional<PersistentTopic>> persistentTopicConsumer = persistentTopicOpt -> {
publishMessages(persistentTopicOpt, byteBuf, numMessages, validRecords, topicPartition,
offsetConsumer, errorsConsumer);
if (!persistentTopicOpt.isPresent()) {
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}

final EncodeRequest encodeRequest = EncodeRequest.get(validRecords);
if (entryFormatter instanceof KafkaMixedEntryFormatter) {
final ManagedLedger managedLedger = persistentTopicOpt.get().getManagedLedger();
final long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger);
encodeRequest.setBaseOffset(logEndOffset);
}

final EncodeResult encodeResult = entryFormatter.encode(encodeRequest);
encodeRequest.recycle();
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(encodeResult.getEncodedByteBuf().readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

publishMessages(persistentTopicOpt, encodeResult, topicPartition, offsetConsumer, errorsConsumer);
};

if (topicFuture.isDone()) {
@@ -337,7 +337,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {

@FieldContext(
category = CATEGORY_KOP,
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]"
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka, mixed_kafka]"
)
private String entryFormat = "pulsar";

@@ -388,6 +388,14 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private Set<String> kopAllowedNamespaces;

@FieldContext(
category = CATEGORY_KOP,
doc = "KoP server compression type. It is only used when entryFormat=mixed_kafka. If it is not set to none, "
+ "the client messages will be written with the compression type configured here.\n"
+ "The supported compression types are: [\"none\", \"gzip\", \"snappy\", \"lz4\"]"
)
private String kafkaCompressionType = "none";
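// For illustration only (not part of this patch): a broker using the mixed_kafka entry
// format can optionally re-compress client batches with a server-side codec, e.g.
//   entryFormat=mixed_kafka
//   kafkaCompressionType=snappy
// The values above are examples; the supported codecs are listed in the doc string.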

private String checkAdvertisedListeners(String advertisedListeners) {
StringBuilder listenersReBuilder = new StringBuilder();
for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
@@ -213,7 +213,7 @@ public void complete() {
if (resultFuture.isCancelled()) {
// The request was cancelled by KafkaCommandDecoder when channel is closed or this request is expired,
// so the Netty buffers should be released.
decodeResults.forEach(DecodeResult::release);
decodeResults.forEach(DecodeResult::recycle);
return;
}
if (resultFuture.isDone()) {
@@ -251,7 +251,7 @@ public void complete() {
fetchRequest.metadata().sessionId()),
() -> {
// release the batched ByteBuf if necessary
decodeResults.forEach(DecodeResult::release);
decodeResults.forEach(DecodeResult::recycle);
}));
recycle();
}
@@ -18,21 +18,17 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopRecordsUtil;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -44,14 +40,13 @@ public abstract class AbstractEntryFormatter implements EntryFormatter {
// This key-value pair identifies the entry's format as kafka
public static final String IDENTITY_KEY = "entry.format";
public static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
private final Time time = Time.SYSTEM;

@Override
public DecodeResult decode(List<Entry> entries, byte magic) {
Optional<List<ByteBuf>> optionalByteBufs = Optional.empty();

// reset header information
final List<ByteBuf> orderedByteBuf = new ArrayList<>();

int totalSize = 0;
// batched ByteBuf should be released after sending to client
ByteBuf batchedByteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);
for (Entry entry : entries) {
try {
long startOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry);
@@ -63,66 +58,53 @@ public DecodeResult decode(List<Entry> entries, byte magic) {

// If the batch magic is greater than the magic of the version requested by the client,
// the records need to be down-converted
if (batchMagic > magic || batchMagic != RecordBatch.MAGIC_VALUE_V2) {
if (batchMagic > magic) {
MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
//down converted, batch magic will be set to client magic
// down converted, batch magic will be set to client magic
ConvertedRecords<MemoryRecords> convertedRecords =
KopRecordsUtil.convertAndAssignOffsets(memoryRecords.batches(), magic, startOffset);
memoryRecords.downConvert(magic, startOffset, time);

final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
orderedByteBuf.add(kafkaBuffer);
if (!optionalByteBufs.isPresent()) {
optionalByteBufs = Optional.of(new ArrayList<>());
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));

totalSize += kafkaBuffer.readableBytes();
batchedByteBuf.writeBytes(kafkaBuffer);
kafkaBuffer.release();
if (log.isTraceEnabled()) {
log.trace("[{}:{}] convertAndAssignOffsets record for down converted"
+ " or assign offsets with v0 and v1 magic, start offset {},"
log.trace("[{}:{}] MemoryRecords down converted, start offset {},"
+ " entry magic: {}, client magic: {}",
entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
}

} else {
//not need down converted, batch magic retains the magic value written in production
orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
// not need down converted, batch magic retains the magic value written in production
ByteBuf buf = byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes());
totalSize += buf.readableBytes();
batchedByteBuf.writeBytes(buf);
}
} else {
final DecodeResult decodeResult =
ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
orderedByteBuf.add(kafkaBuffer);
if (!optionalByteBufs.isPresent()) {
optionalByteBufs = Optional.of(new ArrayList<>());
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
totalSize += kafkaBuffer.readableBytes();
batchedByteBuf.writeBytes(kafkaBuffer);
decodeResult.recycle();
}

// Almost all exceptions in Kafka inherit from KafkaException and are caught
// and handled in KafkaApis. Here, both a failure on the down-conversion path and an
// IOException from builder.appendWithOffset in decodePulsarEntryToKafkaRecords
// surface as a KafkaException, so we need to catch KafkaException here.
} catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry
log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
} finally {
entry.release();
}
}

// batched ByteBuf should be released after sending to client
int totalSize = orderedByteBuf.stream().mapToInt(ByteBuf::readableBytes).sum();
ByteBuf batchedByteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);

for (ByteBuf byteBuf : orderedByteBuf) {
batchedByteBuf.writeBytes(byteBuf);
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.forEach(ReferenceCounted::release));

// release entries
entries.forEach(Entry::release);
return new DecodeResult(
return DecodeResult.get(
MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(batchedByteBuf)), batchedByteBuf);
}

private static boolean isKafkaEntryFormat(final MessageMetadata messageMetadata) {
protected static boolean isKafkaEntryFormat(final MessageMetadata messageMetadata) {
final List<KeyValue> keyValues = messageMetadata.getPropertiesList();
for (KeyValue keyValue : keyValues) {
if (keyValue.hasKey()
@@ -15,29 +15,52 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.AllArgsConstructor;
import lombok.Data;
import io.netty.util.Recycler;
import lombok.Getter;
import lombok.NonNull;
import org.apache.kafka.common.record.MemoryRecords;

/**
* Result of decode in entry formatter.
*/
@Data
@AllArgsConstructor
public class DecodeResult {

private @NonNull MemoryRecords records;
@Getter
private MemoryRecords records;
private ByteBuf releasedByteBuf;

public DecodeResult(@NonNull MemoryRecords records) {
this.records = records;
private final Recycler.Handle<DecodeResult> recyclerHandle;

public static DecodeResult get(MemoryRecords records) {
return get(records, null);
}

public static DecodeResult get(MemoryRecords records,
ByteBuf releasedByteBuf) {
DecodeResult decodeResult = RECYCLER.get();
decodeResult.records = records;
decodeResult.releasedByteBuf = releasedByteBuf;
return decodeResult;
}

private DecodeResult(Recycler.Handle<DecodeResult> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

public void release() {
private static final Recycler<DecodeResult> RECYCLER = new Recycler<DecodeResult>() {
@Override
protected DecodeResult newObject(Recycler.Handle<DecodeResult> handle) {
return new DecodeResult(handle);
}
};

public void recycle() {
records = null;
if (releasedByteBuf != null) {
releasedByteBuf.release();
releasedByteBuf = null;
}
recyclerHandle.recycle(this);
}

public @NonNull ByteBuf getOrCreateByteBuf() {
@@ -47,4 +70,5 @@ public void release() {
return Unpooled.wrappedBuffer(records.buffer());
}
}

}
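
For context, a hypothetical caller-side sketch of the pooled API: callers now obtain instances via `DecodeResult.get(...)` and return them with `recycle()` instead of constructing them and calling `release()`. The `writeResponse` method and the surrounding class are placeholders, not real KoP code:

```java
// Hypothetical usage sketch only; writeResponse is a placeholder.
private void sendDecodedRecords(MemoryRecords records, ByteBuf batchedByteBuf) {
    final DecodeResult decodeResult = DecodeResult.get(records, batchedByteBuf);
    try {
        writeResponse(decodeResult.getRecords());
    } finally {
        // recycle() releases the retained ByteBuf (if any) and returns the object to the pool.
        decodeResult.recycle();
    }
}
```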