This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] Add fetch down converted when entryFormat=kafka (#660)
Fixes #656 

### Motivation
When entryFormat=kafka:

If SASL/PLAIN authentication is not enabled, KoP does not restrict the version of the producer client. A lower-version producer can therefore write messages with magic=0 or magic=1, which causes higher-version consumers to fail record validation on fetch and the consumer client to crash.

### Modifications
When entryFormat=kafka, extend KafkaEntryFormatter.decode to compare the batch magic of the stored Kafka records with the magic requested by the client. When the batch magic is higher than the client magic, perform a down-conversion.

This PR is part of the support for Kafka versions lower than 0.11.x. Since the FETCH version is still 4, it is also a bugfix for higher-version consumers that hit record validation errors when reading records written with a lower magic value.
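
A minimal, self-contained sketch of the check described above, using only the public Kafka `MemoryRecords` API (the class name and sample record are illustrative, not part of this PR):

```java
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;

public class DownConvertSketch {
    public static void main(String[] args) {
        // A magic=2 batch, as any producer >= 0.11.x writes it.
        MemoryRecords stored = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord("key".getBytes(), "value".getBytes()));

        byte clientMagic = RecordBatch.MAGIC_VALUE_V1; // what a 0.10.x consumer can parse
        byte batchMagic = stored.batches().iterator().next().magic();

        if (batchMagic > clientMagic) {
            // The same call the new decode path performs: rewrite the batch
            // with the magic the fetching client actually understands.
            ConvertedRecords<MemoryRecords> converted =
                    stored.downConvert(clientMagic, 0L, Time.SYSTEM);
            System.out.println("down-converted to magic "
                    + converted.records().batches().iterator().next().magic());
        }
    }
}
```

The conversion only runs when the stored batch magic exceeds the client magic, so fetches from clients on the same record format keep the existing pass-through path.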
wenbingshen committed Aug 19, 2021
1 parent 6faba39 commit 12f7808
Showing 14 changed files with 308 additions and 21 deletions.
73 changes: 73 additions & 0 deletions kafka-0-10/pom.xml
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pulsar-protocol-handler-kafka-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-0-10</artifactId>
<name>StreamNative :: Pulsar Protocol Handler :: Kafka Client 0.10.0.0</name>
<description>The Kafka client wrapper for 0.10.0.0</description>

<dependencies>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>kafka-client-api</artifactId>
<version>2.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.kafka.common</pattern>
<shadedPattern>org.apache.kafka010.common</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.kafka.clients</pattern>
<shadedPattern>org.apache.kafka010.clients</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,38 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kafka.client.zero.ten;

import io.streamnative.kafka.client.api.Consumer;
import io.streamnative.kafka.client.api.ConsumerConfiguration;
import io.streamnative.kafka.client.api.ConsumerRecord;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* The implementation of Kafka consumer 0.10.0.0.
*/
public class Consumer010Impl<K, V> extends KafkaConsumer<K, V> implements Consumer<K, V> {

public Consumer010Impl(final ConsumerConfiguration conf) {
super(conf.toProperties());
}

@Override
public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
final List<ConsumerRecord<K, V>> records = new ArrayList<>();
poll(timeoutMs).forEach(record -> records.add(ConsumerRecord.createOldRecord(record)));
return records;
}
}
@@ -0,0 +1,39 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kafka.client.zero.ten;

import io.streamnative.kafka.client.api.ProduceContext;
import io.streamnative.kafka.client.api.Producer;
import io.streamnative.kafka.client.api.ProducerConfiguration;
import io.streamnative.kafka.client.api.RecordMetadata;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* The implementation of Kafka producer 0.10.0.0.
*/
public class Producer010Impl<K, V> extends KafkaProducer<K, V> implements Producer<K, V> {

public Producer010Impl(final ProducerConfiguration conf) {
super(conf.toProperties());
}

@SuppressWarnings("unchecked")
@Override
public Future<RecordMetadata> sendAsync(final ProduceContext<K, V> context) {
send(context.createProducerRecord(ProducerRecord.class), context::complete);
return context.getFuture();
}
}
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kafka.client.zero.ten;
@@ -13,6 +13,7 @@
*/
package io.streamnative.kafka.client.api;

import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -45,4 +46,19 @@ public static <K, V, T> ConsumerRecord<K, V> create(T originalRecord) {
(long) ReflectionUtils.invoke(clazz, "offset", originalRecord),
headers);
}

// Support Kafka messages written before 0.11.x, which carry no record headers.
public static <K, V, T> ConsumerRecord<K, V> createOldRecord(T originalRecord) {
final Class<?> clazz = originalRecord.getClass();

final List<Header> headerList = new ArrayList<>();
headerList.add(new Header(null, null));

return new ConsumerRecord<>((K) ReflectionUtils.invoke(clazz, "key", originalRecord),
(V) ReflectionUtils.invoke(clazz, "value", originalRecord),
(String) ReflectionUtils.invoke(clazz, "topic", originalRecord),
(int) ReflectionUtils.invoke(clazz, "partition", originalRecord),
(long) ReflectionUtils.invoke(clazz, "offset", originalRecord),
headerList);
}
}
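
Why a separate createOldRecord helper is needed: pre-0.11 client jars ship a ConsumerRecord class without a headers() method, so the header extraction that the regular create(...) path presumably performs via reflection cannot work there. A standalone probe illustrating the difference (hypothetical class name, not part of this PR):

```java
public class HeadersProbe {
    public static void main(String[] args) {
        boolean hasHeaders;
        try {
            // headers() only exists on ConsumerRecord since Kafka 0.11.x.
            org.apache.kafka.clients.consumer.ConsumerRecord.class.getMethod("headers");
            hasHeaders = true;   // 0.11.x and newer clients
        } catch (NoSuchMethodException e) {
            hasHeaders = false;  // 0.10.x and older clients, handled by createOldRecord
        }
        System.out.println("ConsumerRecord has headers(): " + hasHeaders);
    }
}
```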
@@ -24,7 +24,7 @@
*/
public enum KafkaVersion {

DEFAULT("default"), KAFKA_1_0_0("100");
DEFAULT("default"), KAFKA_1_0_0("100"), KAFKA_0_10_0_0("010");

@Getter
private String name;
@@ -38,7 +38,8 @@ public class ProduceContext<K, V> {
private final CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

/**
* Create an instance of Kafka's ProducerRecord.
* Create an instance of Kafka's ProducerRecord
* for Kafka versions higher than or equal to 0.11.x.
*
* @param clazz the class type of Kafka's ProducerRecord
* @param headerConstructor the constructor of Kafka's Header implementation
@@ -60,6 +61,27 @@ public <T, HeaderT> T createProducerRecord(final Class<T> clazz,
}
}

/**
* Create an instance of Kafka's ProducerRecord for Kafka versions lower than 0.11.x.
* ProducerRecord has no headers before 0.11.x, so the header-less constructor is used.
*
* @param clazz the class type of Kafka's ProducerRecord
* @param <T> it should be org.apache.kafka.clients.producer.ProducerRecord
* @return an instance of org.apache.kafka.clients.producer.ProducerRecord
*/
public <T> T createProducerRecord(final Class<T> clazz) {
try {
return clazz.getConstructor(
String.class, Integer.class, Long.class, Object.class, Object.class
).newInstance(topic, partition, timestamp, key, value);
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}
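
For reference, the constructor that the new createProducerRecord(Class) overload resolves for a 0.10.x client is the five-argument form introduced in Kafka 0.10.0.0, which takes a timestamp but no headers. A small illustrative sketch (hypothetical class name and values):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public class OldProducerRecordSketch {
    public static void main(String[] args) {
        // (topic, partition, timestamp, key, value) -- ProducerRecord has no
        // headers parameter before 0.11.x.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", 0, System.currentTimeMillis(), "k", "v");
        System.out.println(record);
    }
}
```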

/**
* Complete the internal `future` field.
*
5 changes: 5 additions & 0 deletions kafka-client-factory/pom.xml
@@ -39,5 +39,10 @@
<artifactId>kafka-1-0</artifactId>
<version>2.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>kafka-0-10</artifactId>
<version>2.9.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
@@ -15,6 +15,8 @@

import io.streamnative.kafka.client.one.zero.ConsumerImpl;
import io.streamnative.kafka.client.one.zero.ProducerImpl;
import io.streamnative.kafka.client.zero.ten.Consumer010Impl;
import io.streamnative.kafka.client.zero.ten.Producer010Impl;

/**
* The factory class to create Kafka producers or consumers with a specific version.
@@ -31,6 +33,8 @@ public KafkaClientFactoryImpl(final KafkaVersion kafkaVersion) {
public <K, V> Producer<K, V> createProducer(final ProducerConfiguration conf) {
if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) {
return new ProducerImpl<>(conf);
} else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) {
return new Producer010Impl<>(conf);
}
throw new IllegalArgumentException("No producer for version: " + kafkaVersion);
}
@@ -39,6 +43,8 @@ public <K, V> Producer<K, V> createProducer(final ProducerConfiguration conf) {
public <K, V> Consumer<K, V> createConsumer(final ConsumerConfiguration conf) {
if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) {
return new ConsumerImpl<>(conf);
} else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) {
return new Consumer010Impl<>(conf);
}
throw new IllegalArgumentException("No consumer for version: " + kafkaVersion);
}
@@ -28,7 +28,10 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -43,6 +46,8 @@ public class KafkaEntryFormatter implements EntryFormatter {
// This key-value pair identifies the entry's format as kafka
private static final String IDENTITY_KEY = "entry.format";
private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
// Kafka's MemoryRecords.downConvert method needs a Time instance
private static final Time time = Time.SYSTEM;

@Override
public ByteBuf encode(MemoryRecords records, int numMessages) {
@@ -61,15 +66,41 @@ public DecodeResult decode(List<Entry> entries, byte magic) {

// reset header information
final List<ByteBuf> orderedByteBuf = new ArrayList<>();

for (Entry entry : entries) {
try {
long startOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry);
final ByteBuf byteBuf = entry.getDataBuffer();
final MessageMetadata metadata = Commands.parseMessageMetadata(byteBuf);
if (isKafkaEntryFormat(metadata)) {
byte batchMagic = byteBuf.getByte(byteBuf.readerIndex() + MAGIC_OFFSET);
byteBuf.setLong(byteBuf.readerIndex() + OFFSET_OFFSET, startOffset);
byteBuf.setByte(byteBuf.readerIndex() + MAGIC_OFFSET, magic);
orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));

// The batch magic is greater than the magic of the version requested by the client,
// so the batch needs to be down-converted.
if (batchMagic > magic) {
MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
// Down-convert: the batch magic will be set to the client magic.
ConvertedRecords<MemoryRecords> convertedRecords =
memoryRecords.downConvert(magic, startOffset, time);

final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
orderedByteBuf.add(kafkaBuffer);
if (!optionalByteBufs.isPresent()) {
optionalByteBufs = Optional.of(new ArrayList<>());
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf));
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));

if (log.isTraceEnabled()) {
log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}"
, entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
}

} else {
// No down-conversion needed: the batch magic keeps the value written by the producer.
orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
}
} else {
final MemoryRecords records =
ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
@@ -80,7 +111,11 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
}
} catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry
// Almost all exceptions in the Kafka code base inherit from KafkaException and are handled
// in KafkaApis. Both the down-conversion above and builder.appendWithOffset inside
// decodePulsarEntryToKafkaRecords surface their internal IOExceptions as a KafkaException,
// so KafkaException has to be caught here as well.
} catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry
log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
entry.release();
}
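
A note on the in-place header manipulation in decode: the sketch below shows the idea behind the two constants, assuming the standard Kafka wire format (KoP's actual OFFSET_OFFSET and MAGIC_OFFSET constants are defined outside this diff). In both the legacy v0/v1 message-set format and the v2 record-batch format, the first 8 bytes hold the base/first offset and the magic byte sits at byte 16, so both can be read or patched without fully parsing the batch.

```java
import io.netty.buffer.ByteBuf;

public final class BatchHeaderSketch {
    // Assumed values, matching the Kafka wire format described above.
    static final int OFFSET_OFFSET = 0;   // 8-byte base/first offset
    static final int MAGIC_OFFSET = 16;   // 1-byte magic (record format version)

    static byte peekMagic(ByteBuf buf) {
        // Same idea as reading batchMagic before deciding whether to down-convert.
        return buf.getByte(buf.readerIndex() + MAGIC_OFFSET);
    }

    static void patchBaseOffset(ByteBuf buf, long startOffset) {
        // Same idea as rewriting the entry's base offset to the broker-assigned one.
        buf.setLong(buf.readerIndex() + OFFSET_OFFSET, startOffset);
    }
}
```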