From 12f780809a9dcb32090faf5aab38360b091f9066 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Thu, 19 Aug 2021 17:32:39 +0800 Subject: [PATCH] [FEATURE] Add fetch down converted when entryFormat=kafka (#660) Fixes #656 ### Motivation When entryFormat=kafka If SASL/PLAIN authentication is not turned on, so kop does not limit the version of the producer client. When a lower version producer writes a message of magic=0 or magic=1, it will cause the higher version consumer to check the record error and the consumer client will be down. ### Modifications When entryFormat=kafka, increase KafkaEntryFormatter.decode to check batch.magic and client magic of kafka records. When batch.magic is higher than client magic, perform down conversion. This pr is part of the support for the lower version of Kafka less than 0.11.x. Since the FETCH version is still 4, it is also a bugfix for the higher version consumer to read the lower version of the magic record verification error. --- kafka-0-10/pom.xml | 73 +++++++++++++++++++ .../client/zero/ten/Consumer010Impl.java | 38 ++++++++++ .../client/zero/ten/Producer010Impl.java | 39 ++++++++++ .../kafka/client/zero/ten/package-info.java | 14 ++++ .../kafka/client/api/ConsumerRecord.java | 16 ++++ .../kafka/client/api/KafkaVersion.java | 2 +- .../kafka/client/api/ProduceContext.java | 24 +++++- kafka-client-factory/pom.xml | 5 ++ .../client/api/KafkaClientFactoryImpl.java | 6 ++ .../kop/format/KafkaEntryFormatter.java | 41 ++++++++++- .../handlers/kop/utils/ByteBufUtils.java | 40 ++++++---- pom.xml | 1 + .../BasicEndToEndPulsarTest.java | 11 ++- .../compatibility/BasicEndToEndTestBase.java | 19 ++++- 14 files changed, 308 insertions(+), 21 deletions(-) create mode 100644 kafka-0-10/pom.xml create mode 100644 kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java create mode 100644 kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java create mode 100644 kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java diff --git a/kafka-0-10/pom.xml b/kafka-0-10/pom.xml new file mode 100644 index 0000000000..cc67c47dae --- /dev/null +++ b/kafka-0-10/pom.xml @@ -0,0 +1,73 @@ + + + + + pulsar-protocol-handler-kafka-parent + io.streamnative.pulsar.handlers + 2.9.0-SNAPSHOT + + 4.0.0 + + kafka-0-10 + StreamNative :: Pulsar Protocol Handler :: Kafka Client 0.10.0.0 + The Kafka client wrapper for 0.10.0.0 + + + + io.streamnative.pulsar.handlers + kafka-client-api + 2.9.0-SNAPSHOT + + + org.apache.kafka + kafka-clients + 0.10.0.0 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${mavem-shade-plugin.version} + + + package + + shade + + + + + org.apache.kafka.common + org.apache.kafka010.common + + + org.apache.kafka.clients + org.apache.kafka010.clients + + + + + + + + + diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java new file mode 100644 index 0000000000..21325fffb1 --- /dev/null +++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kafka.client.zero.ten; + +import io.streamnative.kafka.client.api.Consumer; +import io.streamnative.kafka.client.api.ConsumerConfiguration; +import io.streamnative.kafka.client.api.ConsumerRecord; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +/** + * The implementation of Kafka consumer 0.10.0.0. + */ +public class Consumer010Impl extends KafkaConsumer implements Consumer { + + public Consumer010Impl(final ConsumerConfiguration conf) { + super(conf.toProperties()); + } + + @Override + public List> receive(long timeoutMs) { + final List> records = new ArrayList<>(); + poll(timeoutMs).forEach(record -> records.add(ConsumerRecord.createOldRecord(record))); + return records; + } +} diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java new file mode 100644 index 0000000000..26981237f5 --- /dev/null +++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kafka.client.zero.ten; + +import io.streamnative.kafka.client.api.ProduceContext; +import io.streamnative.kafka.client.api.Producer; +import io.streamnative.kafka.client.api.ProducerConfiguration; +import io.streamnative.kafka.client.api.RecordMetadata; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * The implementation of Kafka producer 0.10.0.0. + */ +public class Producer010Impl extends KafkaProducer implements Producer { + + public Producer010Impl(final ProducerConfiguration conf) { + super(conf.toProperties()); + } + + @SuppressWarnings("unchecked") + @Override + public Future sendAsync(final ProduceContext context) { + send(context.createProducerRecord(ProducerRecord.class), context::complete); + return context.getFuture(); + } +} diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java new file mode 100644 index 0000000000..093de2fa55 --- /dev/null +++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java @@ -0,0 +1,14 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kafka.client.zero.ten; diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java index e65a7407c2..f9265df853 100644 --- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java +++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java @@ -13,6 +13,7 @@ */ package io.streamnative.kafka.client.api; +import java.util.ArrayList; import java.util.List; import lombok.AllArgsConstructor; import lombok.Getter; @@ -45,4 +46,19 @@ public static ConsumerRecord create(T originalRecord) { (long) ReflectionUtils.invoke(clazz, "offset", originalRecord), headers); } + + //support kafka message before 0.11.x + public static ConsumerRecord createOldRecord(T originalRecord) { + final Class clazz = originalRecord.getClass(); + + final List
headerList = new ArrayList<>(); + headerList.add(new Header(null, null)); + + return new ConsumerRecord<>((K) ReflectionUtils.invoke(clazz, "key", originalRecord), + (V) ReflectionUtils.invoke(clazz, "value", originalRecord), + (String) ReflectionUtils.invoke(clazz, "topic", originalRecord), + (int) ReflectionUtils.invoke(clazz, "partition", originalRecord), + (long) ReflectionUtils.invoke(clazz, "offset", originalRecord), + headerList); + } } diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java index 0ac4ebabc9..36574ea502 100644 --- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java +++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java @@ -24,7 +24,7 @@ */ public enum KafkaVersion { - DEFAULT("default"), KAFKA_1_0_0("100"); + DEFAULT("default"), KAFKA_1_0_0("100"), KAFKA_0_10_0_0("010"); @Getter private String name; diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java index 7ad8be3922..6887939cae 100644 --- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java +++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java @@ -38,7 +38,8 @@ public class ProduceContext { private final CompletableFuture future = new CompletableFuture<>(); /** - * Create an instance of Kafka's ProducerRecord. + * Create an instance of Kafka's ProducerRecord + * for kafka version higher than or equal to 0.11.x. * * @param clazz the class type of Kafka's ProducerRecord * @param headerConstructor the constructor of Kafka's Header implementation @@ -60,6 +61,27 @@ public T createProducerRecord(final Class clazz, } } + /** + * Create an instance of Kafka's ProducerRecord less than 0.11.x. + * Because there is no header in ProducerRecord before 0.11.x. + * + * @param clazz the class type of Kafka's ProducerRecord + * @param it should be org.apache.kafka.clients.producer.ProducerRecord + * @return an instance of org.apache.kafka.clients.producer.ProducerRecord + */ + public T createProducerRecord(final Class clazz) { + try { + return clazz.getConstructor( + String.class, Integer.class, Long.class, Object.class, Object.class + ).newInstance(topic, partition, timestamp, key, value); + } catch (InstantiationException + | IllegalAccessException + | InvocationTargetException + | NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } + } + /** * Complete the internal `future` field. * diff --git a/kafka-client-factory/pom.xml b/kafka-client-factory/pom.xml index 039a6d23d1..2dee20af88 100644 --- a/kafka-client-factory/pom.xml +++ b/kafka-client-factory/pom.xml @@ -39,5 +39,10 @@ kafka-1-0 2.9.0-SNAPSHOT + + io.streamnative.pulsar.handlers + kafka-0-10 + 2.9.0-SNAPSHOT + \ No newline at end of file diff --git a/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java b/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java index 26129bc98a..310d5f8f3d 100644 --- a/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java +++ b/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java @@ -15,6 +15,8 @@ import io.streamnative.kafka.client.one.zero.ConsumerImpl; import io.streamnative.kafka.client.one.zero.ProducerImpl; +import io.streamnative.kafka.client.zero.ten.Consumer010Impl; +import io.streamnative.kafka.client.zero.ten.Producer010Impl; /** * The factory class to create Kafka producers or consumers with a specific version. @@ -31,6 +33,8 @@ public KafkaClientFactoryImpl(final KafkaVersion kafkaVersion) { public Producer createProducer(final ProducerConfiguration conf) { if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) { return new ProducerImpl<>(conf); + } else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) { + return new Producer010Impl<>(conf); } throw new IllegalArgumentException("No producer for version: " + kafkaVersion); } @@ -39,6 +43,8 @@ public Producer createProducer(final ProducerConfiguration conf) { public Consumer createConsumer(final ConsumerConfiguration conf) { if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) { return new ConsumerImpl<>(conf); + } else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) { + return new Consumer010Impl<>(conf); } throw new IllegalArgumentException("No consumer for version: " + kafkaVersion); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java index 449d5eff3a..33e7097f1a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java @@ -28,7 +28,10 @@ import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.record.ConvertedRecords; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.Time; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -43,6 +46,8 @@ public class KafkaEntryFormatter implements EntryFormatter { // These key-value identifies the entry's format as kafka private static final String IDENTITY_KEY = "entry.format"; private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase(); + // Kafka MemoryRecords downConvert method needs time + private static final Time time = Time.SYSTEM; @Override public ByteBuf encode(MemoryRecords records, int numMessages) { @@ -61,15 +66,41 @@ public DecodeResult decode(List entries, byte magic) { // reset header information final List orderedByteBuf = new ArrayList<>(); + for (Entry entry : entries) { try { long startOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry); final ByteBuf byteBuf = entry.getDataBuffer(); final MessageMetadata metadata = Commands.parseMessageMetadata(byteBuf); if (isKafkaEntryFormat(metadata)) { + byte batchMagic = byteBuf.getByte(byteBuf.readerIndex() + MAGIC_OFFSET); byteBuf.setLong(byteBuf.readerIndex() + OFFSET_OFFSET, startOffset); - byteBuf.setByte(byteBuf.readerIndex() + MAGIC_OFFSET, magic); - orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes())); + + // batch magic greater than the magic corresponding to the version requested by the client + // need down converted + if (batchMagic > magic) { + MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf)); + //down converted, batch magic will be set to client magic + ConvertedRecords convertedRecords = + memoryRecords.downConvert(magic, startOffset, time); + + final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer()); + orderedByteBuf.add(kafkaBuffer); + if (!optionalByteBufs.isPresent()) { + optionalByteBufs = Optional.of(new ArrayList<>()); + } + optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf)); + optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer)); + + if (log.isTraceEnabled()) { + log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}" + , entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic); + } + + } else { + //not need down converted, batch magic retains the magic value written in production + orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes())); + } } else { final MemoryRecords records = ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic); @@ -80,7 +111,11 @@ public DecodeResult decode(List entries, byte magic) { } optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer)); } - } catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry + // Almost all exceptions in Kafka inherit from KafkaException and will be captured + // and processed in KafkaApis. Here, whether it is down-conversion or the IOException + // in builder.appendWithOffset in decodePulsarEntryToKafkaRecords will be caught by Kafka + // and the KafkaException will be thrown. So we need to catch KafkaException here. + } catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e); entry.release(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java index e0e5bbb117..2372ea656e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java @@ -109,7 +109,6 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadat new EndTransactionMarker(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0)); } - final int uncompressedSize = metadata.getUncompressedSize(); final CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); final ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); @@ -145,24 +144,39 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadat final ByteBuffer value = singleMessageMetadata.isNullValue() ? null : getNioBuffer(singleMessagePayload); - final Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); - builder.appendWithOffset(baseOffset + i, - timestamp, - getKeyByteBuffer(singleMessageMetadata), - value, - headers); + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + final Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); + builder.appendWithOffset(baseOffset + i, + timestamp, + getKeyByteBuffer(singleMessageMetadata), + value, + headers); + } else { + // record less than magic=2, no header attribute + builder.appendWithOffset(baseOffset + i, + timestamp, + getKeyByteBuffer(singleMessageMetadata), + value); + } singleMessagePayload.release(); } } else { final long timestamp = (metadata.getEventTime() > 0) ? metadata.getEventTime() : metadata.getPublishTime(); - final Header[] headers = getHeadersFromMetadata(metadata.getPropertiesList()); - builder.appendWithOffset(baseOffset, - timestamp, - getKeyByteBuffer(metadata), - getNioBuffer(uncompressedPayload), - headers); + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + final Header[] headers = getHeadersFromMetadata(metadata.getPropertiesList()); + builder.appendWithOffset(baseOffset, + timestamp, + getKeyByteBuffer(metadata), + getNioBuffer(uncompressedPayload), + headers); + } else { + builder.appendWithOffset(baseOffset, + timestamp, + getKeyByteBuffer(metadata), + getNioBuffer(uncompressedPayload)); + } } final MemoryRecords records = builder.build(); diff --git a/pom.xml b/pom.xml index 390332d777..d1689c8295 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ + kafka-0-10 kafka-1-0 kafka-client-api kafka-client-factory diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java index 666b790e89..2eee06c4fc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java @@ -69,7 +69,12 @@ public void testKafkaProducePulsarConsume() throws Exception { .build(); keys.add(record.getKey()); values.add(record.getValue()); - headers.add(record.getHeaders().get(0)); + // message has no header before Kafka 0.11.x version + if (version.equals(KafkaVersion.KAFKA_0_10_0_0)) { + headers.add(null); + } else { + headers.add(record.getHeaders().get(0)); + } final RecordMetadata metadata = record.sendAsync().get(); log.info("Kafka client {} sent {} to {}", version, record.getValue(), metadata); @@ -139,7 +144,9 @@ public void testPulsarProduceKafkaConsume() throws Exception { Assert.assertEquals(records.size(), 1); Assert.assertEquals(records.get(0).getValue(), value); Assert.assertEquals(records.get(0).getKey(), key); - Assert.assertEquals(records.get(0).getHeaders().get(0), header); + if (!version.equals(KafkaVersion.KAFKA_0_10_0_0)) { + Assert.assertEquals(records.get(0).getHeaders().get(0), header); + } consumer.close(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java index 0b870ee5d7..8634a0f00c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java @@ -72,6 +72,15 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + /** + * Test Kafka production and Kafka consumption. + * after kafka client 0.11.x versions, producer will send record with magic=2. + * Therefore, the previous test will not appear issue: https://github.com/streamnative/kop/issues/656 + * After introducing the kafka 0-10 module, we can reuse the current test, + * Because 0.10 will produce a message with magic=1, and the kafka-1-0 and default modules + * will use the apiVersion corresponding to magic=2 to send FETCH requests + * @throws Exception + */ protected void testKafkaProduceKafkaConsume() throws Exception { final String topic = "test-kafka-produce-kafka-consume"; admin.topics().createPartitionedTopic(topic, 1); @@ -101,7 +110,10 @@ protected void testKafkaProduceKafkaConsume() throws Exception { value = "value-from-" + version.name() + offset; keys.add(key); values.add(value); - headers.add(new Header("header-" + key, "header-" + value)); + // Because there is no header in ProducerRecord before 0.11.x. + if (!version.equals(KafkaVersion.KAFKA_0_10_0_0)) { + headers.add(new Header("header-" + key, "header-" + value)); + } metadata = producer.newContextBuilder(topic, value) .key(key) @@ -119,6 +131,11 @@ protected void testKafkaProduceKafkaConsume() throws Exception { } for (KafkaVersion version : kafkaClientFactories.keySet()) { + // Due to some known issues, kop return the minimum Fetch apiVersion is 4, + // kafka client versions before 0.11.x not support apiVersion=4 + if (version.equals(KafkaVersion.KAFKA_0_10_0_0)) { + continue; + } final Consumer consumer = kafkaClientFactories.get(version) .createConsumer(consumerConfiguration(version)); consumer.subscribe(topic);