Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Support kafka LogValidator validate inner records and compression codec when handle producer request with entryFormat=kafka #791

Merged
merged 37 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
5d08a6c
Support kafka LogValidator validator inner records and compression co…
wenbingshen Oct 7, 2021
1ed687f
only need to validate when entryFormat=kafka
wenbingshen Oct 7, 2021
af595ef
add FieldContext doc
wenbingshen Oct 7, 2021
f07f484
add KopLogValidator
wenbingshen Oct 7, 2021
6bb270d
add LongRef and CompressionCodec
wenbingshen Oct 7, 2021
5580cef
fix Codacy static code error
wenbingshen Oct 7, 2021
23545b0
fix checkstyle
wenbingshen Oct 7, 2021
4395153
merge master and resolve conflict
wenbingshen Oct 9, 2021
1453cbc
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/K…
wenbingshen Oct 9, 2021
015d174
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 10, 2021
e232f17
move validateMessagesAndAssignOffsets call to KafkaEntryFormatter#encode
wenbingshen Oct 10, 2021
75d6cc2
add test for LongRef and CompressionCodec
wenbingshen Oct 10, 2021
9a1362c
add MixedKafkaEntryFormatter support 0.10.x kafka clients
wenbingshen Oct 11, 2021
6365176
fix checkstyle error
wenbingshen Oct 11, 2021
1010d01
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/f…
wenbingshen Oct 12, 2021
a267fb3
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/f…
wenbingshen Oct 12, 2021
98456a5
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/u…
wenbingshen Oct 12, 2021
ebe03bf
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 12, 2021
8e1350e
addressed reviewer's comments
wenbingshen Oct 12, 2021
41f513c
remove unnecessary AtomicReference
wenbingshen Oct 12, 2021
5713a66
Update docs/configuration.md
wenbingshen Oct 12, 2021
f0ff60b
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 12, 2021
d06ca93
Merge branch 'addLogValidator' of https://github.com/wenbingshen/kop …
wenbingshen Oct 12, 2021
9001b80
Update docs/configuration.md
wenbingshen Oct 13, 2021
38f6801
Merge remote-tracking branch 'origin/addLogValidator' into addLogVali…
wenbingshen Oct 14, 2021
5537fb1
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 14, 2021
0997a20
rename KafkaEntryFormatter and add Recycler to encode/decode result
wenbingshen Oct 14, 2021
12a9703
rename test class name
wenbingshen Oct 14, 2021
3f71139
fix Codacy Static Code
wenbingshen Oct 14, 2021
eb9e291
Fix the number of repeated releases of byteBuf is greater than the re…
wenbingshen Oct 14, 2021
e4b3638
Addressed reviewer's comments
wenbingshen Oct 15, 2021
fcb046d
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 15, 2021
1a3625a
fix checkstyle and Codacy
wenbingshen Oct 15, 2021
2b4fe46
fix checkstyle
wenbingshen Oct 15, 2021
4c955d5
Addressed comments
wenbingshen Oct 20, 2021
7f53e2b
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 20, 2021
c214740
fix codacy static code
wenbingshen Oct 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ This section lists configurations that may affect the performance.

| Name | Description | Range | Default |
| ----------------- | ------------------------------------------------------------ | ----------------- | ------- |
| entryFormat | The format of an entry. Changing it to `pulsar` will avoid unnecessary encoding and decoding to improve the performance, whose cost is that a topic cannot be used by mixed Pulsar clients and Kafka clients. | kafka,<br> pulsar | kafka |
| entryFormat | The format of an entry. If it is set to`kafka`, there is no unnecessary encoding and decoding work, which helps improve the performance. However, in this situation, a topic cannot be used by mixed Pulsar clients and Kafka clients. If it is set to `mixed_kafka`, Kafka clients (0.10.x) are supported. <br>- **Note**: Compared with performance for `mixed_kafka`, performance is improved by 2 to 3 times when the parameter is set to `kafka`. | kafka, <br> mixed_kafka,<br> pulsar | kafka |
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
| maxReadEntriesNum | The maximum number of entries that are read from the cursor once per time.<br>Increasing this value can make FETCH request read more bytes each time.<br>**NOTE**: Currently, KoP does not check the maximum byte limit. Therefore, if the value is too great, the response size may be over the network limit. | | 5 |

## Network
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.AbortedIndexEntry;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.format.EncodeRequest;
import io.streamnative.pulsar.handlers.kop.format.EncodeResult;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.format.KafkaMixedEntryFormatter;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
Expand Down Expand Up @@ -90,6 +93,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -321,7 +325,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.topicManager = new KafkaTopicManager(this);
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig);
this.currentConnectedGroup = new ConcurrentHashMap<>();
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
Expand Down Expand Up @@ -914,19 +918,22 @@ private void completeSendOperationForThrottling(long msgSize) {
}

