This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Support Kafka LogValidator to validate inner records and compression codec when handling produce requests with entryFormat=kafka #791

Merged on Oct 20, 2021
Changes from 7 commits
Commits (37)
5d08a6c
Support kafka LogValidator validator inner records and compression co…
wenbingshen Oct 7, 2021
1ed687f
only need to validate when entryFormat=kafka
wenbingshen Oct 7, 2021
af595ef
add FieldContext doc
wenbingshen Oct 7, 2021
f07f484
add KopLogValidator
wenbingshen Oct 7, 2021
6bb270d
add LongRef and CompressionCodec
wenbingshen Oct 7, 2021
5580cef
fix Codacy static code error
wenbingshen Oct 7, 2021
23545b0
fix checkstyle
wenbingshen Oct 7, 2021
4395153
merge master and resolve conflict
wenbingshen Oct 9, 2021
1453cbc
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/K…
wenbingshen Oct 9, 2021
015d174
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 10, 2021
e232f17
move validateMessagesAndAssignOffsets call to KafkaEntryFormatter#encode
wenbingshen Oct 10, 2021
75d6cc2
add test for LongRef and CompressionCodec
wenbingshen Oct 10, 2021
9a1362c
add MixedKafkaEntryFormatter support 0.10.x kafka clients
wenbingshen Oct 11, 2021
6365176
fix checkstyle error
wenbingshen Oct 11, 2021
1010d01
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/f…
wenbingshen Oct 12, 2021
a267fb3
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/f…
wenbingshen Oct 12, 2021
98456a5
Update kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/u…
wenbingshen Oct 12, 2021
ebe03bf
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 12, 2021
8e1350e
addressed reviewer's comments
wenbingshen Oct 12, 2021
41f513c
remove unnecessary AtomicReference
wenbingshen Oct 12, 2021
5713a66
Update docs/configuration.md
wenbingshen Oct 12, 2021
f0ff60b
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 12, 2021
d06ca93
Merge branch 'addLogValidator' of https://github.com/wenbingshen/kop …
wenbingshen Oct 12, 2021
9001b80
Update docs/configuration.md
wenbingshen Oct 13, 2021
38f6801
Merge remote-tracking branch 'origin/addLogValidator' into addLogVali…
wenbingshen Oct 14, 2021
5537fb1
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 14, 2021
0997a20
rename KafkaEntryFormatter and add Recycler to encode/decode result
wenbingshen Oct 14, 2021
12a9703
rename test class name
wenbingshen Oct 14, 2021
3f71139
fix Codacy Static Code
wenbingshen Oct 14, 2021
eb9e291
Fix the number of repeated releases of byteBuf is greater than the re…
wenbingshen Oct 14, 2021
e4b3638
Addressed reviewer's comments
wenbingshen Oct 15, 2021
fcb046d
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 15, 2021
1a3625a
fix checkstyle and Codacy
wenbingshen Oct 15, 2021
2b4fe46
fix checkstyle
wenbingshen Oct 15, 2021
4c955d5
Addressed comments
wenbingshen Oct 20, 2021
7f53e2b
Merge remote-tracking branch 'kop/master' into addLogValidator
wenbingshen Oct 20, 2021
c214740
fix codacy static code
wenbingshen Oct 20, 2021
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

@@ -39,6 +39,7 @@
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.format.KafkaEntryFormatter;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
@@ -49,7 +50,9 @@
import io.streamnative.pulsar.handlers.kop.security.auth.SimpleAclAuthorizer;
import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopLogValidator;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.LongRef;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
@@ -76,6 +79,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -84,6 +88,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -97,12 +102,14 @@
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -233,6 +240,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final long resumeThresholdPendingBytes;
private final AtomicLong pendingBytes = new AtomicLong(0);
private volatile boolean autoReadDisabledPublishBufferLimiting = false;
private final String brokerCompressionType;

private String getCurrentTenant() {
if (kafkaConfig.isKafkaEnableMultiTenantMetadata()
@@ -312,6 +320,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.maxPendingBytes = kafkaConfig.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
this.resumeThresholdPendingBytes = this.maxPendingBytes / 2;
this.failedAuthenticationDelayMs = kafkaConfig.getFailedAuthenticationDelayMs();
this.brokerCompressionType = kafkaConfig.getKafkaCompressionType();

// update alive channel count stats
RequestStats.ALIVE_CHANNEL_COUNT_INSTANCE.incrementAndGet();
@@ -1007,17 +1016,6 @@ private void handlePartitionRecords(final KafkaHeaderAndRequest produceHar,
final long beforeRecordsProcess = MathUtils.nowInNano();
final MemoryRecords validRecords =
validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records);
final int numMessages = EntryFormatter.parseNumMessages(validRecords);
final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(byteBuf.readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

final CompletableFuture<Optional<PersistentTopic>> topicFuture =
topicManager.getTopic(fullPartitionName);
@@ -1034,7 +1032,46 @@
}

final Consumer<Optional<PersistentTopic>> persistentTopicConsumer = persistentTopicOpt -> {
publishMessages(persistentTopicOpt, byteBuf, numMessages, validRecords, topicPartition,
if (!persistentTopicOpt.isPresent()) {
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}

final MemoryRecords finalValidRecords;
if (entryFormatter instanceof KafkaEntryFormatter) {
final KopLogValidator.CompressionCodec sourceCodec = getSourceCodec(validRecords);
final KopLogValidator.CompressionCodec targetCodec = getTargetCodec(sourceCodec);

ManagedLedger managedLedger = persistentTopicOpt.get().getManagedLedger();
long logEndOffset = MessageIdUtils.getLogEndOffset(managedLedger);
LongRef offset = new LongRef(logEndOffset);

finalValidRecords = KopLogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
System.currentTimeMillis(),
sourceCodec,
targetCodec,
false,
RecordBatch.MAGIC_VALUE_V2,
TimestampType.CREATE_TIME,
Long.MAX_VALUE);
} else {
finalValidRecords = validRecords;
}

final int numMessages = EntryFormatter.parseNumMessages(finalValidRecords);
final ByteBuf byteBuf = entryFormatter.encode(finalValidRecords, numMessages);
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);
startSendOperationForThrottling(byteBuf.readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, "
+ "request size: {} ", ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

publishMessages(persistentTopicOpt, byteBuf, numMessages, finalValidRecords, topicPartition,
offsetConsumer, errorsConsumer);
};
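Note on the offset assignment above: KopLogValidator.validateMessagesAndAssignOffsets re-stamps the incoming batches starting at the partition's current log end offset and applies the target codec. As a rough, simplified analogue only (this is not the KoP implementation; the helper class below is made up for illustration and uses plain kafka-clients builders):

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class OffsetAssignSketch {

    // Rebuild the incoming records so that offsets start at logEndOffset and the
    // batches are (re)compressed with targetCodec as a magic-v2 batch.
    static MemoryRecords assignOffsets(MemoryRecords incoming, long logEndOffset,
                                       CompressionType targetCodec) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(incoming.sizeInBytes() + 512),
                RecordBatch.MAGIC_VALUE_V2,
                targetCodec,
                TimestampType.CREATE_TIME,
                logEndOffset);
        for (Record record : incoming.records()) {
            builder.append(record.timestamp(), toArray(record.key()), toArray(record.value()));
        }
        return builder.build();
    }

    // Copy a (possibly null) ByteBuffer into a byte array without consuming it.
    private static byte[] toArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }
}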

@@ -2534,6 +2571,31 @@ private static MemoryRecords validateRecords(short version, TopicPartition topic
return validRecords;
}

private KopLogValidator.CompressionCodec getSourceCodec(MemoryRecords records) {
AtomicReference<KopLogValidator.CompressionCodec> sourceCodec =
new AtomicReference<>(new KopLogValidator.CompressionCodec(
CompressionType.NONE.name, CompressionType.NONE.id));
records.batches().forEach(batch -> {
CompressionType compressionType = CompressionType.forId(batch.compressionType().id);
KopLogValidator.CompressionCodec messageCodec = new KopLogValidator.CompressionCodec(
compressionType.name, compressionType.id);
if (!messageCodec.name().equals("none")) {
sourceCodec.set(messageCodec);
}
});

return sourceCodec.get();
}

private KopLogValidator.CompressionCodec getTargetCodec(KopLogValidator.CompressionCodec sourceCodec) {
if (brokerCompressionType.equals("producer")) {
return sourceCodec;
} else {
CompressionType compressionType = CompressionType.forName(brokerCompressionType);
return new KopLogValidator.CompressionCodec(compressionType.name, compressionType.id);
}
}

private void updateProducerStats(final TopicPartition topicPartition, final int numMessages, final int numBytes) {
requestStats.getStatsLogger()
.scopeLabel(TOPIC_SCOPE, topicPartition.topic())
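For reference, a minimal self-contained sketch of the codec selection that getSourceCodec/getTargetCodec perform (illustrative class and method names, public kafka-clients API only; not the KoP code itself):

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class CodecSelectionSketch {

    // Effective source codec of a produce request: the codec of the last
    // compressed batch, or NONE when every batch is uncompressed.
    static CompressionType sourceCodec(MemoryRecords records) {
        CompressionType source = CompressionType.NONE;
        for (MutableRecordBatch batch : records.batches()) {
            if (batch.compressionType() != CompressionType.NONE) {
                source = batch.compressionType();
            }
        }
        return source;
    }

    // "producer" keeps whatever the client sent; any other value forces that codec.
    static CompressionType targetCodec(CompressionType source, String brokerCompressionType) {
        return "producer".equals(brokerCompressionType)
                ? source
                : CompressionType.forName(brokerCompressionType);
    }

    public static void main(String[] args) {
        MemoryRecords records = MemoryRecords.withRecords(
                CompressionType.GZIP, new SimpleRecord("value".getBytes()));
        CompressionType source = sourceCodec(records);
        System.out.println("source=" + source
                + " target(producer)=" + targetCodec(source, "producer")
                + " target(gzip)=" + targetCodec(source, "gzip"));
    }
}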
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java

@@ -382,6 +382,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private Set<String> kopAllowedNamespaces;

@FieldContext(
category = CATEGORY_KOP,
doc = "KOP server compression type. Only used for entryFormat=kafka. If it's not set to producer. "
+ "The client messages will be used compression type which configured in here."
)
private String kafkaCompressionType = "producer";

private String checkAdvertisedListeners(String advertisedListeners) {
StringBuilder listenersReBuilder = new StringBuilder();
for (String listener : advertisedListeners.split(EndPoint.END_POINT_SEPARATOR)) {
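Usage note (illustrative; assumes the standard KoP broker configuration file and the option name added in this PR): to force server-side recompression of client batches, the configuration would look something like:

entryFormat=kafka
# default is "producer", which keeps the codec chosen by the client
kafkaCompressionType=gzip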
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java

@@ -21,7 +21,6 @@
import io.netty.util.ReferenceCounted;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
import io.streamnative.pulsar.handlers.kop.utils.KopRecordsUtil;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -32,7 +31,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -47,6 +46,7 @@ public class KafkaEntryFormatter implements EntryFormatter {
// These key-value identifies the entry's format as kafka
private static final String IDENTITY_KEY = "entry.format";
private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
private final Time time = Time.SYSTEM;

@Override
public ByteBuf encode(MemoryRecords records, int numMessages) {
@@ -77,11 +77,11 @@ public DecodeResult decode(List<Entry> entries, byte magic) {

// batch magic greater than the magic corresponding to the version requested by the client
// need down converted
if (batchMagic > magic || batchMagic != RecordBatch.MAGIC_VALUE_V2) {
if (batchMagic > magic) {
MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
//down converted, batch magic will be set to client magic
ConvertedRecords<MemoryRecords> convertedRecords =
KopRecordsUtil.convertAndAssignOffsets(memoryRecords.batches(), magic, startOffset);
memoryRecords.downConvert(magic, startOffset, time);

final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
orderedByteBuf.add(kafkaBuffer);
@@ -91,8 +91,7 @@
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));

if (log.isTraceEnabled()) {
log.trace("[{}:{}] convertAndAssignOffsets record for down converted"
+ " or assign offsets with v0 and v1 magic, start offset {},"
log.trace("[{}:{}] MemoryRecords down converted, start offset {},"
+ " entry magic: {}, client magic: {}",
entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
}
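The decode path now relies on Kafka's built-in MemoryRecords#downConvert instead of KopRecordsUtil.convertAndAssignOffsets. A small standalone sketch of that API (illustrative values; not KoP code):

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;

public class DownConvertSketch {
    public static void main(String[] args) {
        // Build a magic-v2 batch, as a modern producer would send.
        MemoryRecords v2Records = MemoryRecords.withRecords(
                CompressionType.NONE, new SimpleRecord("key".getBytes(), "value".getBytes()));

        // Down-convert for an old client, e.g. a 0.10.x consumer that understands magic v1.
        byte clientMagic = RecordBatch.MAGIC_VALUE_V1;
        long startOffset = 0L;
        ConvertedRecords<MemoryRecords> converted =
                v2Records.downConvert(clientMagic, startOffset, Time.SYSTEM);

        // Each batch in the converted buffer now carries the requested magic.
        for (RecordBatch batch : converted.records().batches()) {
            System.out.println("magic=" + batch.magic() + " baseOffset=" + batch.baseOffset());
        }
    }
}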