diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 27dcdad3ccb9..cc4c82ef1597 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -107,7 +107,7 @@ public class ConsumerConfig extends AbstractConfig {
* group.protocol
*/
public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
- public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
+ public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " +
"support \"classic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " +
"used. Otherwise, the classic group protocol will be used.";
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index cbe0bd6ef35a..4804148812e5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -170,12 +170,12 @@ public void testCaseInsensitiveSecurityProtocol() {
}
@Test
- public void testDefaultConsumerGroupConfig() {
+ public void testDefaultGroupProtocol() {
final Map configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
- assertEquals("classic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+ assertEquals(ConsumerConfig.DEFAULT_GROUP_PROTOCOL, consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
assertNull(consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
@@ -199,6 +199,7 @@ public void testRemoteAssignorWithClassicGroupProtocol() {
final Map configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+ configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
configs.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, remoteAssignorName);
ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(exception.getMessage().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name()));