diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java
index 9d5199e81c..2180473174 100644
--- a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java
+++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java
@@ -13,14 +13,20 @@
  */
 package io.streamnative.kafka.client.zero.ten;
 
+import com.google.common.collect.Maps;
 import io.streamnative.kafka.client.api.Consumer;
 import io.streamnative.kafka.client.api.ConsumerConfiguration;
 import io.streamnative.kafka.client.api.ConsumerRecord;
+import io.streamnative.kafka.client.api.TopicOffsetAndMetadata;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * The implementation of Kafka consumer 0.10.0.0.
@@ -42,4 +48,16 @@ public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
     public Map<String, List<PartitionInfo>> listTopics(long timeoutMS) {
         return listTopics();
     }
+
+    @Override
+    public void commitOffsetSync(List<TopicOffsetAndMetadata> offsets, Duration timeout) {
+        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = Maps.newHashMap();
+        offsets.forEach(
+                offsetAndMetadata -> offsetsMap.put(
+                        offsetAndMetadata.createTopicPartition(TopicPartition.class),
+                        offsetAndMetadata.createOffsetAndMetadata(OffsetAndMetadata.class)
+                )
+        );
+        commitSync(offsetsMap);
+    }
 }
diff --git a/kafka-1-0/src/main/java/io/streamnative/kafka/client/one/zero/ConsumerImpl.java b/kafka-1-0/src/main/java/io/streamnative/kafka/client/one/zero/ConsumerImpl.java
index 6a6d7a7638..d1fb8a09b8 100644
--- a/kafka-1-0/src/main/java/io/streamnative/kafka/client/one/zero/ConsumerImpl.java
+++ b/kafka-1-0/src/main/java/io/streamnative/kafka/client/one/zero/ConsumerImpl.java
@@ -13,14 +13,20 @@
  */
 package io.streamnative.kafka.client.one.zero;
 
+import com.google.common.collect.Maps;
 import io.streamnative.kafka.client.api.Consumer;
 import io.streamnative.kafka.client.api.ConsumerConfiguration;
 import io.streamnative.kafka.client.api.ConsumerRecord;
+import io.streamnative.kafka.client.api.TopicOffsetAndMetadata;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * The implementation of Kafka consumer 1.0.0.
@@ -42,4 +48,16 @@ public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
     public Map<String, List<PartitionInfo>> listTopics(long timeoutMS) {
         return listTopics();
     }
+
+    @Override
+    public void commitOffsetSync(List<TopicOffsetAndMetadata> offsets, Duration timeout) {
+        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = Maps.newHashMap();
+        offsets.forEach(
+                offsetAndMetadata -> offsetsMap.put(
+                        offsetAndMetadata.createTopicPartition(TopicPartition.class),
+                        offsetAndMetadata.createOffsetAndMetadata(OffsetAndMetadata.class)
+                )
+        );
+        commitSync(offsetsMap);
+    }
 }
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Consumer.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Consumer.java
index 5250c04e26..a47598cb8e 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Consumer.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Consumer.java
@@ -14,6 +14,7 @@
 package io.streamnative.kafka.client.api;
 
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -61,4 +62,6 @@ default List<ConsumerRecord<K, V>> receiveUntil(int maxNumMessages, long timeout
     }
 
     Map<String, List<PartitionInfo>> listTopics(long timeoutMS);
+
+    void commitOffsetSync(List<TopicOffsetAndMetadata> offsets, Duration timeout);
 }
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerConfiguration.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerConfiguration.java
index a00e64ff88..6ed09acc0e 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerConfiguration.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerConfiguration.java
@@ -33,6 +33,8 @@ public class ConsumerConfiguration {
     private String userName;
     private String password;
     private String requestTimeoutMs;
+    private Boolean enableAutoCommit;
+    private String sessionTimeOutMs;
 
     public Properties toProperties() {
         final Properties props = new Properties();
@@ -64,6 +66,12 @@ public Properties toProperties() {
         if (requestTimeoutMs != null) {
             props.put("request.timeout.ms", requestTimeoutMs);
         }
+        if (enableAutoCommit != null) {
+            props.put("enable.auto.commit", enableAutoCommit);
+        }
+        if (sessionTimeOutMs != null) {
+            props.put("session.timeout.ms", sessionTimeOutMs);
+        }
         return props;
     }
 }
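Note: with the two new fields, a caller can disable automatic offset commits and tune the session timeout through the existing builder. A minimal usage sketch (the broker address and group id are placeholders):

    ConsumerConfiguration conf = ConsumerConfiguration.builder()
            .bootstrapServers("localhost:9092")
            .groupId("test-group")
            .enableAutoCommit(false)
            .sessionTimeOutMs("10000")
            .build();
    Properties props = conf.toProperties();
    // props now carries enable.auto.commit=false and session.timeout.ms=10000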
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Producer.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Producer.java
index abf660d685..424bd8b3db 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Producer.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/Producer.java
@@ -23,9 +23,14 @@ public interface Producer<K, V> extends AutoCloseable {
     Future<RecordMetadata> sendAsync(ProduceContext<K, V> context);
 
     default ProduceContext.ProduceContextBuilder<K, V> newContextBuilder(String topic, V value) {
+        return newContextBuilder(topic, value, null);
+    }
+
+    default ProduceContext.ProduceContextBuilder<K, V> newContextBuilder(String topic, V value, Integer partition) {
         return ProduceContext.<K, V>builder()
                 .producer(this)
                 .topic(topic)
-                .value(value);
+                .value(value)
+                .partition(partition);
     }
 }
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/TopicOffsetAndMetadata.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/TopicOffsetAndMetadata.java
new file mode 100644
index 0000000000..da2e49ae42
--- /dev/null
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/TopicOffsetAndMetadata.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kafka.client.api;
+
+import java.lang.reflect.InvocationTargetException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * A version-agnostic counterpart of org.apache.kafka.clients.consumer.OffsetAndMetadata.
+ */
+@AllArgsConstructor
+@Getter
+public class TopicOffsetAndMetadata {
+
+    private String topic;
+    private int partition;
+    private long offset;
+
+    public <T> T createTopicPartition(final Class<T> clazz) {
+        try {
+            return clazz.getConstructor(
+                    String.class, int.class
+            ).newInstance(topic, partition);
+        } catch (InvocationTargetException
+                | InstantiationException
+                | IllegalAccessException
+                | NoSuchMethodException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    public <T> T createOffsetAndMetadata(final Class<T> clazz) {
+        try {
+            return clazz.getConstructor(
+                    long.class
+            ).newInstance(offset);
+        } catch (InstantiationException
+                | IllegalAccessException
+                | InvocationTargetException
+                | NoSuchMethodException e) {
+            throw new IllegalArgumentException(e);
+        }
+
+    }
+}
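Note: the reflective factory methods let version-agnostic test code materialize the concrete classes of whichever Kafka client is on the classpath. A minimal sketch of the intended use (the consumer instance and the values are illustrative):

    TopicOffsetAndMetadata tom = new TopicOffsetAndMetadata("my-topic", 0, 100L);
    TopicPartition partition = tom.createTopicPartition(TopicPartition.class);       // new TopicPartition("my-topic", 0)
    OffsetAndMetadata offset = tom.createOffsetAndMetadata(OffsetAndMetadata.class); // new OffsetAndMetadata(100L)
    consumer.commitSync(Collections.singletonMap(partition, offset));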
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
index 5c1a7d4e72..40cb0b4067 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -369,11 +369,6 @@ protected ApiVersionsResponse overloadDefaultApiVersionsResponse(boolean unsuppo
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (apiKey.minRequiredInterBrokerMagic <= RecordBatch.CURRENT_MAGIC_VALUE) {
                 switch (apiKey) {
-                    case FETCH:
-                        // V4 added MessageSets responses. We need to make sure RecordBatch format is not used
-                        versionList.add(new ApiVersionsResponse.ApiVersion((short) 1, (short) 4,
-                                apiKey.latestVersion()));
-                        break;
                     case LIST_OFFSETS:
                         // V0 is needed for librdkafka
                         versionList.add(new ApiVersionsResponse.ApiVersion((short) 2, (short) 0,
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
index 0b9bf6efd9..2f761eb5b4 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
@@ -21,6 +21,7 @@
 import io.netty.util.ReferenceCounted;
 import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException;
 import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils;
+import io.streamnative.pulsar.handlers.kop.utils.KopRecordsUtil;
 import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -31,7 +32,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.record.ConvertedRecords;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -46,8 +47,6 @@ public class KafkaEntryFormatter implements EntryFormatter {
     // These key-value identifies the entry's format as kafka
     private static final String IDENTITY_KEY = "entry.format";
     private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
-    // Kafka MemoryRecords downConvert method needs time
-    private static final Time time = Time.SYSTEM;
 
     @Override
     public ByteBuf encode(MemoryRecords records, int numMessages) {
@@ -78,23 +77,24 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
 
                 // batch magic greater than the magic corresponding to the version requested by the client
                 // need down converted
-                if (batchMagic > magic) {
+                if (batchMagic > magic || batchMagic != RecordBatch.MAGIC_VALUE_V2) {
                     MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
                     //down converted, batch magic will be set to client magic
                     ConvertedRecords<MemoryRecords> convertedRecords =
-                            memoryRecords.downConvert(magic, startOffset, time);
+                            KopRecordsUtil.convertAndAssignOffsets(memoryRecords.batches(), magic, startOffset);
 
                     final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
                     orderedByteBuf.add(kafkaBuffer);
                     if (!optionalByteBufs.isPresent()) {
                         optionalByteBufs = Optional.of(new ArrayList<>());
                     }
-                    optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf));
                     optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
 
                     if (log.isTraceEnabled()) {
-                        log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}"
-                                , entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
+                        log.trace("[{}:{}] convertAndAssignOffsets record for down converted"
+                                        + " or assign offsets with v0 and v1 magic, start offset {},"
+                                        + " entry magic: {}, client magic: {}",
+                                entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
                     }
 
                 } else {
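Note: the reworked check means an entry is rewritten not only for a true down-conversion but for any stored batch that is not in the v2 format, so v0/v1 batches get their offsets re-assigned as well. Roughly, as a hypothetical helper extracted for illustration:

    static boolean needsConvertAndAssignOffsets(byte batchMagic, byte clientMagic) {
        // down-convert when the stored batch is newer than the client supports,
        // or rewrite any non-v2 batch so its inner offsets are re-assigned
        return batchMagic > clientMagic || batchMagic != RecordBatch.MAGIC_VALUE_V2;
    }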
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopRecordsUtil.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopRecordsUtil.java
new file mode 100644
index 0000000000..a126360f02
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopRecordsUtil.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.ConvertedRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+
+/**
+ * Utils for down-converting record batches and re-assigning offsets.
+ */
+@Slf4j
+public class KopRecordsUtil {
+
+    public static ConvertedRecords<MemoryRecords> convertAndAssignOffsets(Iterable<? extends RecordBatch> batches,
+                                                                          byte toMagic,
+                                                                          long firstOffset) throws IOException {
+        // maintain the batch along with the decompressed records to avoid the need to decompress again
+        List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
+        int totalSizeEstimate = 0;
+
+        long batchStartOffset = firstOffset;
+        for (RecordBatch batch : batches) {
+            byte toBatchMagic = toMagic;
+            if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
+                if (batch.isControlBatch()) {
+                    continue;
+                }
+
+                if (batch.compressionType().name.equals("zstd")) {
+                    throw new IOException("Down-conversion of zstandard-compressed batches "
+                            + "is not supported");
+                }
+            }
+
+            List<Record> records = new ArrayList<>();
+            long batchIndex = 0;
+            for (Record record : batch) {
+                records.add(record);
+                batchIndex++;
+            }
+
+            if (records.isEmpty()) {
+                continue;
+            }
+
+            // Also handle the batch.magic() <= toMagic case.
+            // The inner messages of magic 0 and magic 1 batches carry their own offsets,
+            // and those offsets may no longer be valid, so batches with
+            // batch.magic <= toMagic still need to be rewritten here;
+            // the only thing kept unchanged is the batch magic itself.
+            if (batch.magic() < toMagic) {
+                toBatchMagic = batch.magic();
+            }
+
+            totalSizeEstimate += AbstractRecords.estimateSizeInBytes(
+                    toBatchMagic, batchStartOffset, batch.compressionType(), records);
+            recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, batchStartOffset, toBatchMagic));
+            batchStartOffset += batchIndex;
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
+        for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
+            MemoryRecordsBuilder builder = convertRecordBatch(buffer, recordBatchAndRecords);
+            buffer = builder.buffer();
+        }
+
+        buffer.flip();
+        recordBatchAndRecordsList.clear();
+        return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), null);
+    }
+
+    private static MemoryRecordsBuilder convertRecordBatch(ByteBuffer buffer,
+                                                           RecordBatchAndRecords recordBatchAndRecords) {
+        RecordBatch batch = recordBatchAndRecords.batch;
+        byte toBatchMagic = recordBatchAndRecords.toBatchMagic;
+        final TimestampType timestampType = batch.timestampType();
+        long logAppendTime = timestampType
+                == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, toBatchMagic, batch.compressionType(),
+                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
+
+        long startOffset = recordBatchAndRecords.baseOffset;
+        for (Record record : recordBatchAndRecords.records) {
+            if (toBatchMagic > RecordBatch.MAGIC_VALUE_V1) {
+                builder.appendWithOffset(startOffset++,
+                        record.timestamp(),
+                        record.key(),
+                        record.value(),
+                        record.headers());
+            } else {
+                builder.appendWithOffset(startOffset++,
+                        record.timestamp(),
+                        record.key(),
+                        record.value());
+            }
+        }
+
+        builder.close();
+        return builder;
+    }
+
+    private static class RecordBatchAndRecords {
+        private final RecordBatch batch;
+        private final List<Record> records;
+        private final Long baseOffset;
+        private final byte toBatchMagic;
+
+        private RecordBatchAndRecords(RecordBatch batch,
+                                      List<Record> records,
+                                      Long baseOffset,
+                                      byte toBatchMagic) {
+            this.batch = batch;
+            this.records = records;
+            this.baseOffset = baseOffset;
+            this.toBatchMagic = toBatchMagic;
+        }
+    }
+}
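Note: a minimal usage sketch of the new utility, assuming `payload` is a ByteBuffer holding stored Kafka batches and the client requested the v1 format (the call throws IOException for zstd-compressed input):

    MemoryRecords stored = MemoryRecords.readableRecords(payload);
    ConvertedRecords<MemoryRecords> converted =
            KopRecordsUtil.convertAndAssignOffsets(stored.batches(), RecordBatch.MAGIC_VALUE_V1, 100L);
    MemoryRecords forClient = converted.records();   // batches rewritten with magic <= 1, offsets starting at 100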
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndKafkaTest.java
index d6babdd7cd..fb510803b6 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndKafkaTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndKafkaTest.java
@@ -28,4 +28,9 @@ public BasicEndToEndKafkaTest() {
     protected void testKafkaProduceKafkaConsume() throws Exception {
         super.testKafkaProduceKafkaConsume();
     }
+
+    @Test(timeOut = 60000)
+    protected void testKafkaProduceKafkaCommitOffset() throws Exception {
+        super.testKafkaProduceKafkaCommitOffset();
+    }
 }
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
index 2eee06c4fc..fdf2f2bef6 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
@@ -29,11 +29,15 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.Hash;
+import org.apache.pulsar.client.impl.JavaStringHash;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
 
 /**
  * Basic end-to-end test for different versions of Kafka clients with `entryFormat=kafka`.
@@ -50,6 +54,11 @@ protected void testKafkaProduceKafkaConsume() throws Exception {
         super.testKafkaProduceKafkaConsume();
     }
 
+    @Test(timeOut = 60000)
+    protected void testKafkaProduceKafkaCommitOffset() throws Exception {
+        super.testKafkaProduceKafkaCommitOffset();
+    }
+
     @Test(timeOut = 30000)
     public void testKafkaProducePulsarConsume() throws Exception {
         final String topic = "test-kafka-produce-pulsar-consume";
@@ -151,6 +160,47 @@ public void testPulsarProduceKafkaConsume() throws Exception {
         }
     }
 
+    @Test(timeOut = 60000)
+    public void testPulsarProduceKafkaCommit() throws Exception {
+        final String topic = "test-pulsar-produce-kafka-commit-offset";
+        int numPartitions = 5;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        Map<Integer, List<String>> valuesMap = Maps.newHashMap();
+
+        int sends = 75;
+
+        // 1.Produce messages with the Pulsar producer
+        org.apache.pulsar.client.api.Producer<String> producer =
+                pulsarClient.newProducer(Schema.STRING)
+                        .topic(topic)
+                        .create();
+
+        Hash hash = JavaStringHash.getInstance();
+        for (int i = 0; i < sends; i++) {
+            final String key = "pulsar-key-" + i;
+            final String value = "pulsar-value-" + i;
+
+            int partition = MathUtils.signSafeMod(hash.makeHash(key), numPartitions);
+            List<String> values = valuesMap.computeIfAbsent(partition, k -> new ArrayList<>());
+            values.add(value);
+            // Record the messages that belong to each partition;
+            // they are compared against the consumed records later.
+            valuesMap.put(partition, values);
+
+            producer.newMessage(Schema.STRING)
+                    .key(key)
+                    .value(value)
+                    .send();
+        }
+
+        producer.close();
+
+        // 2.Consume messages with different kafka client versions
+        super.verifyManualCommitOffset(topic, sends, numPartitions, valuesMap);
+
+    }
+
     private static Header convertFirstPropertyToHeader(final Map<String, String> properties) {
         if (properties == null || properties.isEmpty()) {
             return null;
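Note: the test pre-computes each key's partition with JavaStringHash plus MathUtils.signSafeMod, which is presumably meant to mirror the Pulsar partitioned producer's default key-based routing, so valuesMap ends up keyed by the partition each message actually lands on.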
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
index 8634a0f00c..2baa0961e4 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
@@ -23,9 +23,14 @@
 import io.streamnative.kafka.client.api.Producer;
 import io.streamnative.kafka.client.api.ProducerConfiguration;
 import io.streamnative.kafka.client.api.RecordMetadata;
+import io.streamnative.kafka.client.api.TopicOffsetAndMetadata;
 import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
+import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -35,6 +40,7 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.collections.Maps;
 
 /**
  * Basic end-to-end test for different versions of Kafka clients.
@@ -131,11 +137,6 @@ protected void testKafkaProduceKafkaConsume() throws Exception {
         }
 
         for (KafkaVersion version : kafkaClientFactories.keySet()) {
-            // Due to some known issues, kop return the minimum Fetch apiVersion is 4,
-            // kafka client versions before 0.11.x not support apiVersion=4
-            if (version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
-                continue;
-            }
             final Consumer<String, String> consumer = kafkaClientFactories.get(version)
                     .createConsumer(consumerConfiguration(version));
             consumer.subscribe(topic);
@@ -153,15 +154,154 @@ protected void testKafkaProduceKafkaConsume() throws Exception {
                         .filter(Objects::nonNull)
                         .collect(Collectors.toList()), keys);
             }
-            Assert.assertEquals(records.stream()
-                    .map(ConsumerRecord::getHeaders)
-                    .filter(Objects::nonNull)
-                    .map(headerList -> headerList.get(0))
-                    .collect(Collectors.toList()), headers);
+
+            // ProducerRecord has no headers before 0.11.x, so skip the header check for the 0.10.0.0 client.
+            if (!version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+                Assert.assertEquals(records.stream()
+                        .map(ConsumerRecord::getHeaders)
+                        .filter(Objects::nonNull)
+                        .map(headerList -> headerList.get(0))
+                        .collect(Collectors.toList()), headers);
+            }
             consumer.close();
         }
     }
 
+    protected void verifyManualCommitOffset(final String topic,
+                                            final int count,
+                                            final int numPartitions,
+                                            final Map<Integer, List<String>> valuesMap)
+            throws IOException {
+        for (KafkaVersion version : kafkaClientFactories.keySet()) {
+            // 3.Disable automatic offset commits;
+            // the offsets are committed manually below.
+            Consumer<String, String> consumer = kafkaClientFactories.get(version)
+                    .createConsumer(consumerConfiguration(version, false));
+
+            consumer.subscribe(topic);
+
+            // 4.Consume all messages
+            List<ConsumerRecord<String, String>> records = consumer.receiveUntil(count, 6000);
+            Assert.assertEquals(count, records.size());
+            Map<Integer, List<ConsumerRecord<String, String>>> totalRecordsGroupByPartition = records.stream()
+                    .collect(Collectors.groupingBy(ConsumerRecord::getPartition));
+
+            // 5.Group the consumed records by partition and compare each group
+            // with the values recorded for that partition above.
+            totalRecordsGroupByPartition.keySet().forEach(
+                    partition -> Assert.assertEquals(
+                            totalRecordsGroupByPartition.get(partition).stream()
+                                    .map(ConsumerRecord::getValue)
+                                    .collect(Collectors.toList()),
+                            valuesMap.get(partition))
+            );
+
+            // 6.For each partition, record the offset and value of every message;
+            // they are compared with what is consumed again after the manual commit.
+            Map<Integer, Map<Long, String>> partitionAndOffsetValue = Maps.newConcurrentMap();
+            records.forEach(record -> {
+                int partition = record.getPartition();
+                long offset = record.getOffset();
+                String value = record.getValue();
+
+                Map<Long, String> offsetAndValue = partitionAndOffsetValue
+                        .computeIfAbsent(partition, k -> new HashMap<>());
+
+                offsetAndValue.put(offset, value);
+                partitionAndOffsetValue.put(partition, offsetAndValue);
+            });
+
+            // 7.Manually commit the offset of each partition as the partition number itself,
+            // which keeps the later calculations simple.
+            List<TopicOffsetAndMetadata> commitOffsets = new ArrayList<>();
+
+            int messagesSize = count;
+            for (int i = 0; i < numPartitions; i++) {
+                commitOffsets.add(new TopicOffsetAndMetadata(topic, i, i));
+                messagesSize -= i;
+            }
+
+            // 8.Commit the offsets synchronously
+            consumer.commitOffsetSync(commitOffsets, Duration.ofMillis(10000));
+
+            // 9.Close the current consumer
+            consumer.close();
+
+            // 10.Start a new consumer in the same consumer group;
+            // it resumes from the offsets manually committed above.
+            consumer = kafkaClientFactories.get(version)
+                    .createConsumer(consumerConfiguration(version, false));
+            consumer.subscribe(topic);
+
+            // 11.Consume messages and verify that their number matches
+            // the number calculated from the committed offsets.
+            List<ConsumerRecord<String, String>> commitRecordList = consumer.receiveUntil(messagesSize, 12000);
+            Assert.assertEquals(messagesSize, commitRecordList.size());
+
+            // 12.Group the consumed records by partition to simplify
+            // comparing them with the messages produced earlier.
+            Map<Integer, List<ConsumerRecord<String, String>>> recordsGroupByPartition = commitRecordList.stream()
+                    .collect(Collectors.groupingBy(ConsumerRecord::getPartition));
+
+            // 13.Verify that the number of messages consumed from each partition
+            // equals the number calculated from the manually committed offset.
+            recordsGroupByPartition.keySet().forEach(
+                    partition -> Assert.assertEquals(count / numPartitions - partition,
+                            recordsGroupByPartition.getOrDefault(partition, Collections.emptyList()).size())
+            );
+
+            // 14.Verify the offset and the corresponding value of every record consumed from each partition.
+            commitRecordList.forEach(record -> {
+                Assert.assertEquals(partitionAndOffsetValue.get(record.getPartition()).get(record.getOffset()),
+                        record.getValue());
+            });
+
+            consumer.close();
+        }
+    }
+
+    protected void testKafkaProduceKafkaCommitOffset() throws Exception {
+        final String topic = "test-kafka-produce-kafka-commit-offset";
+        int numPartitions = 5;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        Map<Integer, List<String>> valuesMap = Maps.newHashMap();
+
+        // The number of messages sent by the producer of each version
+        long sends = 25;
+
+        // Record the total number of messages
+        int count = 0;
+        // 1.Produce messages with different versions
+        for (KafkaVersion version : kafkaClientFactories.keySet()) {
+
+            Producer<String, String> producer = kafkaClientFactories.get(version)
+                    .createProducer(producerConfiguration(version));
+
+            for (int i = 0; i < sends; i++) {
+                String value = "value-commit-from" + version.name() + i;
+
+                // (Round robin) Select the partition to send the record to
+                int partition = i % numPartitions;
+                List<String> values = valuesMap.computeIfAbsent(partition, k -> new ArrayList<>());
+                values.add(value);
+                // Record the messages that belong to each partition;
+                // they are compared against the consumed records later.
+                valuesMap.put(partition, values);
+
+                RecordMetadata recordMetadata = producer.newContextBuilder(topic, value, partition)
+                        .build().sendAsync().get();
+                log.info("Kafka client {} sent {} to {}", version, value, recordMetadata);
+                Assert.assertEquals(recordMetadata.getTopic(), topic);
+                Assert.assertEquals(recordMetadata.getPartition(), partition);
+                count++;
+            }
+        }
+
+        // 2.Consume messages with different versions
+        verifyManualCommitOffset(topic, count, numPartitions, valuesMap);
+    }
+
     protected ProducerConfiguration producerConfiguration(final KafkaVersion version) {
         return ProducerConfiguration.builder()
                 .bootstrapServers("localhost:" + getKafkaBrokerPort())
@@ -171,12 +311,19 @@ protected ProducerConfiguration producerConfiguration(final KafkaVersion version
     }
 
     protected ConsumerConfiguration consumerConfiguration(final KafkaVersion version) {
+        return consumerConfiguration(version, true);
+    }
+
+    protected ConsumerConfiguration consumerConfiguration(final KafkaVersion version, Boolean enableAutoCommit) {
         return ConsumerConfiguration.builder()
                 .bootstrapServers("localhost:" + getKafkaBrokerPort())
                 .groupId("group-" + version.name())
                 .keyDeserializer(version.getStringDeserializer())
                 .valueDeserializer(version.getStringDeserializer())
                 .fromEarliest(true)
+                .enableAutoCommit(enableAutoCommit)
+                .sessionTimeOutMs("10000")
                 .build();
     }
+
 }
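Note on the offset arithmetic above: testKafkaProduceKafkaCommitOffset sends round robin, so each of the numPartitions partitions holds count / numPartitions messages. Committing offset i on partition i makes the second consumer resume that partition at position i, leaving count / numPartitions - i messages (the quantity asserted in step 13), and messagesSize = count - (0 + 1 + ... + (numPartitions - 1)) is the total expected in step 11; for example, count = 75 over 5 partitions leaves 75 - (0 + 1 + 2 + 3 + 4) = 65 messages.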
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/DefaultKafkaClientFactory.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/DefaultKafkaClientFactory.java
index 451aaad0c0..76605d69b2 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/DefaultKafkaClientFactory.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/DefaultKafkaClientFactory.java
@@ -13,6 +13,7 @@
  */
 package io.streamnative.pulsar.handlers.kop.compatibility;
 
+import com.google.common.collect.Maps;
 import io.streamnative.kafka.client.api.Consumer;
 import io.streamnative.kafka.client.api.ConsumerConfiguration;
 import io.streamnative.kafka.client.api.ConsumerRecord;
@@ -21,15 +22,19 @@
 import io.streamnative.kafka.client.api.Producer;
 import io.streamnative.kafka.client.api.ProducerConfiguration;
 import io.streamnative.kafka.client.api.RecordMetadata;
+import io.streamnative.kafka.client.api.TopicOffsetAndMetadata;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
 /**
@@ -78,5 +83,17 @@ public List<ConsumerRecord<K, V>> receive(long timeoutMs) {
         public Map<String, List<PartitionInfo>> listTopics(long timeoutMS) {
             return listTopics(Duration.ofMillis(timeoutMS));
         }
+
+        @Override
+        public void commitOffsetSync(List<TopicOffsetAndMetadata> offsets, Duration timeout) {
+            HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = Maps.newHashMap();
+            offsets.forEach(
+                    offsetAndMetadata -> offsetsMap.put(
+                            offsetAndMetadata.createTopicPartition(TopicPartition.class),
+                            offsetAndMetadata.createOffsetAndMetadata(OffsetAndMetadata.class)
+                    )
+            );
+            commitSync(offsetsMap, timeout);
+        }
     }
 }