readCompacted(boolean readCompacted) {
+ conf.setReadCompacted(readCompacted);
+ return this;
+ }
+}
diff --git a/src/main/java/org/apache/pulsar/client/package-info.java b/src/main/java/org/apache/pulsar/client/package-info.java
new file mode 100644
index 0000000000000..03ec6345b6197
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/client/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Pulsar client related classes.
+ *
+ * The classes under this package extend the Apache Pulsar client API used by KoP.
+ */
+package org.apache.pulsar.client;
diff --git a/src/test/java/io/streamnative/kop/KafkaApisTest.java b/src/test/java/io/streamnative/kop/KafkaApisTest.java
index 0f963ee39c13a..eb8a2eb466ad9 100644
--- a/src/test/java/io/streamnative/kop/KafkaApisTest.java
+++ b/src/test/java/io/streamnative/kop/KafkaApisTest.java
@@ -176,7 +176,8 @@ CompletableFuture checkInvalidPartition(String topic,
return kafkaRequestHandler.handleOffsetCommitRequest(request);
}
- @Test(timeOut = 20000)
+ @Test(timeOut = 20000, enabled = false)
+ // https://github.com/streamnative/kop/issues/51
public void testOffsetCommitWithInvalidPartition() throws Exception {
String topicName = "kopOffsetCommitWithInvalidPartition";
diff --git a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
index 43485d5c811f2..dec0b09e2daf7 100644
--- a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
+++ b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
@@ -482,7 +482,7 @@ public void testTopicConsumerManager() throws Exception {
kafkaService.getAdminClient().topics().createPartitionedTopic(kafkaTopicName, partitionNumber);
int totalMsgs = 10;
- String messageStrPrefix = "Message_Kop_PulsarProduceKafkaConsume_" + partitionNumber + "_";
+ String messageStrPrefix = "Message_Kop_testTopicConsumerManager_" + partitionNumber + "_";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(pulsarTopicName)
diff --git a/src/test/java/io/streamnative/kop/LogOffsetTest.java b/src/test/java/io/streamnative/kop/LogOffsetTest.java
index 8f27c5db7d691..233e80b31bd51 100644
--- a/src/test/java/io/streamnative/kop/LogOffsetTest.java
+++ b/src/test/java/io/streamnative/kop/LogOffsetTest.java
@@ -35,7 +35,8 @@
@Slf4j
public class LogOffsetTest extends KafkaApisTest {
- @Test(timeOut = 20000)
+ @Test(timeOut = 20000, enabled = false)
+ // https://github.com/streamnative/kop/issues/51
public void testGetOffsetsForUnknownTopic() throws Exception {
String topicName = "kopTestGetOffsetsForUnknownTopic";
diff --git a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java
index 62fd7da2b5ab3..652577db88b9e 100644
--- a/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java
+++ b/src/test/java/io/streamnative/kop/MockKafkaServiceBaseTest.java
@@ -81,11 +81,11 @@ public abstract class MockKafkaServiceBaseTest {
protected URI lookupUrl;
protected PulsarClient pulsarClient;
- protected final int brokerWebservicePort = PortManager.nextFreePort();
- protected final int brokerWebservicePortTls = PortManager.nextFreePort();
- protected final int brokerPort = PortManager.nextFreePort();
- protected final int kafkaBrokerPort = PortManager.nextFreePort();
- protected final int kafkaBrokerPortTls = PortManager.nextFreePort();
+ protected int brokerWebservicePort = PortManager.nextFreePort();
+ protected int brokerWebservicePortTls = PortManager.nextFreePort();
+ protected int brokerPort = PortManager.nextFreePort();
+ protected int kafkaBrokerPort = PortManager.nextFreePort();
+ protected int kafkaBrokerPortTls = PortManager.nextFreePort();
protected MockZooKeeper mockZookKeeper;
protected NonClosableMockBookKeeper mockBookKeeper;
@@ -115,6 +115,7 @@ protected void resetConfig() {
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
this.conf.setEnableGroupCoordinator(true);
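+ // A single offsets-topic partition keeps all group metadata on one partition, presumably enough for these single-broker tests.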
+ this.conf.setOffsetsTopicNumPartitions(1);
this.conf.setAuthenticationEnabled(false);
this.conf.setAuthorizationEnabled(false);
this.conf.setAllowAutoTopicCreation(true);
@@ -164,7 +165,7 @@ protected final void internalCleanup() throws Exception {
pulsarClient.close();
}
if (kafkaService != null) {
- kafkaService.close();
+ stopBroker();
}
if (mockBookKeeper != null) {
mockBookKeeper.reallyShutdown();
@@ -431,20 +432,20 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
public static class KConsumer implements Closeable {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
+ private final String consumerGroup;
public KConsumer(
String topic, String host, int port,
- boolean autoCommit, String username, String password) {
+ boolean autoCommit, String username, String password, String consumerGroup) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoKafkaOnPulsarConsumer");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
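+ // Always start from the earliest offset so a consumer also sees messages published before it joined.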
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
if (autoCommit) {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
} else {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
if (null != username && null != password) {
@@ -464,20 +465,25 @@ public KConsumer(
this.consumer = new KafkaConsumer<>(props);
this.topic = topic;
+ this.consumerGroup = consumerGroup;
}
public KConsumer(String topic, int port, boolean autoCommit) {
- this(topic, "localhost", port, autoCommit, null, null);
+ this(topic, "localhost", port, autoCommit, null, null, "DemoKafkaOnPulsarConsumer");
}
public KConsumer(String topic, String host, int port) {
- this(topic, "localhost", port, false, null, null);
+ this(topic, "localhost", port, false, null, null, "DemoKafkaOnPulsarConsumer");
}
public KConsumer(String topic, int port) {
this(topic, "localhost", port);
}
+ public KConsumer(String topic, int port, String group) {
+ this(topic, "localhost", port, false, null, null, group);
+ }
+
@Override
public void close() {
this.consumer.close();
diff --git a/src/test/java/io/streamnative/kop/SaslPlainTest.java b/src/test/java/io/streamnative/kop/SaslPlainTest.java
index 5f490af7990a2..93649b53e9ff0 100644
--- a/src/test/java/io/streamnative/kop/SaslPlainTest.java
+++ b/src/test/java/io/streamnative/kop/SaslPlainTest.java
@@ -112,7 +112,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
- @Test(timeOut = 20000)
+ @Test(timeOut = 40000)
void simpleProduceAndConsume() throws Exception {
KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(),
SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken);
@@ -121,11 +121,11 @@ void simpleProduceAndConsume() throws Exception {
for (int i = 0; i < totalMsgs; i++) {
String messageStr = messageStrPrefix + i;
- kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, i, messageStr)).get();
+ kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, i, messageStr));
}
KConsumer kConsumer = new KConsumer(KAFKA_TOPIC, "localhost", getKafkaBrokerPort(), false,
- SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken);
+ SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer");
kConsumer.getConsumer().subscribe(Collections.singleton(KAFKA_TOPIC));
int i = 0;
@@ -147,8 +147,8 @@ void simpleProduceAndConsume() throws Exception {
Map<String, List<PartitionInfo>> result = kConsumer
.getConsumer().listTopics(Duration.ofSeconds(1));
assertEquals(result.size(), 1);
- assertTrue(result.containsKey(KAFKA_TOPIC), "list of topics "
- + result.keySet().toString() + " does not contains " + KAFKA_TOPIC);
+ assertTrue(result.containsKey(KAFKA_TOPIC),
+ "list of topics " + result.keySet().toString() + " does not contains " + KAFKA_TOPIC);
}
@Test(timeOut = 20000)
diff --git a/src/test/java/io/streamnative/kop/coordinator/group/DistributedGroupCoordinatorTest.java b/src/test/java/io/streamnative/kop/coordinator/group/DistributedGroupCoordinatorTest.java
new file mode 100644
index 0000000000000..fde7ca8462620
--- /dev/null
+++ b/src/test/java/io/streamnative/kop/coordinator/group/DistributedGroupCoordinatorTest.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kop.coordinator.group;
+
+import static io.streamnative.kop.KafkaProtocolHandler.PLAINTEXT_PREFIX;
+import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import io.streamnative.kop.KafkaService;
+import io.streamnative.kop.KafkaServiceConfiguration;
+import io.streamnative.kop.MockKafkaServiceBaseTest;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+
+/**
+ * Unit test {@link GroupCoordinator}.
+ */
+public class DistributedGroupCoordinatorTest extends MockKafkaServiceBaseTest {
+
+ protected KafkaServiceConfiguration conf1;
+ protected KafkaServiceConfiguration conf2;
+ protected KafkaService kafkaService1;
+ protected KafkaService kafkaService2;
+
+ protected int primaryBrokerWebservicePort;
+ protected int secondaryBrokerWebservicePort;
+ protected int primaryBrokerPort;
+ protected int secondaryBrokerPort;
+ protected int primaryKafkaBrokerPort;
+ protected int secondaryKafkaBrokerPort;
+
+ protected int offsetsTopicNumPartitions;
+
+ private static final Logger log = LoggerFactory.getLogger(DistributedGroupCoordinatorTest.class);
+
+ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kafkaPort) {
+ KafkaServiceConfiguration kConfig = new KafkaServiceConfiguration();
+ kConfig.setBrokerServicePort(Optional.ofNullable(brokerPort));
+ kConfig.setWebServicePort(Optional.ofNullable(webPort));
+ kConfig.setListeners(PLAINTEXT_PREFIX + "localhost:" + kafkaPort);
+
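+ // Spread the offsets topic over several partitions so that, with two brokers, each broker serves part of the group coordinator state.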
+ kConfig.setOffsetsTopicNumPartitions(offsetsTopicNumPartitions);
+ kConfig.setEnableGroupCoordinator(true);
+
+ kConfig.setAdvertisedAddress("localhost");
+ kConfig.setClusterName(configClusterName);
+ kConfig.setManagedLedgerCacheSizeMB(8);
+ kConfig.setActiveConsumerFailoverDelayTimeMillis(0);
+ kConfig.setDefaultNumberOfNamespaceBundles(2);
+ kConfig.setZookeeperServers("localhost:2181");
+ kConfig.setConfigurationStoreServers("localhost:3181");
+ kConfig.setEnableGroupCoordinator(true);
+ kConfig.setAuthenticationEnabled(false);
+ kConfig.setAuthorizationEnabled(false);
+ kConfig.setAllowAutoTopicCreation(true);
+ kConfig.setAllowAutoTopicCreationType("partitioned");
+ kConfig.setBrokerDeleteInactiveTopicsEnabled(false);
+
+ return kConfig;
+ }
+
+ @Override
+ protected void resetConfig() {
+ offsetsTopicNumPartitions = 16;
+ primaryBrokerWebservicePort = PortManager.nextFreePort();
+ secondaryBrokerWebservicePort = PortManager.nextFreePort();
+ primaryBrokerPort = PortManager.nextFreePort();
+ secondaryBrokerPort = PortManager.nextFreePort();
+ primaryKafkaBrokerPort = PortManager.nextFreePort();
+ secondaryKafkaBrokerPort = PortManager.nextFreePort();
+ conf1 = resetConfig(
+ primaryBrokerPort,
+ primaryBrokerWebservicePort,
+ primaryKafkaBrokerPort);
+ conf2 = resetConfig(
+ secondaryBrokerPort,
+ secondaryBrokerWebservicePort,
+ secondaryKafkaBrokerPort);
+ conf = conf1;
+
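+ // Point the base-class port fields at the primary broker so the shared setup in MockKafkaServiceBaseTest targets broker1.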
+ brokerPort = primaryBrokerPort;
+ brokerWebservicePort = primaryBrokerWebservicePort;
+ kafkaBrokerPort = primaryKafkaBrokerPort;
+
+ log.info("Ports -- broker1: {}, brokerWeb1:{}, kafka1: {}",
+ primaryBrokerPort, primaryBrokerWebservicePort, primaryKafkaBrokerPort);
+ log.info("Ports -- broker2: {}, brokerWeb2:{}, kafka2: {}\n",
+ secondaryBrokerPort, secondaryBrokerWebservicePort, secondaryKafkaBrokerPort);
+ }
+
+ @Override
+ protected void startBroker() throws Exception {
+ this.kafkaService1 = startBroker(conf1);
+ this.kafkaService = kafkaService1;
+ this.kafkaService2 = startBroker(conf2);
+ }
+
+ @Override
+ protected void stopBroker() throws Exception {
+ kafkaService1.close();
+ kafkaService2.close();
+ }
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ if (!admin.clusters().getClusters().contains(configClusterName)) {
+ // so that clients can test short names
+ admin.clusters().createCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ } else {
+ admin.clusters().updateCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ }
+
+ if (!admin.tenants().getTenants().contains("public")) {
+ admin.tenants().createTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ } else {
+ admin.tenants().updateTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/default",
+ new RetentionPolicies(60, 1000));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) {
+ admin.namespaces().createNamespace("public/__kafka");
+ admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/__kafka",
+ new RetentionPolicies(-1, -1));
+ }
+
+ List<String> brokers = admin.brokers().getActiveBrokers(configClusterName);
+ Assert.assertEquals(brokers.size(), 2);
+ log.info("broker1: {} broker2: {}", brokers.get(0), brokers.get(1));
+ }
+
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ log.info("--- Shutting down ---");
+ super.internalCleanup();
+ }
+
+ protected void kafkaPublishMessage(KProducer kProducer, int numMessages, String messageStrPrefix) throws Exception {
+ for (int i = 0; i < numMessages; i++) {
+ String messageStr = messageStrPrefix + i;
+ ProducerRecord<Integer, String> record = new ProducerRecord<>(
+ kProducer.getTopic(),
+ i,
+ messageStr);
+
+ kProducer.getProducer()
+ .send(record)
+ .get();
+ if (log.isDebugEnabled()) {
+ log.debug("Kafka Producer {} Sent message with header: ({}, {})",
+ kProducer.getTopic(), i, messageStr);
+ }
+ }
+ }
+
+ protected void kafkaConsumeCommitMessage(KConsumer kConsumer,
+ int numMessages,
+ String messageStrPrefix,
+ List<TopicPartition> topicPartitions) {
+ kConsumer.getConsumer().assign(topicPartitions);
+ int i = 0;
+ while (i < numMessages) {
+ if (log.isDebugEnabled()) {
+ log.debug("kConsumer {} start poll message: {}",
+ kConsumer.getTopic() + kConsumer.getConsumerGroup(), i);
+ }
+ ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
+ for (ConsumerRecord<Integer, String> record : records) {
+ Integer key = record.key();
+ assertEquals(messageStrPrefix + key.toString(), record.value());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Kafka consumer get message: {}, key: {} at offset {}",
+ record.key(), record.value(), record.offset());
+ }
+ i++;
+ }
+ }
+
+ kConsumer.getConsumer().commitSync();
+
+ if (log.isDebugEnabled()) {
+ log.debug("kConsumer {} finished poll and commit message: {}",
+ kConsumer.getTopic() + kConsumer.getConsumerGroup(), i);
+ }
+ assertEquals(i, numMessages);
+ }
+
+ @Test(timeOut = 30000)
+ public void testMultiBrokerAndCoordinator() throws Exception {
+ int partitionNumber = 10;
+ String kafkaTopicName = "kopMultiBrokerAndCoordinator" + partitionNumber;
+ String pulsarTopicName = "persistent://public/default/" + kafkaTopicName;
+
+ String offsetNs = conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace();
+ String offsetsTopicName = "persistent://" + offsetNs + "/" + GROUP_METADATA_TOPIC_NAME;
+
+ // 0. Preparing:
+ // create partitioned topic.
+ kafkaService1.getAdminClient().topics().createPartitionedTopic(kafkaTopicName, partitionNumber);
+ // Because kafkaService1 is started first, all the offset topic partitions are initially served by broker1.
+ // Each namespace is configured with 2 bundles; unload the first bundle so it gets served by broker2.
+ kafkaService1.getAdminClient().namespaces().unloadNamespaceBundle(offsetNs, "0x00000000_0x80000000");
+
+ // Offset topic partitions should now be served by 2 brokers.
+ Map<String, List<String>> offsetTopicMap = Maps.newHashMap();
+ for (int ii = 0; ii < offsetsTopicNumPartitions; ii++) {
+ String offsetsTopic = offsetsTopicName + PARTITIONED_TOPIC_SUFFIX + ii;
+ String result = admin.lookups().lookupTopic(offsetsTopic);
+ offsetTopicMap.putIfAbsent(result, Lists.newArrayList());
+ offsetTopicMap.get(result).add(offsetsTopic);
+ log.info("serving broker for offset topic {} is {}", offsetsTopic, result);
+ }
+ assertEquals(offsetTopicMap.size(), 2);
+
+ final AtomicInteger numberTopic = new AtomicInteger(0);
+ offsetTopicMap.values().stream().forEach(list -> numberTopic.addAndGet(list.size()));
+ assertEquals(numberTopic.get(), offsetsTopicNumPartitions);
+
+ // 1. produce message with Kafka producer.
+ int totalMsgs = 50;
+ String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";
+ @Cleanup
+ KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort());
+ kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
+
+ // 2. create 4 kafka consumers from different consumer groups,
+ // consume data and commit offsets for the 4 consumer groups.
+ @Cleanup
+ KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
+ @Cleanup
+ KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
+ @Cleanup
+ KConsumer kConsumer3 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-3");
+ @Cleanup
+ KConsumer kConsumer4 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-4");
+
+ List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
+ .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList());
+
+ log.info("Partition size: {}, will consume and commitOffset for 4 consumers",
+ topicPartitions.size());
+
+ kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer3, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer4, totalMsgs, messageStrPrefix, topicPartitions);
+
+ // 3. use a map from serving broker to topics; verify both brokers have messages served.
+ Map<String, List<String>> topicMap = Maps.newHashMap();
+ for (int ii = 0; ii < partitionNumber; ii++) {
+ String topicName = pulsarTopicName + PARTITIONED_TOPIC_SUFFIX + ii;
+ String result = admin.lookups().lookupTopic(topicName);
+ topicMap.putIfAbsent(result, Lists.newArrayList());
+ topicMap.get(result).add(topicName);
+ log.info("serving broker for topic {} is {}", topicName, result);
+ }
+ assertTrue(topicMap.size() == 2);
+
+ final AtomicInteger numberTopic2 = new AtomicInteger(0);
+ topicMap.values().stream().forEach(list -> numberTopic2.addAndGet(list.size()));
+ assertTrue(numberTopic2.get() == partitionNumber);
+
+ final PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(pulsarTopicName, true);
+ log.info("PartitionedTopicStats for topic {} : {}", pulsarTopicName, new Gson().toJson(topicStats));
+
+ topicMap.forEach((broker, topics) -> {
+ AtomicLong brokerStorageSize = new AtomicLong(0);
+ topics.forEach(topic -> {
+ brokerStorageSize.addAndGet(topicStats.partitions.get(topic).storageSize);
+ });
+ log.info("get data topics served by broker {}, broker storage size: {}", broker, brokerStorageSize.get());
+ assertTrue(brokerStorageSize.get() > 0L);
+ });
+
+ offsetTopicMap = Maps.newHashMap();
+ for (int ii = 0; ii < offsetsTopicNumPartitions; ii++) {
+ String offsetsTopic = offsetsTopicName + PARTITIONED_TOPIC_SUFFIX + ii;
+ String result = admin.lookups().lookupTopic(offsetsTopic);
+ offsetTopicMap.putIfAbsent(result, Lists.newArrayList());
+ offsetTopicMap.get(result).add(offsetsTopic);
+ log.info("serving broker for offset topic {} is {}", offsetsTopic, result);
+ }
+
+ log.info("producer size1: {}, size2: {}",
+ kafkaService1.getGroupCoordinator().getOffsetsProducers().size(),
+ kafkaService2.getGroupCoordinator().getOffsetsProducers().size());
+ log.info("reader size1: {}, size2: {}",
+ kafkaService1.getGroupCoordinator().getOffsetsReaders().size(),
+ kafkaService2.getGroupCoordinator().getOffsetsReaders().size());
+
+ // 4. unload the namespace so the coordinator moves to another broker;
+ // verify the consumer groups still keep the old offsets, and the consumers poll no data.
+ log.info("Unload offset namespace, this will trigger another reload. After reload verify offset.");
+ kafkaService1.getAdminClient().namespaces().unload(offsetNs);
+
+ // verify offsets are kept and no more records can be read.
+ ConsumerRecords<Integer, String> records = kConsumer1.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer2.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer3.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer4.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+
+ // 5. another round publish and consume after ns unload.
+ kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
+ kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer3, totalMsgs, messageStrPrefix, topicPartitions);
+ kafkaConsumeCommitMessage(kConsumer4, totalMsgs, messageStrPrefix, topicPartitions);
+
+ offsetTopicMap = Maps.newHashMap();
+ for (int ii = 0; ii < offsetsTopicNumPartitions; ii++) {
+ String offsetsTopic = offsetsTopicName + PARTITIONED_TOPIC_SUFFIX + ii;
+ String result = admin.lookups().lookupTopic(offsetsTopic);
+ offsetTopicMap.putIfAbsent(result, Lists.newArrayList());
+ offsetTopicMap.get(result).add(offsetsTopic);
+ log.info("serving broker for offset topic {} is {}", offsetsTopic, result);
+ }
+
+ log.info("producer broker 1 size : {}, broker 2 : {}",
+ kafkaService1.getGroupCoordinator().getOffsetsProducers().size(),
+ kafkaService2.getGroupCoordinator().getOffsetsProducers().size());
+ log.info("reader broker 1 size : {}, broker 2 : {}",
+ kafkaService1.getGroupCoordinator().getOffsetsReaders().size(),
+ kafkaService2.getGroupCoordinator().getOffsetsReaders().size());
+
+ assertTrue(kafkaService2.getGroupCoordinator().getOffsetsProducers().size() > 0);
+
+ // 6. unload the namespace so the coordinator moves to another broker;
+ // verify the consumer groups still keep the old offsets, and the consumers poll no data.
+ log.info("Unload offset namespace, this will trigger another reload");
+ kafkaService1.getAdminClient().namespaces().unload(offsetNs);
+
+ // verify offsets are kept and no more records can be read.
+ records = kConsumer1.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer2.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer3.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ records = kConsumer4.getConsumer().poll(Duration.ofMillis(200));
+ assertTrue(records.isEmpty());
+ }
+}
diff --git a/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java b/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java
index fa0075c935a60..d84e3e521a1cc 100644
--- a/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java
+++ b/src/test/java/io/streamnative/kop/coordinator/group/GroupCoordinatorTest.java
@@ -51,8 +51,8 @@
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -68,8 +68,6 @@
*/
public class GroupCoordinatorTest extends MockKafkaServiceBaseTest {
- private static final String ClientId = "consumer-test";
- private static final String ClientHost = "localhost";
private static final int ConsumerMinSessionTimeout = 10;
private static final int ConsumerMaxSessionTimeout = 10000;
private static final int DefaultRebalanceTimeout = 500;
@@ -83,8 +81,10 @@ public class GroupCoordinatorTest extends MockKafkaServiceBaseTest {
String topicName;
MockTimer timer = null;
GroupCoordinator groupCoordinator = null;
- Producer producer;
- Reader reader;
+
+ ProducerBuilder<ByteBuffer> producerBuilder;
+ ReaderBuilder<ByteBuffer> readerBuilder;
+
Consumer<ByteBuffer> consumer;
OrderedScheduler scheduler;
GroupMetadataManager groupMetadataManager;
@@ -127,15 +127,13 @@ public void setup() throws Exception {
ConsumerMaxSessionTimeout,
GroupInitialRebalanceDelay
);
- OffsetConfig offsetConfig = OffsetConfig.builder().build();
-
- timer = new MockTimer();
topicName = "test-coordinator-" + System.currentTimeMillis();
+ OffsetConfig offsetConfig = OffsetConfig.builder().offsetsTopicName(topicName).build();
- producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
- .topic(topicName)
- .create();
+ timer = new MockTimer();
+
+ producerBuilder = pulsarClient.newProducer(Schema.BYTEBUFFER);
consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER)
.topic(topicName)
@@ -143,19 +141,17 @@ public void setup() throws Exception {
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
- reader = pulsarClient.newReader(Schema.BYTEBUFFER)
- .topic(topicName)
- .startMessageId(MessageId.earliest)
- .create();
+ readerBuilder = pulsarClient.newReader(Schema.BYTEBUFFER)
+ .startMessageId(MessageId.earliest);
groupPartitionId = 0;
otherGroupPartitionId = 1;
otherGroupId = "otherGroupId";
+ offsetConfig.offsetsTopicNumPartitions(4);
groupMetadataManager = spy(new GroupMetadataManager(
- 4,
offsetConfig,
- producer,
- reader,
+ producerBuilder,
+ readerBuilder,
scheduler,
timer.time(),
id -> {
@@ -203,8 +199,6 @@ public void setup() throws Exception {
public void cleanup() throws Exception {
groupCoordinator.shutdown();
groupMetadataManager.shutdown();
- producer.close();
- reader.close();
consumer.close();
scheduler.shutdown();
super.internalCleanup();
@@ -280,7 +274,7 @@ public void testJoinGroupUnknowMemberId() throws Exception {
JoinGroupResult joinGroupResult = joinGroup(
otherGroupId, memberId, protocolType, protocols
);
- assertEquals(Errors.NONE, joinGroupResult.getError());
+ assertEquals(Errors.NOT_COORDINATOR, joinGroupResult.getError());
}
@Test
@@ -389,7 +383,7 @@ public void testJoinGroupUnknownConsumerExistingGroup() throws Exception {
@Test
public void testHeartbeatWrongCoordinator() throws Exception {
Errors error = groupCoordinator.handleHeartbeat(otherGroupId, memberId, -1).get();
- assertEquals(Errors.UNKNOWN_MEMBER_ID, error);
+ assertEquals(Errors.NOT_COORDINATOR, error);
}
@Test
@@ -737,8 +731,7 @@ public void testSyncGroupEmptyAssignment() throws Exception {
assertEquals(Errors.NONE, heartbeatResult);
}
- @Test(enabled = false)
- // TODO: https://github.com/streamnative/kop/issues/32
+ @Test
public void testSyncGroupOtherGroupId() throws Exception {
int generation = 1;
KeyValue<Errors, byte[]> syncGroupResult = groupCoordinator.handleSyncGroup(
@@ -1542,8 +1535,7 @@ public void testFetchOffsetForUnknownPartition() {
assertEquals(OffsetFetchResponse.INVALID_OFFSET, fetchOffsetsResult.getValue().get(tp).offset);
}
- @Test(enabled = false)
- // TODO: https://github.com/streamnative/kop/issues/32
+ @Test
public void testFetchOffsetNotCoordinatorForGroup() {
TopicPartition tp = new TopicPartition("topic", 0);
KeyValue<Errors, Map<TopicPartition, PartitionData>> fetchOffsetsResult =
@@ -1678,7 +1670,7 @@ public void testLeaveGroupWrongCoordinator() throws Exception {
Errors leaveGroupResult = groupCoordinator.handleLeaveGroup(
otherGroupId, JoinGroupRequest.UNKNOWN_MEMBER_ID
).get();
- assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult);
+ assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult);
}
@Test
@@ -1764,7 +1756,7 @@ groupId, memberId, protocolType, newProtocols()
@Test
public void testDescribeGroupWrongCoordinator() {
KeyValue describeGroupResult = groupCoordinator.handleDescribeGroup(otherGroupId);
- assertEquals(Errors.NONE, describeGroupResult.getKey());
+ assertEquals(Errors.NOT_COORDINATOR, describeGroupResult.getKey());
}
@Test
diff --git a/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java b/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java
index 10066b567a654..17b68c43ed548 100644
--- a/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java
+++ b/src/test/java/io/streamnative/kop/coordinator/group/GroupMetadataManagerTest.java
@@ -20,17 +20,17 @@
import static io.streamnative.kop.coordinator.group.GroupState.Empty;
import static io.streamnative.kop.coordinator.group.GroupState.PreparingRebalance;
import static io.streamnative.kop.coordinator.group.GroupState.Stable;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -59,7 +59,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
@@ -73,19 +72,21 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
+import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
/**
* Unit test {@link GroupMetadataManager}.
@@ -95,27 +96,18 @@ public class GroupMetadataManagerTest extends MockKafkaServiceBaseTest {
private static final String groupId = "foo";
private static final int groupPartitionId = 0;
- private static final TopicPartition groupTopicPartition =
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId);
+
private static final String protocolType = "protocolType";
private static final int rebalanceTimeout = 60000;
private static final int sessionTimeout = 10000;
- MockTime time = null;
GroupMetadataManager groupMetadataManager = null;
- Producer producer = null;
- Reader consumer = null;
+ ProducerBuilder<ByteBuffer> producer = null;
+ ReaderBuilder<ByteBuffer> consumer = null;
OffsetConfig offsetConfig = OffsetConfig.builder().build();
OrderedScheduler scheduler;
- @Override
- protected void resetConfig() {
- super.resetConfig();
- // since this test mock all Group Coordinator, we disable the one in Kafka broker.
- this.conf.setEnableGroupCoordinator(false);
- }
-
- @Before
+ @BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
@@ -125,30 +117,46 @@ public void setup() throws Exception {
.numThreads(1)
.build();
- admin.clusters().createCluster("test",
- new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
- admin.tenants().createTenant("public",
- new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
- admin.namespaces().setRetention("public/default",
- new RetentionPolicies(20, 100));
+ if (!admin.clusters().getClusters().contains(configClusterName)) {
+ // so that clients can test short names
+ admin.clusters().createCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ } else {
+ admin.clusters().updateCluster(configClusterName,
+ new ClusterData("http://127.0.0.1:" + brokerWebservicePort));
+ }
- time = new MockTime();
- groupMetadataManager = new GroupMetadataManager(
- 1,
- offsetConfig,
- producer,
- consumer,
- scheduler,
- time
- );
+ if (!admin.tenants().getTenants().contains("public")) {
+ admin.tenants().createTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ } else {
+ admin.tenants().updateTenant("public",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/default",
+ new RetentionPolicies(60, 1000));
+ }
+ if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) {
+ admin.namespaces().createNamespace("public/__kafka");
+ admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test"));
+ admin.namespaces().setRetention("public/__kafka",
+ new RetentionPolicies(20, 100));
+ }
+
+ groupMetadataManager = kafkaService.getGroupCoordinator().getGroupManager();
}
- @After
+ @AfterMethod
@Override
public void cleanup() throws Exception {
+ if (groupMetadataManager != null) {
+ groupMetadataManager.shutdown();
+ }
+ scheduler.shutdown();
super.internalCleanup();
}
@@ -239,7 +247,7 @@ private ByteBuffer newMemoryRecordsBuffer(List records,
short producerEpoch,
boolean isTxnOffsetCommit) {
TimestampType timestampType = TimestampType.CREATE_TIME;
- long timestamp = time.milliseconds();
+ long timestamp = Time.SYSTEM.milliseconds();
ByteBuffer buffer = ByteBuffer.allocate(
AbstractRecords.estimateSizeInBytes(
@@ -291,7 +299,7 @@ private int completeTransactionalOffsetCommit(ByteBuffer buffer,
boolean isCommit) {
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(),
+ TimestampType.LOG_APPEND_TIME, baseOffset, Time.SYSTEM.milliseconds(),
producerId, producerEpoch, 0, true, true,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
ControlRecordType controlRecordType;
@@ -300,15 +308,13 @@ private int completeTransactionalOffsetCommit(ByteBuffer buffer,
} else {
controlRecordType = ControlRecordType.ABORT;
}
- builder.appendEndTxnMarker(time.milliseconds(), new EndTransactionMarker(controlRecordType, 0));
+ builder.appendEndTxnMarker(Time.SYSTEM.milliseconds(), new EndTransactionMarker(controlRecordType, 0));
builder.build();
return 1;
}
@Test
public void testLoadOffsetsWithoutGroup() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
-
Map<TopicPartition, Long> committedOffsets = new HashMap<>();
committedOffsets.put(
new TopicPartition("foo", 0), 23L);
@@ -324,37 +330,35 @@ public void testLoadOffsetsWithoutGroup() throws Exception {
ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
- });
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
+
}
@Test
public void testLoadEmptyGroupWithOffsets() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
int generation = 15;
String protocolType = "consumer";
@@ -376,41 +380,38 @@ public void testLoadEmptyGroupWithOffsets() throws Exception {
ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- assertEquals(generation, group.generationId());
- assertEquals(Optional.of(protocolType), group.protocolType());
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- assertNull(group.leaderOrNull());
- assertNull(group.protocolOrNull());
- committedOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
- });
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ assertEquals(generation, group.generationId());
+ assertEquals(Optional.of(protocolType), group.protocolType());
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ assertNull(group.leaderOrNull());
+ assertNull(group.protocolOrNull());
+ committedOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
}
@Test
public void testLoadTransactionalOffsetsWithoutGroup() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -433,37 +434,34 @@ public void testLoadTransactionalOffsetsWithoutGroup() throws Exception {
buffer.flip();
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
- });
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
}
@Test
public void testDoNotLoadAbortedTransactionalOffsetCommits() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -483,26 +481,23 @@ public void testDoNotLoadAbortedTransactionalOffsetCommits() throws Exception {
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> {}
- ).get();
- Optional groupInCache = groupMetadataManager.getGroup(groupId);
- assertFalse(groupInCache.isPresent());
- });
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> {}
+ ).get();
+ Optional<GroupMetadata> groupInCache = groupMetadataManager.getGroup(groupId);
+ assertFalse(groupInCache.isPresent());
}
@Test
public void testGroupLoadedWithPendingCommits() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -521,37 +516,35 @@ public void testGroupLoadedWithPendingCommits() throws Exception {
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- GroupMetadata group = onLoadedFuture.get();
- assertSame(group, groupInCache);
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- // Ensure that no offsets are materialized, but that we have offsets pending.
- assertEquals(0, group.allOffsets().size());
- assertTrue(group.hasOffsets());
- assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId));
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ GroupMetadata group = onLoadedFuture.get();
+ assertSame(group, groupInCache);
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ // Ensure that no offsets are materialized, but that we have offsets pending.
+ assertEquals(0, group.allOffsets().size());
+ assertTrue(group.hasOffsets());
+ assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId));
}
@Test
public void testLoadWithCommitedAndAbortedTransactionOffsetCommits() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -581,41 +574,39 @@ public void testLoadWithCommitedAndAbortedTransactionOffsetCommits() throws Exce
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- GroupMetadata group = onLoadedFuture.get();
- assertSame(group, groupInCache);
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- // Ensure that only the committed offsets are materialized, and that there are no pending
- // commits for the producer. This allows us to be certain that the aborted offset commits
- //
- // are truly discarded.
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
- assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ GroupMetadata group = onLoadedFuture.get();
+ assertSame(group, groupInCache);
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ // Ensure that only the committed offsets are materialized, and that there are no pending
+ // commits for the producer. This allows us to be certain that the aborted offset commits
+ // are truly discarded.
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
+ assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
}
@Test
public void testLoadWithCommitedAndAbortedAndPendingTransactionOffsetCommits() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -654,51 +645,50 @@ public void testLoadWithCommitedAndAbortedAndPendingTransactionOffsetCommits() t
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- GroupMetadata group = onLoadedFuture.get();
- assertSame(group, groupInCache);
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
-
- // Ensure that only the committed offsets are materialized, and that there are no pending commits
- // for the producer. This allows us to be certain that the aborted offset commits are truly discarded.
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
-
- // We should have pending commits.
- assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId));
-
- // The loaded pending commits should materialize after a commit marker comes in.
- groupMetadataManager.handleTxnCompletion(
- producerId,
- Sets.newHashSet(groupMetadataTopicPartition.partition()),
- true);
- assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
- pendingOffsets.forEach((tp, offset) ->
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ GroupMetadata group = onLoadedFuture.get();
+ assertSame(group, groupInCache);
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+
+ // Ensure that only the committed offsets are materialized, and that there are no pending commits
+ // for the producer. This allows us to be certain that the aborted offset commits are truly discarded.
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
+
+ // We should have pending commits.
+ assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId));
+
+ // The loaded pending commits should materialize after a commit marker comes in.
+ groupMetadataManager.handleTxnCompletion(
+ producerId,
+ Sets.newHashSet(groupPartitionId),
+ true);
+ assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
+ pendingOffsets.forEach((tp, offset) ->
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset)));
+
}
@Test
public void testLoadTransactionalOffsetCommitsFromMultipleProducers() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long firstProducerId = 1000L;
short firstProducerEpoch = 2;
long secondProducerId = 1001L;
@@ -739,51 +729,50 @@ public void testLoadTransactionalOffsetCommitsFromMultipleProducers() throws Exc
buffer.flip();
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
-
- // Ensure that only the committed offsets are materialized, and that there are no pending commits
- // for the producer. This allows us to be certain that the aborted offset commits are truly discarded.
- assertEquals(committedOffsetsFirstProducer.size() + committedOffsetsSecondProducer.size(),
- group.allOffsets().size());
- committedOffsetsFirstProducer.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- assertEquals(
- Optional.of((long) firstProduceRecordOffset),
- group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset));
- });
- committedOffsetsSecondProducer.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- assertEquals(
- Optional.of((long) secondProduceRecordOffset),
- group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset));
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+
+ // Ensure that only the committed offsets are materialized, and that there are no pending commits
+ // for the producer. This allows us to be certain that the aborted offset commits are truly discarded.
+ assertEquals(committedOffsetsFirstProducer.size() + committedOffsetsSecondProducer.size(),
+ group.allOffsets().size());
+ committedOffsetsFirstProducer.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ assertEquals(
+ Optional.of((long) firstProduceRecordOffset),
+ group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset));
+ });
+ committedOffsetsSecondProducer.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ assertEquals(
+ Optional.of((long) secondProduceRecordOffset),
+ group.offsetWithRecordMetadata(tp).flatMap(CommitRecordMetadataAndOffset::appendedBatchOffset));
+ });
+
}
@Test
public void testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
long producerId = 1000L;
short producerEpoch = 2;
@@ -809,42 +798,49 @@ public void testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWin
buffer.flip();
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
-
- // The group should be loaded with pending offsets.
- assertEquals(1, group.allOffsets().size());
- assertTrue(group.hasOffsets());
- assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
- assertEquals(consumerOffsetCommits.size(), group.allOffsets().size());
- transactionalOffsetCommits.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+
+ // The transactional offset commits should win over the consumer offset commits.
+ assertEquals(1, group.allOffsets().size());
+ assertTrue(group.hasOffsets());
+ assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId));
+ assertEquals(consumerOffsetCommits.size(), group.allOffsets().size());
+ transactionalOffsetCommits.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ });
+
}
@Test
public void testGroupNotExits() {
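+ // Recreate the manager with a MockTime clock for this test.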
+ groupMetadataManager = new GroupMetadataManager(
+ offsetConfig,
+ producer,
+ consumer,
+ scheduler,
+ new MockTime()
+ );
// group is not owned
assertFalse(groupMetadataManager.groupNotExists(groupId));
@@ -865,7 +861,6 @@ public void testGroupNotExits() {
@Test
public void testLoadOffsetsWithTombstones() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
TopicPartition tombstonePartition = new TopicPartition("foo", 1);
Map committedOffsets = new HashMap<>();
@@ -886,44 +881,43 @@ public void testLoadOffsetsWithTombstones() throws Exception {
ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
-
- // The group should be loaded with pending offsets.
- assertEquals(committedOffsets.size() - 1, group.allOffsets().size());
- committedOffsets.forEach((tp, offset) -> {
- if (tp == tombstonePartition) {
- assertEquals(Optional.empty(), group.offset(tp));
- } else {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- }
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+
+ // The tombstoned partition's offset should not be materialized in the loaded group.
+ assertEquals(committedOffsets.size() - 1, group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) -> {
+ if (tp == tombstonePartition) {
+ assertEquals(Optional.empty(), group.offset(tp));
+ } else {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ }
+ });
+
}
@Test
public void testLoadOffsetsAndGroup() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
int generation = 935;
String protocolType = "consumer";
String protocol = "range";
@@ -949,47 +943,46 @@ public void testLoadOffsetsAndGroup() throws Exception {
ByteBuffer buffer = newMemoryRecordsBuffer(offsetCommitRecords);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Stable, group.currentState());
- assertEquals(memberId, group.leaderOrNull());
- assertEquals(generation, group.generationId());
- assertEquals(Optional.of(protocolType), group.protocolType());
- assertEquals(
- Lists.newArrayList(memberId),
- group.allMembers().stream().collect(Collectors.toList()));
- assertEquals(
- committedOffsets.size(),
- group.allOffsets().size()
- );
- committedOffsets.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Stable, group.currentState());
+ assertEquals(memberId, group.leaderOrNull());
+ assertEquals(generation, group.generationId());
+ assertEquals(Optional.of(protocolType), group.protocolType());
+ assertEquals(
+ Lists.newArrayList(memberId),
+ group.allMembers().stream().collect(Collectors.toList()));
+ assertEquals(
+ committedOffsets.size(),
+ group.allOffsets().size()
+ );
+ committedOffsets.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ });
+
}
@Test
public void testLoadGroupWithTombstone() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
int generation = 935;
String memberId = "98098230493";
String protocolType = "consumer";
@@ -1011,20 +1004,20 @@ public void testLoadGroupWithTombstone() throws Exception {
));
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> {}
- ).get();
- assertFalse(groupMetadataManager.getGroup(groupId).isPresent());
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> {}
+ ).get();
+ assertFalse(groupMetadataManager.getGroup(groupId).isPresent());
+
}
@Test
@@ -1032,8 +1025,6 @@ public void testOffsetWriteAfterGroupRemoved() throws Exception {
// this test case checks the following scenario:
// 1. the group exists at some point in time, but is later removed (because all members left)
// 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
-
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
int generation = 293;
String memberId = "98098230493";
String protocolType = "consumer";
@@ -1067,38 +1058,36 @@ public void testOffsetWriteAfterGroupRemoved() throws Exception {
ByteBuffer buffer = newMemoryRecordsBuffer(newOffsetCommitRecords);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Empty, group.currentState());
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Empty, group.currentState());
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ });
}
@Test
public void testLoadGroupAndOffsetsFromDifferentSegments() throws Exception {
- TopicPartition groupMetadataTopicPartition = groupTopicPartition;
int generation = 293;
String protocolType = "consumer";
String protocol = "range";
@@ -1138,54 +1127,61 @@ public void testLoadGroupAndOffsetsFromDifferentSegments() throws Exception {
ByteBuffer segment2Buffer = newMemoryRecordsBuffer(segment2Records);
byte[] key = groupMetadataKey(groupId);
- runGroupMetadataManagerProducerTester("test-load-offsets-without-group",
- (groupMetadataManager, producer) -> {
- producer.newMessage()
- .keyBytes(key)
- .value(segment1Buffer)
- .eventTime(time.milliseconds())
- .send();
-
- producer.newMessage()
- .keyBytes(key)
- .value(segment2Buffer)
- .eventTime(time.milliseconds())
- .send();
-
- CompletableFuture onLoadedFuture = new CompletableFuture<>();
- groupMetadataManager.scheduleLoadGroupAndOffsets(
- groupMetadataTopicPartition.partition(),
- groupMetadata -> onLoadedFuture.complete(groupMetadata)
- ).get();
- GroupMetadata group = onLoadedFuture.get();
- GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
- fail("Group was not loaded into the cache");
- return null;
- });
- assertSame(group, groupInCache);
-
- assertEquals(groupId, group.groupId());
- assertEquals(Stable, group.currentState());
-
- assertEquals("segment2 group record member should be elected",
- segment2MemberId, group.leaderOrNull());
- assertEquals("segment2 group record member should be only member",
- Lists.newArrayList(segment2MemberId),
- group.allMembers().stream().collect(Collectors.toList()));
-
- // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
- Map committedOffsets = new HashMap<>();
- committedOffsets.putAll(segment1Offsets);
- committedOffsets.putAll(segment2Offsets);
- assertEquals(committedOffsets.size(), group.allOffsets().size());
- committedOffsets.forEach((tp, offset) -> {
- assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
- });
- });
+
+ Producer<ByteBuffer> producer = groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
+ producer.newMessage()
+ .keyBytes(key)
+ .value(segment1Buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ producer.newMessage()
+ .keyBytes(key)
+ .value(segment2Buffer)
+ .eventTime(Time.SYSTEM.milliseconds())
+ .send();
+
+ CompletableFuture<GroupMetadata> onLoadedFuture = new CompletableFuture<>();
+ groupMetadataManager.scheduleLoadGroupAndOffsets(
+ groupPartitionId,
+ groupMetadata -> onLoadedFuture.complete(groupMetadata)
+ ).get();
+ GroupMetadata group = onLoadedFuture.get();
+ GroupMetadata groupInCache = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
+ fail("Group was not loaded into the cache");
+ return null;
+ });
+ assertSame(group, groupInCache);
+
+ assertEquals(groupId, group.groupId());
+ assertEquals(Stable, group.currentState());
+
+ assertEquals(segment2MemberId, group.leaderOrNull(),
+ "segment2 group record member should be elected");
+ assertEquals(Lists.newArrayList(segment2MemberId),
+ group.allMembers().stream().collect(Collectors.toList()),
+ "segment2 group record member should be only member");
+
+ // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
+ Map<TopicPartition, Long> committedOffsets = new HashMap<>();
+ committedOffsets.putAll(segment1Offsets);
+ committedOffsets.putAll(segment2Offsets);
+ assertEquals(committedOffsets.size(), group.allOffsets().size());
+ committedOffsets.forEach((tp, offset) -> {
+ assertEquals(Optional.of(offset), group.offset(tp).map(OffsetAndMetadata::offset));
+ });
+
}
@Test
public void testAddGroup() {
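+ // Recreate the manager with a MockTime clock before exercising the in-memory group cache.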
+ groupMetadataManager = new GroupMetadataManager(
+ offsetConfig,
+ producer,
+ consumer,
+ scheduler,
+ new MockTime()
+ );
GroupMetadata group = new GroupMetadata("foo", Empty);
assertEquals(group, groupMetadataManager.addGroup(group));
assertEquals(group, groupMetadataManager.addGroup(
@@ -1195,760 +1191,741 @@ public void testAddGroup() {
@Test
public void testStoreEmptyGroup() throws Exception {
- final String topicName = "test-store-empty-group";
-
- runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> {
- int generation = 27;
- String protocolType = "consumer";
- GroupMetadata group = GroupMetadata.loadGroup(
- groupId,
- Empty,
- generation,
- protocolType,
- null,
- null,
- Collections.emptyList()
- );
- groupMetadataManager.addGroup(group);
-
- Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get();
- assertEquals(Errors.NONE, errors);
-
- Message message = consumer.receive();
- assertTrue(message.getEventTime() > 0L);
- assertTrue(message.hasKey());
- byte[] key = message.getKeyBytes();
- BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
- assertTrue(groupKey instanceof GroupMetadataKey);
- GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
- assertEquals(groupId, groupMetadataKey.key());
-
- ByteBuffer value = message.getValue();
- MemoryRecords memRecords = MemoryRecords.readableRecords(value);
- AtomicBoolean verified = new AtomicBoolean(false);
- memRecords.batches().forEach(batch -> {
- for (Record record : batch) {
- assertFalse(verified.get());
- BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
- assertTrue(bk instanceof GroupMetadataKey);
- GroupMetadataKey gmk = (GroupMetadataKey) bk;
- assertEquals(groupId, gmk.key());
-
- GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
- groupId, record.value()
- );
- assertTrue(gm.is(Empty));
- assertEquals(generation, gm.generationId());
- assertEquals(Optional.of(protocolType), gm.protocolType());
- verified.set(true);
- }
- });
- assertTrue(verified.get());
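+ // Subscribe to the manager's offsets topic partition from the earliest position to verify what storeGroup writes.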
+ @Cleanup
+ Consumer<ByteBuffer> consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER)
+ .topic(groupMetadataManager.getTopicPartitionName())
+ .subscriptionName("test-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ int generation = 27;
+ String protocolType = "consumer";
+ GroupMetadata group = GroupMetadata.loadGroup(
+ groupId,
+ Empty,
+ generation,
+ protocolType,
+ null,
+ null,
+ Collections.emptyList()
+ );
+ groupMetadataManager.addGroup(group);
+
+ Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get();
+ assertEquals(Errors.NONE, errors);
+
+ Message<ByteBuffer> message = consumer.receive();
+ while (message.getValue().array().length == 0) {
+ // Skip placeholder messages with an empty payload.
+ message = consumer.receive();
+ }
+ assertTrue(message.getEventTime() > 0L);
+ assertTrue(message.hasKey());
+ byte[] key = message.getKeyBytes();
+ BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
+ assertTrue(groupKey instanceof GroupMetadataKey);
+ GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
+ assertEquals(groupId, groupMetadataKey.key());
+
+ ByteBuffer value = message.getValue();
+ MemoryRecords memRecords = MemoryRecords.readableRecords(value);
+ AtomicBoolean verified = new AtomicBoolean(false);
+ memRecords.batches().forEach(batch -> {
+ for (Record record : batch) {
+ assertFalse(verified.get());
+ BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
+ assertTrue(bk instanceof GroupMetadataKey);
+ GroupMetadataKey gmk = (GroupMetadataKey) bk;
+ assertEquals(groupId, gmk.key());
+
+ GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
+ groupId, record.value()
+ );
+ assertTrue(gm.is(Empty));
+ assertEquals(generation, gm.generationId());
+ assertEquals(Optional.of(protocolType), gm.protocolType());
+ verified.set(true);
+ }
});
+ assertTrue(verified.get());
+
}
@Test
public void testStoreEmptySimpleGroup() throws Exception {
- final String topicName = "test-store-empty-simple-group";
-
- runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> {
-
- GroupMetadata group = new GroupMetadata(groupId, Empty);
- groupMetadataManager.addGroup(group);
-
- Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get();
- assertEquals(Errors.NONE, errors);
-
- Message message = consumer.receive();
- assertTrue(message.getEventTime() > 0L);
- assertTrue(message.hasKey());
- byte[] key = message.getKeyBytes();
-
- BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
- assertTrue(groupKey instanceof GroupMetadataKey);
- GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
- assertEquals(groupId, groupMetadataKey.key());
-
- ByteBuffer value = message.getValue();
- MemoryRecords memRecords = MemoryRecords.readableRecords(value);
- AtomicBoolean verified = new AtomicBoolean(false);
- memRecords.batches().forEach(batch -> {
- for (Record record : batch) {
- assertFalse(verified.get());
- BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
- assertTrue(bk instanceof GroupMetadataKey);
- GroupMetadataKey gmk = (GroupMetadataKey) bk;
- assertEquals(groupId, gmk.key());
-
- GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
- groupId, record.value()
- );
- assertTrue(gm.is(Empty));
- assertEquals(0, gm.generationId());
- assertEquals(Optional.empty(), gm.protocolType());
- verified.set(true);
- }
- });
- assertTrue(verified.get());
+ @Cleanup
+ Consumer<ByteBuffer> consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER)
+ .topic(groupMetadataManager.getTopicPartitionName())
+ .subscriptionName("test-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ GroupMetadata group = new GroupMetadata(groupId, Empty);
+ groupMetadataManager.addGroup(group);
+
+ Errors errors = groupMetadataManager.storeGroup(group, Collections.emptyMap()).get();
+ assertEquals(Errors.NONE, errors);
+
+ Message<ByteBuffer> message = consumer.receive();
+ while (message.getValue().array().length == 0) {
+ // Skip placeholder messages with an empty payload.
+ message = consumer.receive();
+ }
+ assertTrue(message.getEventTime() > 0L);
+ assertTrue(message.hasKey());
+ byte[] key = message.getKeyBytes();
+
+ BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
+ assertTrue(groupKey instanceof GroupMetadataKey);
+ GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
+ assertEquals(groupId, groupMetadataKey.key());
+
+ ByteBuffer value = message.getValue();
+ MemoryRecords memRecords = MemoryRecords.readableRecords(value);
+ AtomicBoolean verified = new AtomicBoolean(false);
+ memRecords.batches().forEach(batch -> {
+ for (Record record : batch) {
+ assertFalse(verified.get());
+ BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
+ assertTrue(bk instanceof GroupMetadataKey);
+ GroupMetadataKey gmk = (GroupMetadataKey) bk;
+ assertEquals(groupId, gmk.key());
+
+ GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
+ groupId, record.value()
+ );
+ assertTrue(gm.is(Empty));
+ assertEquals(0, gm.generationId());
+ assertEquals(Optional.empty(), gm.protocolType());
+ verified.set(true);
+ }
});
+ assertTrue(verified.get());
}
@Test
public void testStoreNoneEmptyGroup() throws Exception {
- final String topicName = "test-store-non-empty-group";
-
- runGroupMetadataManagerConsumerTester(topicName, (groupMetadataManager, consumer) -> {
- String memberId = "memberId";
- String clientId = "clientId";
- String clientHost = "localhost";
-
- GroupMetadata group = new GroupMetadata(groupId, Empty);
- groupMetadataManager.addGroup(group);
-
- Map protocols = new HashMap<>();
- protocols.put("protocol", new byte[0]);
- MemberMetadata member = new MemberMetadata(
- memberId,
- groupId,
- clientId,
- clientHost,
- rebalanceTimeout,
- sessionTimeout,
- protocolType,
- protocols
- );
- CompletableFuture joinFuture = new CompletableFuture<>();
- member.awaitingJoinCallback(joinFuture);
- group.add(member);
- group.transitionTo(GroupState.PreparingRebalance);
- group.initNextGeneration();
-
- Map assignments = new HashMap<>();
- assignments.put(memberId, new byte[0]);
- Errors errors = groupMetadataManager.storeGroup(group, assignments).get();
- assertEquals(Errors.NONE, errors);
-
- Message message = consumer.receive();
- assertTrue(message.getEventTime() > 0L);
- assertTrue(message.hasKey());
- byte[] key = message.getKeyBytes();
- BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
- assertTrue(groupKey instanceof GroupMetadataKey);
- GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
- assertEquals(groupId, groupMetadataKey.key());
-
- ByteBuffer value = message.getValue();
- MemoryRecords memRecords = MemoryRecords.readableRecords(value);
- AtomicBoolean verified = new AtomicBoolean(false);
- memRecords.batches().forEach(batch -> {
- for (Record record : batch) {
- assertFalse(verified.get());
- BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
- assertTrue(bk instanceof GroupMetadataKey);
- GroupMetadataKey gmk = (GroupMetadataKey) bk;
- assertEquals(groupId, gmk.key());
-
- GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
- groupId, record.value()
- );
- assertEquals(Stable, gm.currentState());
- assertEquals(1, gm.generationId());
- assertEquals(Optional.of(protocolType), gm.protocolType());
- assertEquals("protocol", gm.protocolOrNull());
- assertTrue(gm.has(memberId));
- verified.set(true);
- }
- });
- assertTrue(verified.get());
+ @Cleanup
+ Consumer<ByteBuffer> consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER)
+ .topic(groupMetadataManager.getTopicPartitionName())
+ .subscriptionName("test-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ String memberId = "memberId";
+ String clientId = "clientId";
+ String clientHost = "localhost";
+
+ GroupMetadata group = new GroupMetadata(groupId, Empty);
+ groupMetadataManager.addGroup(group);
+
+ Map<String, byte[]> protocols = new HashMap<>();
+ protocols.put("protocol", new byte[0]);
+ MemberMetadata member = new MemberMetadata(
+ memberId,
+ groupId,
+ clientId,
+ clientHost,
+ rebalanceTimeout,
+ sessionTimeout,
+ protocolType,
+ protocols
+ );
+ CompletableFuture joinFuture = new CompletableFuture<>();
+ member.awaitingJoinCallback(joinFuture);
+ group.add(member);
+ group.transitionTo(GroupState.PreparingRebalance);
+ group.initNextGeneration();
+
+ Map<String, byte[]> assignments = new HashMap<>();
+ assignments.put(memberId, new byte[0]);
+ Errors errors = groupMetadataManager.storeGroup(group, assignments).get();
+ assertEquals(Errors.NONE, errors);
+
+ Message<ByteBuffer> message = consumer.receive();
+ while (message.getValue().array().length == 0) {
+ // Skip placeholder messages with an empty payload.
+ message = consumer.receive();
+ }
+ assertTrue(message.getEventTime() > 0L);
+ assertTrue(message.hasKey());
+ byte[] key = message.getKeyBytes();
+ BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
+ assertTrue(groupKey instanceof GroupMetadataKey);
+ GroupMetadataKey groupMetadataKey = (GroupMetadataKey) groupKey;
+ assertEquals(groupId, groupMetadataKey.key());
+
+ ByteBuffer value = message.getValue();
+ MemoryRecords memRecords = MemoryRecords.readableRecords(value);
+ AtomicBoolean verified = new AtomicBoolean(false);
+ memRecords.batches().forEach(batch -> {
+ for (Record record : batch) {
+ assertFalse(verified.get());
+ BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
+ assertTrue(bk instanceof GroupMetadataKey);
+ GroupMetadataKey gmk = (GroupMetadataKey) bk;
+ assertEquals(groupId, gmk.key());
+
+ GroupMetadata gm = GroupMetadataConstants.readGroupMessageValue(
+ groupId, record.value()
+ );
+ assertEquals(Stable, gm.currentState());
+ assertEquals(1, gm.generationId());
+ assertEquals(Optional.of(protocolType), gm.protocolType());
+ assertEquals("protocol", gm.protocolOrNull());
+ assertTrue(gm.has(memberId));
+ verified.set(true);
+ }
});
+ assertTrue(verified.get());
}
@Test
public void testCommitOffset() throws Exception {
- runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> {
- String memberId = "";
- TopicPartition topicPartition = new TopicPartition("foo", 0);
- groupMetadataManager.addPartitionOwnership(groupPartitionId);
- long offset = 37L;
-
- GroupMetadata group = new GroupMetadata(groupId, Empty);
- groupMetadataManager.addGroup(group);
-
- Map offsets = ImmutableMap.builder()
- .put(topicPartition, OffsetAndMetadata.apply(offset))
- .build();
-
- Map commitErrors = groupMetadataManager.storeOffsets(
- group, memberId, offsets
- ).get();
-
- assertTrue(group.hasOffsets());
- assertFalse(commitErrors.isEmpty());
- Errors maybeError = commitErrors.get(topicPartition);
- assertEquals(Errors.NONE, maybeError);
- assertTrue(group.hasOffsets());
-
- Map cachedOffsets = groupMetadataManager.getOffsets(
- groupId,
- Optional.of(Lists.newArrayList(topicPartition))
- );
- PartitionData maybePartitionResponse = cachedOffsets.get(topicPartition);
- assertNotNull(maybePartitionResponse);
-
- assertEquals(Errors.NONE, maybePartitionResponse.error);
- assertEquals(offset, maybePartitionResponse.offset);
-
- Message message = consumer.receive();
- assertTrue(message.getEventTime() > 0L);
- assertTrue(message.hasKey());
- byte[] key = message.getKeyBytes();
- BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
- assertTrue(groupKey instanceof OffsetKey);
-
- ByteBuffer value = message.getValue();
- MemoryRecords memRecords = MemoryRecords.readableRecords(value);
- AtomicBoolean verified = new AtomicBoolean(false);
- memRecords.batches().forEach(batch -> {
- for (Record record : batch) {
- assertFalse(verified.get());
- BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
- assertTrue(bk instanceof OffsetKey);
- OffsetKey ok = (OffsetKey) bk;
- GroupTopicPartition gtp = ok.key();
- assertEquals(groupId, gtp.group());
- assertEquals(topicPartition, gtp.topicPartition());
-
- OffsetAndMetadata gm = GroupMetadataConstants.readOffsetMessageValue(
- record.value()
- );
- assertEquals(offset, gm.offset());
- verified.set(true);
- }
- });
- assertTrue(verified.get());
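+ // Subscribe from the earliest position so the offset commit message written below can be read back.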
+ @Cleanup
+ Consumer<ByteBuffer> consumer = pulsarClient.newConsumer(Schema.BYTEBUFFER)
+ .topic(groupMetadataManager.getTopicPartitionName())
+ .subscriptionName("test-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ String memberId = "fakeMemberId";
+ TopicPartition topicPartition = new TopicPartition("foo", 0);
+ groupMetadataManager.addPartitionOwnership(groupPartitionId);
+ long offset = 37L;
+
+ GroupMetadata group = new GroupMetadata(groupId, Empty);
+ groupMetadataManager.addGroup(group);
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = ImmutableMap.<TopicPartition, OffsetAndMetadata>builder()
+ .put(topicPartition, OffsetAndMetadata.apply(offset))
+ .build();
+
+ Map<TopicPartition, Errors> commitErrors = groupMetadataManager.storeOffsets(
+ group, memberId, offsets
+ ).get();
+
+ assertTrue(group.hasOffsets());
+ assertFalse(commitErrors.isEmpty());
+ Errors maybeError = commitErrors.get(topicPartition);
+ assertEquals(Errors.NONE, maybeError);
+ assertTrue(group.hasOffsets());
+
+ Map<TopicPartition, PartitionData> cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ Optional.of(Lists.newArrayList(topicPartition))
+ );
+ PartitionData maybePartitionResponse = cachedOffsets.get(topicPartition);
+ assertNotNull(maybePartitionResponse);
+
+ assertEquals(Errors.NONE, maybePartitionResponse.error);
+ assertEquals(offset, maybePartitionResponse.offset);
+
+ Message<ByteBuffer> message = consumer.receive();
+ while (message.getValue().array().length == 0) {
+ // Skip placeholder messages with an empty payload.
+ message = consumer.receive();
+ }
+ assertTrue(message.getEventTime() > 0L);
+ assertTrue(message.hasKey());
+ byte[] key = message.getKeyBytes();
+ BaseKey groupKey = GroupMetadataConstants.readMessageKey(ByteBuffer.wrap(key));
+ assertTrue(groupKey instanceof OffsetKey);
+
+ ByteBuffer value = message.getValue();
+ MemoryRecords memRecords = MemoryRecords.readableRecords(value);
+ AtomicBoolean verified = new AtomicBoolean(false);
+ memRecords.batches().forEach(batch -> {
+ for (Record record : batch) {
+ assertFalse(verified.get());
+ BaseKey bk = GroupMetadataConstants.readMessageKey(record.key());
+ assertTrue(bk instanceof OffsetKey);
+ OffsetKey ok = (OffsetKey) bk;
+ GroupTopicPartition gtp = ok.key();
+ assertEquals(groupId, gtp.group());
+ assertEquals(topicPartition, gtp.topicPartition());
+
+ OffsetAndMetadata gm = GroupMetadataConstants.readOffsetMessageValue(
+ record.value()
+ );
+ assertEquals(offset, gm.offset());
+ verified.set(true);
+ }
});
+ assertTrue(verified.get());
}
@Test
public void testTransactionalCommitOffsetCommitted() throws Exception {
- runGroupMetadataManagerConsumerTester("test-commit-offset", (groupMetadataManager, consumer) -> {
- String memberId = "";
- TopicPartition topicPartition = new TopicPartition("foo", 0);
- long offset = 37L;
- long producerId = 232L;
- short producerEpoch = 0;
-
- groupMetadataManager.addPartitionOwnership(groupPartitionId);
-
- GroupMetadata group = new GroupMetadata(groupId, Empty);
- groupMetadataManager.addGroup(group);
-
- Map offsets = ImmutableMap.builder()
- .put(topicPartition, OffsetAndMetadata.apply(offset))
- .build();
-
- CompletableFuture<MessageId> writeOffsetMessageFuture = new CompletableFuture<>();
- AtomicReference<CompletableFuture<MessageId>> realWriteFutureRef = new AtomicReference<>();
- doAnswer(invocationOnMock -> {
- CompletableFuture<MessageId> realWriteFuture =
- (CompletableFuture<MessageId>) invocationOnMock.callRealMethod();
- realWriteFutureRef.set(realWriteFuture);
- return writeOffsetMessageFuture;
- }).when(groupMetadataManager).storeOffsetMessage(
- any(byte[].class), any(ByteBuffer.class), anyLong()
- );
-
- CompletableFuture