private void publishMessages(final Optional<PersistentTopic> persistentTopicOpt,
final ByteBuf byteBuf,
final int numMessages,
final MemoryRecords records,
final EncodeResult encodeResult,
final TopicPartition topicPartition,
final Consumer<Long> offsetConsumer,
final Consumer<Errors> errorsConsumer) {
final MemoryRecords records = encodeResult.getRecords();
final int numMessages = encodeResult.getNumMessages();
final ByteBuf byteBuf = encodeResult.getEncodedByteBuf();
if (!persistentTopicOpt.isPresent()) {
encodeResult.recycle();
// It will trigger a retry send of Kafka client
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}
PersistentTopic persistentTopic = persistentTopicOpt.get();
if (persistentTopic.isSystemTopic()) {
encodeResult.recycle();
log.error("Not support producing message to system topic: {}", persistentTopic);
errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION);
return;
Expand All @@ -948,7 +955,7 @@ private void publishMessages(final Optional<PersistentTopic> persistentTopicOpt,
final RecordBatch batch = records.batchIterator().next();
offsetFuture.whenComplete((offset, e) -> {
completeSendOperationForThrottling(byteBuf.readableBytes());
byteBuf.release();
encodeResult.recycle();
if (e == null) {
if (batch.isTransactional()) {
getTransactionCoordinator().addActivePidOffset(TopicName.get(partitionName), batch.producerId(),
Expand Down Expand Up @@ -1071,17 +1078,6 @@ private void handlePartitionRecords(final KafkaHeaderAndRequest produceHar,
final long beforeRecordsProcess = MathUtils.nowInNano();
final MemoryRecords validRecords =
validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records);
final int numMessages = EntryFormatter.parseNumMessages(validRecords);
final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(byteBuf.readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

final CompletableFuture<Optional<PersistentTopic>> topicFuture =
topicManager.getTopic(fullPartitionName);
Expand All @@ -1098,8 +1094,31 @@ private void handlePartitionRecords(final KafkaHeaderAndRequest produceHar,
}

final Consumer<Optional<PersistentTopic>> persistentTopicConsumer = persistentTopicOpt -> {
publishMessages(persistentTopicOpt, byteBuf, numMessages, validRecords, topicPartition,
offsetConsumer, errorsConsumer);
if (!persistentTopicOpt.isPresent()) {
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}

final EncodeRequest encodeRequest = EncodeRequest.get(validRecords);
if (entryFormatter instanceof KafkaMixedEntryFormatter) {
final ManagedLedger managedLedger = persistentTopicOpt.get().getManagedLedger();
final long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger);
encodeRequest.setBaseOffset(logEndOffset);
}

final EncodeResult encodeResult = entryFormatter.encode(encodeRequest);
encodeRequest.recycle();
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(encodeResult.getEncodedByteBuf().readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

publishMessages(persistentTopicOpt, encodeResult, topicPartition, offsetConsumer, errorsConsumer);
};

if (topicFuture.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {

@FieldContext(
category = CATEGORY_KOP,
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]"
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka, mixed_kafka]"
)
private String entryFormat = "pulsar";

Expand Down Expand Up @@ -388,6 +388,14 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private Set<String> kopAllowedNamespaces;

@FieldContext(
category = CATEGORY_KOP,
doc = "KOP server compression type. Only used for entryFormat=mixed_kafka. If it's not set to none, "
+ "the client messages will be used compression type which configured in here.\n"
+ "The supported compression types are: [\"none\", \"gzip\", \"snappy\", \"lz4\"]"
)
private String kafkaCompressionType = "none";

private String checkAdvertisedListeners(String advertisedListeners) {
StringBuilder listenersReBuilder = new StringBuilder();
for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void complete() {
if (resultFuture.isCancelled()) {
// The request was cancelled by KafkaCommandDecoder when channel is closed or this request is expired,
// so the Netty buffers should be released.
decodeResults.forEach(DecodeResult::release);
decodeResults.forEach(DecodeResult::recycle);
return;
}
if (resultFuture.isDone()) {
Expand Down Expand Up @@ -251,7 +251,7 @@ public void complete() {
fetchRequest.metadata().sessionId()),
() -> {
// release the batched ByteBuf if necessary
decodeResults.forEach(DecodeResult::release);
decodeResults.forEach(DecodeResult::recycle);
}));
recycle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopRecordsUtil;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand All @@ -44,14 +40,13 @@ public abstract class AbstractEntryFormatter implements EntryFormatter {
// These key-value identifies the entry's format as kafka
public static final String IDENTITY_KEY = "entry.format";
public static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
private final Time time = Time.SYSTEM;

@Override
public DecodeResult decode(List<Entry> entries, byte magic) {
Optional<List<ByteBuf>> optionalByteBufs = Optional.empty();

// reset header information
final List<ByteBuf> orderedByteBuf = new ArrayList<>();

int totalSize = 0;
// batched ByteBuf should be released after sending to client
ByteBuf batchedByteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);
for (Entry entry : entries) {
try {
long startOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry);
Expand All @@ -63,66 +58,53 @@ public DecodeResult decode(List<Entry> entries, byte magic) {

// batch magic greater than the magic corresponding to the version requested by the client
// need down converted
if (batchMagic > magic || batchMagic != RecordBatch.MAGIC_VALUE_V2) {
if (batchMagic > magic) {
MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
//down converted, batch magic will be set to client magic
// down converted, batch magic will be set to client magic
ConvertedRecords<MemoryRecords> convertedRecords =
KopRecordsUtil.convertAndAssignOffsets(memoryRecords.batches(), magic, startOffset);
memoryRecords.downConvert(magic, startOffset, time);

final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
orderedByteBuf.add(kafkaBuffer);
if (!optionalByteBufs.isPresent()) {
optionalByteBufs = Optional.of(new ArrayList<>());
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));

totalSize += kafkaBuffer.readableBytes();
batchedByteBuf.writeBytes(kafkaBuffer);
kafkaBuffer.release();
if (log.isTraceEnabled()) {
log.trace("[{}:{}] convertAndAssignOffsets record for down converted"
+ " or assign offsets with v0 and v1 magic, start offset {},"
log.trace("[{}:{}] MemoryRecords down converted, start offset {},"
+ " entry magic: {}, client magic: {}",
entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
}

} else {
//not need down converted, batch magic retains the magic value written in production
orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
// not need down converted, batch magic retains the magic value written in production
ByteBuf buf = byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes());
totalSize += buf.readableBytes();
batchedByteBuf.writeBytes(buf);
}
} else {
final DecodeResult decodeResult =
ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
orderedByteBuf.add(kafkaBuffer);
if (!optionalByteBufs.isPresent()) {
optionalByteBufs = Optional.of(new ArrayList<>());
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
totalSize += kafkaBuffer.readableBytes();
batchedByteBuf.writeBytes(kafkaBuffer);
decodeResult.recycle();
}

// Almost all exceptions in Kafka inherit from KafkaException and will be captured
// and processed in KafkaApis. Here, whether it is down-conversion or the IOException
// in builder.appendWithOffset in decodePulsarEntryToKafkaRecords will be caught by Kafka
// and the KafkaException will be thrown. So we need to catch KafkaException here.
} catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry
log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
} finally {
entry.release();
}
}

// batched ByteBuf should be released after sending to client
int totalSize = orderedByteBuf.stream().mapToInt(ByteBuf::readableBytes).sum();
ByteBuf batchedByteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);

for (ByteBuf byteBuf : orderedByteBuf) {
batchedByteBuf.writeBytes(byteBuf);
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.forEach(ReferenceCounted::release));

// release entries
entries.forEach(Entry::release);
return new DecodeResult(
return DecodeResult.get(
MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(batchedByteBuf)), batchedByteBuf);
}

private static boolean isKafkaEntryFormat(final MessageMetadata messageMetadata) {
protected static boolean isKafkaEntryFormat(final MessageMetadata messageMetadata) {
final List<KeyValue> keyValues = messageMetadata.getPropertiesList();
for (KeyValue keyValue : keyValues) {
if (keyValue.hasKey()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,52 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.AllArgsConstructor;
import lombok.Data;
import io.netty.util.Recycler;
import lombok.Getter;
import lombok.NonNull;
import org.apache.kafka.common.record.MemoryRecords;

/**
* Result of decode in entry formatter.
*/
@Data
@AllArgsConstructor
public class DecodeResult {

private @NonNull MemoryRecords records;
@Getter
private MemoryRecords records;
private ByteBuf releasedByteBuf;

public DecodeResult(@NonNull MemoryRecords records) {
this.records = records;
private final Recycler.Handle<DecodeResult> recyclerHandle;

public static DecodeResult get(MemoryRecords records) {
return get(records, null);
}

public static DecodeResult get(MemoryRecords records,
ByteBuf releasedByteBuf) {
DecodeResult decodeResult = RECYCLER.get();
decodeResult.records = records;
decodeResult.releasedByteBuf = releasedByteBuf;
return decodeResult;
}

private DecodeResult(Recycler.Handle<DecodeResult> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

public void release() {
private static final Recycler<DecodeResult> RECYCLER = new Recycler<DecodeResult>() {
@Override
protected DecodeResult newObject(Recycler.Handle<DecodeResult> handle) {
return new DecodeResult(handle);
}
};

public void recycle() {
records = null;
if (releasedByteBuf != null) {
releasedByteBuf.release();
releasedByteBuf = null;
}
recyclerHandle.recycle(this);
}

public @NonNull ByteBuf getOrCreateByteBuf() {
Expand All @@ -47,4 +70,5 @@ public void release() {
return Unpooled.wrappedBuffer(records.buffer());
}
}

}
Loading