diff --git a/kafka-0-10/pom.xml b/kafka-0-10/pom.xml
new file mode 100644
index 0000000000..cc67c47dae
--- /dev/null
+++ b/kafka-0-10/pom.xml
@@ -0,0 +1,73 @@
+
+
+
+
+ pulsar-protocol-handler-kafka-parent
+ io.streamnative.pulsar.handlers
+ 2.9.0-SNAPSHOT
+
+ 4.0.0
+
+ kafka-0-10
+ StreamNative :: Pulsar Protocol Handler :: Kafka Client 0.10.0.0
+ The Kafka client wrapper for 0.10.0.0
+
+
+
+ io.streamnative.pulsar.handlers
+ kafka-client-api
+ 2.9.0-SNAPSHOT
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.10.0.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ ${mavem-shade-plugin.version}
+
+
+ package
+
+ shade
+
+
+
+
+ org.apache.kafka.common
+ org.apache.kafka010.common
+
+
+ org.apache.kafka.clients
+ org.apache.kafka010.clients
+
+
+
+
+
+
+
+
+
diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java
new file mode 100644
index 0000000000..21325fffb1
--- /dev/null
+++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Consumer010Impl.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kafka.client.zero.ten;
+
+import io.streamnative.kafka.client.api.Consumer;
+import io.streamnative.kafka.client.api.ConsumerConfiguration;
+import io.streamnative.kafka.client.api.ConsumerRecord;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * The implementation of Kafka consumer 0.10.0.0.
+ */
+public class Consumer010Impl extends KafkaConsumer implements Consumer {
+
+ public Consumer010Impl(final ConsumerConfiguration conf) {
+ super(conf.toProperties());
+ }
+
+ @Override
+ public List> receive(long timeoutMs) {
+ final List> records = new ArrayList<>();
+ poll(timeoutMs).forEach(record -> records.add(ConsumerRecord.createOldRecord(record)));
+ return records;
+ }
+}
diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java
new file mode 100644
index 0000000000..26981237f5
--- /dev/null
+++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/Producer010Impl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kafka.client.zero.ten;
+
+import io.streamnative.kafka.client.api.ProduceContext;
+import io.streamnative.kafka.client.api.Producer;
+import io.streamnative.kafka.client.api.ProducerConfiguration;
+import io.streamnative.kafka.client.api.RecordMetadata;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * The implementation of Kafka producer 0.10.0.0.
+ */
+public class Producer010Impl extends KafkaProducer implements Producer {
+
+ public Producer010Impl(final ProducerConfiguration conf) {
+ super(conf.toProperties());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future sendAsync(final ProduceContext context) {
+ send(context.createProducerRecord(ProducerRecord.class), context::complete);
+ return context.getFuture();
+ }
+}
diff --git a/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java
new file mode 100644
index 0000000000..093de2fa55
--- /dev/null
+++ b/kafka-0-10/src/main/java/io/streamnative/kafka/client/zero/ten/package-info.java
@@ -0,0 +1,14 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kafka.client.zero.ten;
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java
index e65a7407c2..f9265df853 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ConsumerRecord.java
@@ -13,6 +13,7 @@
*/
package io.streamnative.kafka.client.api;
+import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -45,4 +46,19 @@ public static ConsumerRecord create(T originalRecord) {
(long) ReflectionUtils.invoke(clazz, "offset", originalRecord),
headers);
}
+
+ //support kafka message before 0.11.x
+ public static ConsumerRecord createOldRecord(T originalRecord) {
+ final Class> clazz = originalRecord.getClass();
+
+ final List headerList = new ArrayList<>();
+ headerList.add(new Header(null, null));
+
+ return new ConsumerRecord<>((K) ReflectionUtils.invoke(clazz, "key", originalRecord),
+ (V) ReflectionUtils.invoke(clazz, "value", originalRecord),
+ (String) ReflectionUtils.invoke(clazz, "topic", originalRecord),
+ (int) ReflectionUtils.invoke(clazz, "partition", originalRecord),
+ (long) ReflectionUtils.invoke(clazz, "offset", originalRecord),
+ headerList);
+ }
}
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java
index 0ac4ebabc9..36574ea502 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/KafkaVersion.java
@@ -24,7 +24,7 @@
*/
public enum KafkaVersion {
- DEFAULT("default"), KAFKA_1_0_0("100");
+ DEFAULT("default"), KAFKA_1_0_0("100"), KAFKA_0_10_0_0("010");
@Getter
private String name;
diff --git a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java
index 7ad8be3922..6887939cae 100644
--- a/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java
+++ b/kafka-client-api/src/main/java/io/streamnative/kafka/client/api/ProduceContext.java
@@ -38,7 +38,8 @@ public class ProduceContext {
private final CompletableFuture future = new CompletableFuture<>();
/**
- * Create an instance of Kafka's ProducerRecord.
+ * Create an instance of Kafka's ProducerRecord
+ * for kafka version higher than or equal to 0.11.x.
*
* @param clazz the class type of Kafka's ProducerRecord
* @param headerConstructor the constructor of Kafka's Header implementation
@@ -60,6 +61,27 @@ public T createProducerRecord(final Class clazz,
}
}
+ /**
+ * Create an instance of Kafka's ProducerRecord less than 0.11.x.
+ * Because there is no header in ProducerRecord before 0.11.x.
+ *
+ * @param clazz the class type of Kafka's ProducerRecord
+ * @param it should be org.apache.kafka.clients.producer.ProducerRecord
+ * @return an instance of org.apache.kafka.clients.producer.ProducerRecord
+ */
+ public T createProducerRecord(final Class clazz) {
+ try {
+ return clazz.getConstructor(
+ String.class, Integer.class, Long.class, Object.class, Object.class
+ ).newInstance(topic, partition, timestamp, key, value);
+ } catch (InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
/**
* Complete the internal `future` field.
*
diff --git a/kafka-client-factory/pom.xml b/kafka-client-factory/pom.xml
index 039a6d23d1..2dee20af88 100644
--- a/kafka-client-factory/pom.xml
+++ b/kafka-client-factory/pom.xml
@@ -39,5 +39,10 @@
kafka-1-0
2.9.0-SNAPSHOT
+
+ io.streamnative.pulsar.handlers
+ kafka-0-10
+ 2.9.0-SNAPSHOT
+
\ No newline at end of file
diff --git a/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java b/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java
index 26129bc98a..310d5f8f3d 100644
--- a/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java
+++ b/kafka-client-factory/src/main/java/io/streamnative/kafka/client/api/KafkaClientFactoryImpl.java
@@ -15,6 +15,8 @@
import io.streamnative.kafka.client.one.zero.ConsumerImpl;
import io.streamnative.kafka.client.one.zero.ProducerImpl;
+import io.streamnative.kafka.client.zero.ten.Consumer010Impl;
+import io.streamnative.kafka.client.zero.ten.Producer010Impl;
/**
* The factory class to create Kafka producers or consumers with a specific version.
@@ -31,6 +33,8 @@ public KafkaClientFactoryImpl(final KafkaVersion kafkaVersion) {
public Producer createProducer(final ProducerConfiguration conf) {
if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) {
return new ProducerImpl<>(conf);
+ } else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ return new Producer010Impl<>(conf);
}
throw new IllegalArgumentException("No producer for version: " + kafkaVersion);
}
@@ -39,6 +43,8 @@ public Producer createProducer(final ProducerConfiguration conf) {
public Consumer createConsumer(final ConsumerConfiguration conf) {
if (kafkaVersion.equals(KafkaVersion.KAFKA_1_0_0)) {
return new ConsumerImpl<>(conf);
+ } else if (kafkaVersion.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ return new Consumer010Impl<>(conf);
}
throw new IllegalArgumentException("No consumer for version: " + kafkaVersion);
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
index 449d5eff3a..33e7097f1a 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java
@@ -28,7 +28,10 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -43,6 +46,8 @@ public class KafkaEntryFormatter implements EntryFormatter {
// These key-value identifies the entry's format as kafka
private static final String IDENTITY_KEY = "entry.format";
private static final String IDENTITY_VALUE = EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase();
+ // Kafka MemoryRecords downConvert method needs time
+ private static final Time time = Time.SYSTEM;
@Override
public ByteBuf encode(MemoryRecords records, int numMessages) {
@@ -61,15 +66,41 @@ public DecodeResult decode(List entries, byte magic) {
// reset header information
final List orderedByteBuf = new ArrayList<>();
+
for (Entry entry : entries) {
try {
long startOffset = MessageIdUtils.peekBaseOffsetFromEntry(entry);
final ByteBuf byteBuf = entry.getDataBuffer();
final MessageMetadata metadata = Commands.parseMessageMetadata(byteBuf);
if (isKafkaEntryFormat(metadata)) {
+ byte batchMagic = byteBuf.getByte(byteBuf.readerIndex() + MAGIC_OFFSET);
byteBuf.setLong(byteBuf.readerIndex() + OFFSET_OFFSET, startOffset);
- byteBuf.setByte(byteBuf.readerIndex() + MAGIC_OFFSET, magic);
- orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
+
+ // batch magic greater than the magic corresponding to the version requested by the client
+ // need down converted
+ if (batchMagic > magic) {
+ MemoryRecords memoryRecords = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf));
+ //down converted, batch magic will be set to client magic
+ ConvertedRecords convertedRecords =
+ memoryRecords.downConvert(magic, startOffset, time);
+
+ final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(convertedRecords.records().buffer());
+ orderedByteBuf.add(kafkaBuffer);
+ if (!optionalByteBufs.isPresent()) {
+ optionalByteBufs = Optional.of(new ArrayList<>());
+ }
+ optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(byteBuf));
+ optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
+
+ if (log.isTraceEnabled()) {
+ log.trace("[{}:{}] downConvert record, start offset {}, entry magic: {}, client magic: {}"
+ , entry.getLedgerId(), entry.getEntryId(), startOffset, batchMagic, magic);
+ }
+
+ } else {
+ //not need down converted, batch magic retains the magic value written in production
+ orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
+ }
} else {
final MemoryRecords records =
ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
@@ -80,7 +111,11 @@ public DecodeResult decode(List entries, byte magic) {
}
optionalByteBufs.ifPresent(byteBufs -> byteBufs.add(kafkaBuffer));
}
- } catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry
+ // Almost all exceptions in Kafka inherit from KafkaException and will be captured
+ // and processed in KafkaApis. Here, whether it is down-conversion or the IOException
+ // in builder.appendWithOffset in decodePulsarEntryToKafkaRecords will be caught by Kafka
+ // and the KafkaException will be thrown. So we need to catch KafkaException here.
+ } catch (KoPMessageMetadataNotFoundException | IOException | KafkaException e) { // skip failed decode entry
log.error("[{}:{}] Failed to decode entry. ", entry.getLedgerId(), entry.getEntryId(), e);
entry.release();
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
index e0e5bbb117..2372ea656e 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java
@@ -109,7 +109,6 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadat
new EndTransactionMarker(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE
? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0));
}
-
final int uncompressedSize = metadata.getUncompressedSize();
final CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
final ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
@@ -145,24 +144,39 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadat
final ByteBuffer value = singleMessageMetadata.isNullValue()
? null
: getNioBuffer(singleMessagePayload);
- final Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList());
- builder.appendWithOffset(baseOffset + i,
- timestamp,
- getKeyByteBuffer(singleMessageMetadata),
- value,
- headers);
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ final Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList());
+ builder.appendWithOffset(baseOffset + i,
+ timestamp,
+ getKeyByteBuffer(singleMessageMetadata),
+ value,
+ headers);
+ } else {
+ // record less than magic=2, no header attribute
+ builder.appendWithOffset(baseOffset + i,
+ timestamp,
+ getKeyByteBuffer(singleMessageMetadata),
+ value);
+ }
singleMessagePayload.release();
}
} else {
final long timestamp = (metadata.getEventTime() > 0)
? metadata.getEventTime()
: metadata.getPublishTime();
- final Header[] headers = getHeadersFromMetadata(metadata.getPropertiesList());
- builder.appendWithOffset(baseOffset,
- timestamp,
- getKeyByteBuffer(metadata),
- getNioBuffer(uncompressedPayload),
- headers);
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ final Header[] headers = getHeadersFromMetadata(metadata.getPropertiesList());
+ builder.appendWithOffset(baseOffset,
+ timestamp,
+ getKeyByteBuffer(metadata),
+ getNioBuffer(uncompressedPayload),
+ headers);
+ } else {
+ builder.appendWithOffset(baseOffset,
+ timestamp,
+ getKeyByteBuffer(metadata),
+ getNioBuffer(uncompressedPayload));
+ }
}
final MemoryRecords records = builder.build();
diff --git a/pom.xml b/pom.xml
index 390332d777..d1689c8295 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
+ kafka-0-10
kafka-1-0
kafka-client-api
kafka-client-factory
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
index 666b790e89..2eee06c4fc 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndPulsarTest.java
@@ -69,7 +69,12 @@ public void testKafkaProducePulsarConsume() throws Exception {
.build();
keys.add(record.getKey());
values.add(record.getValue());
- headers.add(record.getHeaders().get(0));
+ // message has no header before Kafka 0.11.x version
+ if (version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ headers.add(null);
+ } else {
+ headers.add(record.getHeaders().get(0));
+ }
final RecordMetadata metadata = record.sendAsync().get();
log.info("Kafka client {} sent {} to {}", version, record.getValue(), metadata);
@@ -139,7 +144,9 @@ public void testPulsarProduceKafkaConsume() throws Exception {
Assert.assertEquals(records.size(), 1);
Assert.assertEquals(records.get(0).getValue(), value);
Assert.assertEquals(records.get(0).getKey(), key);
- Assert.assertEquals(records.get(0).getHeaders().get(0), header);
+ if (!version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ Assert.assertEquals(records.get(0).getHeaders().get(0), header);
+ }
consumer.close();
}
}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
index 0b870ee5d7..8634a0f00c 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/compatibility/BasicEndToEndTestBase.java
@@ -72,6 +72,15 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ /**
+ * Test Kafka production and Kafka consumption.
+ * after kafka client 0.11.x versions, producer will send record with magic=2.
+ * Therefore, the previous test will not appear issue: https://github.com/streamnative/kop/issues/656
+ * After introducing the kafka 0-10 module, we can reuse the current test,
+ * Because 0.10 will produce a message with magic=1, and the kafka-1-0 and default modules
+ * will use the apiVersion corresponding to magic=2 to send FETCH requests
+ * @throws Exception
+ */
protected void testKafkaProduceKafkaConsume() throws Exception {
final String topic = "test-kafka-produce-kafka-consume";
admin.topics().createPartitionedTopic(topic, 1);
@@ -101,7 +110,10 @@ protected void testKafkaProduceKafkaConsume() throws Exception {
value = "value-from-" + version.name() + offset;
keys.add(key);
values.add(value);
- headers.add(new Header("header-" + key, "header-" + value));
+ // Because there is no header in ProducerRecord before 0.11.x.
+ if (!version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ headers.add(new Header("header-" + key, "header-" + value));
+ }
metadata = producer.newContextBuilder(topic, value)
.key(key)
@@ -119,6 +131,11 @@ protected void testKafkaProduceKafkaConsume() throws Exception {
}
for (KafkaVersion version : kafkaClientFactories.keySet()) {
+ // Due to some known issues, kop return the minimum Fetch apiVersion is 4,
+ // kafka client versions before 0.11.x not support apiVersion=4
+ if (version.equals(KafkaVersion.KAFKA_0_10_0_0)) {
+ continue;
+ }
final Consumer consumer = kafkaClientFactories.get(version)
.createConsumer(consumerConfiguration(version));
consumer.subscribe(topic);